"""FastAPI entry point wiring a RabbitMQ listener, the LLM service and a Gradio UI.

At import time this module builds the publisher, the service, the handler
table, the listener, the Gradio smoke-test page and the FastAPI ``app``.
The RabbitMQ connection and the listener are only started in the FastAPI
startup hook.
"""
import asyncio

import gradio as gr
from fastapi import FastAPI

from config import settings
from listener import RabbitListenerBase
from rabbit_repo import RabbitRepo
from service import LLMService
from runners.base import ILLMRunner


# --- Runner factory (stub) ---
class EchoRunner(ILLMRunner):
    """Placeholder runner: every operation is currently a no-op."""

    Type = "EchoRunner"

    async def StartProcess(self, llmServiceObj: dict):
        """Do nothing (stub)."""
        pass

    async def RemoveProcess(self, sessionId: str):
        """Do nothing (stub)."""
        pass

    async def StopRequest(self, sessionId: str):
        """Do nothing (stub)."""
        pass

    async def SendInputAndGetResponse(self, llmServiceObj: dict):
        """Do nothing (stub).

        Intended to emit a message back (you can choose queue names per your
        topology); no message is sent by this implementation.
        """
        pass


async def runner_factory(llmServiceObj: dict) -> ILLMRunner:
    """Return the runner for a session request.

    Always returns a fresh ``EchoRunner``; ``llmServiceObj`` is not inspected.
    Use ``llmServiceObj["LLMRunnerType"]`` here to instantiate different
    runners.
    """
    return EchoRunner()


# --- Publisher and Service ---
publisher = RabbitRepo(external_source="https://space.external")  # put your ExternalUrl if you have one
service = LLMService(publisher, runner_factory)


# --- Handlers mapping .NET FuncName -> service method ---
# Each handler substitutes an empty dict for a falsy payload before delegating.
async def h_start(data):
    """Forward a start-session message to ``service.StartProcess``."""
    await service.StartProcess(data or {})


async def h_user(data):
    """Forward a user-input message to ``service.UserInput``."""
    await service.UserInput(data or {})


async def h_remove(data):
    """Forward a remove-session message to ``service.RemoveSession``."""
    await service.RemoveSession(data or {})


async def h_stop(data):
    """Forward a stop-request message to ``service.StopRequest``."""
    await service.StopRequest(data or {})


async def h_qir(data):
    """Forward a query-index-result message to ``service.QueryIndexResult``."""
    await service.QueryIndexResult(data or {})


async def h_getreg(data):
    """Call ``service.GetFunctionRegistry(False)``; the payload is ignored."""
    await service.GetFunctionRegistry(False)


async def h_getreg_f(data):
    """Call ``service.GetFunctionRegistry(True)``; the payload is ignored."""
    await service.GetFunctionRegistry(True)


handlers = {
    "llmStartSession": h_start,
    "llmUserInput": h_user,
    "llmRemoveSession": h_remove,
    "llmStopRequest": h_stop,
    "queryIndexResult": h_qir,
    "getFunctionRegistry": h_getreg,
    "getFunctionRegistryFiltered": h_getreg_f,
}

listener = RabbitListenerBase(service_id=settings.SERVICE_ID, handlers=handlers)

# Declarations mirror your C# InitRabbitMQObjs().
# FuncName -> MessageTimeout (presumably milliseconds -- confirm against the
# listener). Dict order is the declaration order.
_MESSAGE_TIMEOUTS = {
    "llmStartSession": 600000,
    "llmUserInput": 600000,
    "llmRemoveSession": 60000,
    "llmStopRequest": 60000,
    "queryIndexResult": 60000,
    "getFunctionRegistry": 60000,
    "getFunctionRegistryFiltered": 60000,
}

# One declaration per function: the exchange name is the function name
# suffixed with this service's id, and every entry gets its own
# single-element routing-key list.
DECLS = [
    {
        "ExchangeName": func_name + settings.SERVICE_ID,
        "FuncName": func_name,
        "MessageTimeout": timeout,
        "RoutingKeys": [settings.RABBIT_ROUTING_KEY],
    }
    for func_name, timeout in _MESSAGE_TIMEOUTS.items()
]


# --- Gradio UI (for smoke test) ---
async def ping():
    """Return the fixed string ``"ok"`` for the smoke-test button."""
    return "ok"


with gr.Blocks() as demo:
    gr.Markdown("### LLM Runner (Python) listening on RabbitMQ")
    btn = gr.Button("Ping")
    out = gr.Textbox()
    btn.click(ping, inputs=None, outputs=out)


# --- FastAPI mount + lifecycle ---
app = FastAPI()


# Registered BEFORE Gradio is mounted at "/": routes are matched in
# registration order and a mount at the root path matches every request path,
# so a route added after the mount would never be reached.
@app.get("/health")
async def health():
    """Return a static ``{"status": "ok"}`` payload."""
    return {"status": "ok"}


@app.on_event("startup")
async def on_start():
    """Connect the publisher, initialise the service, then start the listener."""
    await publisher.connect()
    await service.init()
    await listener.start(DECLS)


@app.on_event("shutdown")
async def on_stop():
    """Shutdown hook; currently performs no explicit cleanup."""
    # aio-pika RobustConnection closes on GC; optionally add explicit closes if you add references
    pass


app = gr.mount_gradio_app(app, demo, path="/")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)