ai-agent
Production-grade AI agent platform in Python. Multi-agent pipeline with RAG, tool execution, two-tier session memory, and async document ingestion — organized around strict hexagonal architecture.
ai-agent is a backend platform for AI-powered conversations with document knowledge augmentation. Users authenticate, open sessions, send messages, and upload documents. The agent pipeline retrieves relevant context from uploaded documents, reasons over it, critiques its own draft, verifies any formulas, and executes registered tools — all in a single request. The codebase is split into three packages in a uv workspace: api (FastAPI application), common (agents, infrastructure, shared protocols), and pipeline (async document workers). Business logic never imports infrastructure directly — everything is wired at startup through a single composition root.
Multi-Agent Pipeline
Each chat request passes through a three-stage pipeline inside OrchestratorAgent. PlanningService asks the LLM to produce a JSON plan — an ordered list of agent steps with inputs. ExecutionCoordinator iterates the steps and dispatches each one to the appropriate sub-agent. SynthesisService takes all agent outputs and produces the final answer with a defined priority order.
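The plan-then-dispatch flow can be sketched roughly as follows. The JSON schema (`steps`, `agent`, `input`), the `run_plan` helper, and the echo agents are all assumptions made for illustration; the actual plan format used by PlanningService is not shown here.

```python
import json
from typing import Callable

# Hypothetical plan format: the real schema is not documented in this README.
RAW_PLAN = """
{"steps": [
  {"agent": "retrieval", "input": "What is the course late policy?"},
  {"agent": "reasoning", "input": "Answer using the retrieved context."}
]}
"""


def run_plan(
    raw_plan: str, agents: dict[str, Callable[[str], str]]
) -> list[tuple[str, str]]:
    """Parse a JSON plan and dispatch each step, in order, to the named agent."""
    plan = json.loads(raw_plan)
    trace: list[tuple[str, str]] = []
    for step in plan["steps"]:
        name = step["agent"]
        if name not in agents:
            raise KeyError(f"Plan references unknown agent: {name}")
        trace.append((name, agents[name](step["input"])))
    return trace


# Stand-in agents that only echo their input.
agents = {
    "retrieval": lambda text: f"retrieved context for: {text}",
    "reasoning": lambda text: f"draft answer for: {text}",
}
trace = run_plan(RAW_PLAN, agents)
```

Keeping the dispatch table as a plain mapping from agent name to callable means an unknown agent name in an LLM-produced plan fails loudly instead of being skipped silently.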
The agents are wired together in the container layer and injected as Protocol interfaces. OrchestratorAgent never imports a concrete Redis client, SQLAlchemy session, or OpenAI SDK object — it only knows the Protocol shapes. This means the entire pipeline can be exercised in tests by passing in stub implementations.
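As a minimal sketch of what Protocol-based injection makes possible in tests: `LLMClient`, `StubLLM`, and `TinyPlanner` below are hypothetical names invented for this example, not the project's real interfaces.

```python
from dataclasses import dataclass
from typing import Protocol


class LLMClient(Protocol):
    """Hypothetical port: any object with a matching complete() satisfies it."""

    def complete(self, prompt: str) -> str: ...


@dataclass
class StubLLM:
    """Test double that returns a canned reply and records the prompts it saw."""

    reply: str
    prompts: list[str]

    def complete(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self.reply


class TinyPlanner:
    """Illustrative service that depends on the Protocol, never on a concrete SDK."""

    def __init__(self, llm: LLMClient) -> None:
        self._llm = llm

    def create_plan(self, question: str) -> str:
        return self._llm.complete(f"Plan the steps needed to answer: {question}")


stub = StubLLM(reply='{"steps": []}', prompts=[])
plan = TinyPlanner(stub).create_plan("What is hexagonal architecture?")
```

`StubLLM` never inherits from `LLMClient`; structural typing is enough, which is why no network access or SDK import is needed to exercise the service.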
    def _run_sync(self, input: OrchestratorInput) -> OrchestratorOutput:
        plan = self._planning_service.create_plan(input)
        execution_result = self._execution_coordinator.execute(plan, input)
        answer = self._synthesis_service.synthesize(
            input=input,
            action_output=execution_result.action_output,
            reasoning_output=execution_result.reasoning_output,
            critique_output=execution_result.critique_output,
        )
        confidence = (
            execution_result.reasoning_output.confidence
            if execution_result.reasoning_output is not None
            else self._config.default_confidence
        )
        return OrchestratorOutput(
            answer=answer,
            session_id=input.session_id,
            agent_trace=execution_result.agent_trace,
            confidence=confidence,
        )

Memory: Cache-Aside + Write-Through
Every message turn needs the full session history. Loading from PostgreSQL on each request adds 100–200ms of latency per turn. MemoryService avoids that cost on cache hits with a dual-tier strategy: reads go to Redis first (sub-millisecond) and fall back to PostgreSQL only on a miss, after which the cache is repopulated. Writes go to PostgreSQL first (the source of truth), then refresh Redis so the next read stays hot.
An in-process OrderedDict tracks which sessions are in Redis for LRU eviction. When the 1,000-session limit is exceeded, the least recently used session is deleted from Redis — not from PostgreSQL. The cache can be wiped entirely without data loss. Redis failures on reads fall back gracefully to the database; Redis failures on writes are logged but don't fail the operation since the message is already persisted.
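The eviction bookkeeping might look something like the sketch below. `SessionLRUTracker` and its `evict` callback are hypothetical; the internals of the real `_track_session_access` are not shown in this README, and the limit is set to 2 here only to keep the example small.

```python
from collections import OrderedDict
from typing import Callable


class SessionLRUTracker:
    """Tracks which session IDs are cached and evicts the least recently used."""

    def __init__(self, max_sessions: int, evict: Callable[[str], None]) -> None:
        self._max_sessions = max_sessions
        self._evict = evict  # assumed to delete the session's key from Redis only
        self._sessions: OrderedDict[str, None] = OrderedDict()

    def track_access(self, session_id: str) -> None:
        # Insert the session if it is new, then mark it as most recently used.
        self._sessions[session_id] = None
        self._sessions.move_to_end(session_id)
        # Evict from the cold end until the tracker is back under the limit.
        while len(self._sessions) > self._max_sessions:
            oldest, _ = self._sessions.popitem(last=False)
            self._evict(oldest)


evicted: list[str] = []
tracker = SessionLRUTracker(max_sessions=2, evict=evicted.append)
for sid in ["a", "b", "a", "c"]:
    tracker.track_access(sid)
# Session "b" is evicted: "a" was touched again before "c" arrived.
```

Because the callback only removes the cached copy, an evicted session simply takes the cache-miss path on its next read.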
    def get_session_state(self, session_id: str) -> SessionState:
        # Cache-aside: try Redis first
        cached = self._short_term_repository.get_messages(session_id)
        if cached is not None:
            self._track_session_access(session_id)
            return SessionState(session_id=session_id, messages=cached)
        # Cache miss → load from PostgreSQL
        persisted = self._long_term_repository.get_messages(session_id)
        # Repopulate cache
        if self._short_term_repository.set_messages(session_id, persisted):
            self._track_session_access(session_id)
        return SessionState(session_id=session_id, messages=persisted)

    def append_message(self, session_id: str, entry: MemoryEntry) -> None:
        # Write-through: database is source of truth
        self._long_term_repository.append_message(session_id, entry)
        # Refresh cache from DB so next read stays hot
        refreshed = self._long_term_repository.get_messages(session_id)
        self._short_term_repository.set_messages(session_id, refreshed)

Document Ingestion
Large file uploads can't be processed inside the HTTP request — parsing, embedding, and indexing time is unpredictable. The upload flow is split into three steps and decoupled from processing via an event queue.
Step 1: the client calls initiate_upload, which generates a presigned MinIO URL and stores an upload session in Redis. Step 2: the client uploads chunks directly to MinIO and notifies the API of each chunk. Step 3: when all chunks are accounted for, complete_upload validates the count, marks the session complete, and publishes a DocumentUploadedEvent to RabbitMQ. The HTTP response returns immediately with a job ID. The pipeline consumer picks up the event, fetches the document from MinIO, and starts the ingestion pipeline. If the consumer crashes before acknowledging the message, RabbitMQ redelivers it automatically.
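On the consumer side, redelivery after a crash relies on the message being acknowledged only once ingestion has finished. The sketch below illustrates that ordering under stated assumptions: `handle_delivery` and its `ingest`, `ack`, and `nack` callables are hypothetical stand-ins for whatever the real worker and its RabbitMQ client provide.

```python
import json
from typing import Callable


def handle_delivery(
    body: bytes,
    ingest: Callable[[dict], None],
    ack: Callable[[], None],
    nack: Callable[[bool], None],
) -> None:
    """Process one delivery and acknowledge it only after ingestion succeeds."""
    try:
        event = json.loads(body)
        if not isinstance(event, dict) or "job_id" not in event:
            raise ValueError("event is missing job_id")
    except ValueError:
        # A malformed payload will never succeed, so reject it without requeueing.
        nack(False)
        return
    try:
        ingest(event)
    except Exception:
        # Possibly transient failure: ask the broker to requeue the message.
        nack(True)
        return
    # Reaching this line means the work is done; only now is the message acked.
    ack()


calls: list[str] = []
handle_delivery(
    b'{"event_type": "DocumentUploadedEvent", "job_id": "j-1"}',
    ingest=lambda event: calls.append(f"ingest:{event['job_id']}"),
    ack=lambda: calls.append("ack"),
    nack=lambda requeue: calls.append(f"nack:{requeue}"),
)
```

If the process dies anywhere before `ack()` runs, the delivery stays unacknowledged and the broker can hand it to another consumer. Ingestion therefore needs to tolerate seeing the same `job_id` twice.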
    from uuid import uuid4

    def complete_upload(self, request: DocumentUploadCompleteRequest) -> DocumentUploadCompleteResponse:
        session = self.session_repo.get_session(request.upload_session_id)
        # Validate all chunks arrived (chunk numbers are 1-based)
        if len(session.chunks_received) != request.total_chunks:
            missing = set(range(1, request.total_chunks + 1)) - session.chunks_received
            self.session_repo.mark_failed(request.upload_session_id, f"Missing: {missing}")
            raise ValueError(f"Missing chunks: {sorted(missing)}")
        self.session_repo.mark_complete(request.upload_session_id)
        # Hand off to async pipeline; HTTP responds in < 100ms
        job_id = str(uuid4())
        self.rabbitmq.publish_json({
            "event_type": "DocumentUploadedEvent",
            "job_id": job_id,
            "document_key": session.bucket_key,
            "user_id": session.user_id,
            "course_id": request.course_id,
        })
        return DocumentUploadCompleteResponse(
            ingestion_job_id=job_id,
            status="queued",
        )

Tool Registry
Tools are plugins. Adding one requires no changes to existing code — define a BaseTool subclass with a name, description, and JSON Schema for its parameters, then register it once in the composition root. From that point the LLM sees it in every request and can invoke it autonomously.
ActionAgent asks the LLM which tool to call and with what arguments, then calls registry.resolve(name) to get the concrete tool instance and runs it. The result is passed back to SynthesisService, which folds it into the final answer. The registry also exposes get_tools_for_openai(), which serializes all registered tools into the OpenAI function-calling format — that list is included in the LLM's system context on every turn.
    class ToolRegistry:
        def __init__(self) -> None:
            # Maps tool name to its registered instance
            self._tools: dict[str, BaseTool] = {}

        def register(self, tool: BaseTool) -> None:
            name = tool.name.strip()
            if name in self._tools:
                raise ValueError(f"Tool '{name}' is already registered")
            self._tools[name] = tool

        def resolve(self, name: str) -> BaseTool:
            tool = self._tools.get(name)
            if tool is None:
                raise ToolNotFoundError(name)
            return tool

        def get_tools_for_openai(self) -> list[dict]:
            # Serialized into LLM context on every request
            return [tool.to_openai_tool() for tool in self._tools.values()]
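To illustrate what defining a tool could look like, here is a minimal sketch. The `BaseTool` below is a hypothetical stand-in written for this example: the real base class, its `run` signature, and its attribute names may differ. `WordCountTool` is likewise invented. The dictionary returned by `to_openai_tool()` follows the shape OpenAI's chat API uses for function tools.

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseTool(ABC):
    """Hypothetical stand-in for the project's BaseTool."""

    name: str
    description: str
    parameters: dict[str, Any]  # JSON Schema describing the arguments

    @abstractmethod
    def run(self, **kwargs: Any) -> Any: ...

    def to_openai_tool(self) -> dict[str, Any]:
        # Shape of a function tool in OpenAI's chat API.
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }


class WordCountTool(BaseTool):
    """Example plugin: counts whitespace-separated words."""

    name = "word_count"
    description = "Count the words in a piece of text."
    parameters = {
        "type": "object",
        "properties": {"text": {"type": "string"}},
        "required": ["text"],
    }

    def run(self, **kwargs: Any) -> int:
        return len(kwargs["text"].split())


tool = WordCountTool()
spec = tool.to_openai_tool()
result = tool.run(text="hexagonal architecture in practice")
```

Under these assumptions, adding a tool is one new class plus one `register` call in the composition root; nothing in the registry or the agents has to change.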