2025 · Active

ai-agent

Production-grade AI agent platform in Python. Multi-agent pipeline with RAG, tool execution, two-tier session memory, and async document ingestion — organized around strict hexagonal architecture.

Python · FastAPI · PostgreSQL · Redis · RabbitMQ · MinIO · OpenAI SDK
Overview

ai-agent is a backend platform for AI-powered conversations with document knowledge augmentation. Users authenticate, open sessions, send messages, and upload documents. The agent pipeline retrieves relevant context from uploaded documents, reasons over it, critiques its own draft, verifies any formulas, and executes registered tools — all in a single request. The codebase is split into three packages in a uv workspace: api (FastAPI application), common (agents, infrastructure, shared protocols), and pipeline (async document workers). Business logic never imports infrastructure directly — everything is wired at startup through a single composition root.

Key features
01 OrchestratorAgent coordinates six specialist sub-agents per turn: RetrievalAgent (vector/keyword/hybrid search), ReasoningAgent (answer generation), CritiqueAgent (draft review), MemoryAgent (fact extraction), FormulaVerificationAgent (math correctness), and ActionAgent (tool execution). Each runs only if the PlanningService schedules it.
02 Two-tier session memory: cache-aside reads (Redis first, PostgreSQL on a miss, then repopulate the cache) and write-through writes (PostgreSQL first as the source of truth, then refresh Redis). An in-process OrderedDict tracks LRU session access and evicts the least recently used session from Redis once more than 1,000 sessions are active.
03 Three-step document upload protocol: the client requests a presigned MinIO URL, uploads chunks directly to object storage, then signals completion. The API validates the chunk count and publishes a DocumentUploadedEvent to RabbitMQ. The pipeline consumer picks it up asynchronously and starts ingestion, so the HTTP response returns in under 100ms regardless of file size.
04 Hexagonal architecture enforced at the import level: modules in app/ depend only on Protocol interfaces defined in shared/protocols.py, never on Redis, SQLAlchemy, or the OpenAI SDK directly. main.py is the only place where abstractions are bound to implementations, so every service can be tested by passing in a fake repository.
05 Tool plugin registry: tools register once at startup and expose themselves as OpenAI function-calling schemas via to_openai_tool(). ActionAgent resolves a tool by name and calls run(params). Adding a new tool requires no changes to the agents or the registry: define a BaseTool subclass and register it in main.py.
06 SynthesisService applies a strict priority order when producing the final answer: document-grounded reasoning (retrieval context marked sufficient) first, then tool results, then conversational reasoning, then a no-information fallback. The explicit fallback is meant to keep the LLM from hallucinating when context is absent.
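
The priority order in item 06 can be sketched as a chain of early returns. This is only an illustration under stated assumptions, not the project's SynthesisService: the `Candidates` dataclass, its field names, the `pick_answer` function, and the fallback string are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candidates:
    """Hypothetical container for what the sub-agents produced this turn."""
    grounded_answer: Optional[str] = None        # reasoning over retrieved documents
    context_sufficient: bool = False             # retrieval marked its context sufficient
    tool_result: Optional[str] = None            # output of an executed tool
    conversational_answer: Optional[str] = None  # reasoning without documents


NO_INFO_FALLBACK = "I don't have enough information to answer that."


def pick_answer(c: Candidates) -> str:
    # 1. Document-grounded reasoning wins, but only if the context was sufficient.
    if c.grounded_answer and c.context_sufficient:
        return c.grounded_answer
    # 2. Otherwise prefer a concrete tool result.
    if c.tool_result:
        return c.tool_result
    # 3. Then plain conversational reasoning.
    if c.conversational_answer:
        return c.conversational_answer
    # 4. Nothing usable: say so instead of guessing.
    return NO_INFO_FALLBACK
```

Note that a grounded answer whose context was not marked sufficient is skipped entirely in this sketch, which is what lets a tool result or the fallback take over.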

Multi-Agent Pipeline

Each chat request passes through a three-stage pipeline inside OrchestratorAgent. PlanningService asks the LLM to produce a JSON plan — an ordered list of agent steps with inputs. ExecutionCoordinator iterates the steps and dispatches each one to the appropriate sub-agent. SynthesisService takes all agent outputs and produces the final answer with a defined priority order.

The agents are wired together in the container layer and injected as Protocol interfaces. OrchestratorAgent never imports a concrete Redis client, SQLAlchemy session, or OpenAI SDK object — it only knows the Protocol shapes. This means the entire pipeline can be exercised in tests by passing in stub implementations.
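
To make the testing claim concrete, here is a minimal sketch of Protocol-based injection with a stub. The names `MessageRepository`, `HistoryService`, and `FakeRepository` are invented for this example and are not taken from the codebase.

```python
from typing import Dict, List, Optional, Protocol


class MessageRepository(Protocol):
    """Hypothetical port: the only shape the business logic knows about."""

    def get_messages(self, session_id: str) -> List[str]: ...


class HistoryService:
    """Depends on the Protocol, never on Redis or SQLAlchemy directly."""

    def __init__(self, repo: MessageRepository) -> None:
        self._repo = repo

    def last_message(self, session_id: str) -> Optional[str]:
        messages = self._repo.get_messages(session_id)
        return messages[-1] if messages else None


class FakeRepository:
    """Test double: satisfies the Protocol structurally, no inheritance needed."""

    def __init__(self, data: Dict[str, List[str]]) -> None:
        self._data = data

    def get_messages(self, session_id: str) -> List[str]:
        return self._data.get(session_id, [])


# In a test, the fake is passed where production code would pass a real adapter.
service = HistoryService(FakeRepository({"s1": ["hi", "hello"]}))
```

Because Protocols are checked structurally, the fake needs no import from the infrastructure layer at all.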

packages/common/src/common/agents/orchestrator/agent.py
def _run_sync(self, input: OrchestratorInput) -> OrchestratorOutput:
    plan = self._planning_service.create_plan(input)
    execution_result = self._execution_coordinator.execute(plan, input)

    answer = self._synthesis_service.synthesize(
        input=input,
        action_output=execution_result.action_output,
        reasoning_output=execution_result.reasoning_output,
        critique_output=execution_result.critique_output,
    )

    confidence = (
        execution_result.reasoning_output.confidence
        if execution_result.reasoning_output is not None
        else self._config.default_confidence
    )
    return OrchestratorOutput(
        answer=answer,
        session_id=input.session_id,
        agent_trace=execution_result.agent_trace,
        confidence=confidence,
    )
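
The plan-then-dispatch step can be pictured roughly as follows. This sketch assumes a plan format of `{"agent": ..., "input": ...}` objects and represents each sub-agent as a plain callable; both are assumptions made for illustration, and the real ExecutionCoordinator may look quite different.

```python
import json
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical plan as the LLM might return it: an ordered list of steps.
plan_json = (
    '[{"agent": "retrieval", "input": "What is RAG?"},'
    ' {"agent": "reasoning", "input": "What is RAG?"}]'
)

# Hypothetical stand-ins for sub-agents: each is just a callable here.
AGENTS: Dict[str, Callable[[str], Any]] = {
    "retrieval": lambda q: f"context for {q!r}",
    "reasoning": lambda q: f"answer to {q!r}",
}


def execute(plan: List[Dict[str, str]]) -> Tuple[Dict[str, Any], List[str]]:
    outputs: Dict[str, Any] = {}
    trace: List[str] = []
    for step in plan:
        agent = AGENTS.get(step["agent"])
        if agent is None:
            # Unknown step names are recorded and skipped rather than raising.
            trace.append(f"skipped:{step['agent']}")
            continue
        outputs[step["agent"]] = agent(step["input"])
        trace.append(step["agent"])
    return outputs, trace


outputs, trace = execute(json.loads(plan_json))
```

Only agents named in the plan run, which mirrors the rule that a sub-agent executes only when it has been scheduled.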

Memory: Cache-Aside + Write-Through

Every message turn needs the full session history. Loading it from PostgreSQL on each request adds 100–200ms of latency per turn. MemoryService avoids that cost on cache hits with a two-tier strategy: reads go to Redis first (sub-millisecond) and fall back to PostgreSQL only on a miss, after which the cache is repopulated. Writes go to PostgreSQL first (the source of truth), then refresh Redis so the next read stays hot.

An in-process OrderedDict tracks which sessions are in Redis for LRU eviction. When the 1,000-session limit is exceeded, the least recently used session is deleted from Redis — not from PostgreSQL. The cache can be wiped entirely without data loss. Redis failures on reads fall back gracefully to the database; Redis failures on writes are logged but don't fail the operation since the message is already persisted.
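
One way to get the fail-soft behavior described above is to wrap cache calls so that errors turn into a miss or a `False` result. The helper names `safe_cache_get` and `safe_cache_set` are hypothetical; the project's repositories may handle this internally instead.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def safe_cache_get(get: Callable[[str], Optional[list]], session_id: str) -> Optional[list]:
    """Treat any cache error as a miss so the caller falls back to the database."""
    try:
        return get(session_id)
    except Exception:
        logger.warning("cache read failed for %s; falling back to DB", session_id)
        return None


def safe_cache_set(set_: Callable[[str, list], None], session_id: str, messages: list) -> bool:
    """Report failure instead of raising: the message is already persisted."""
    try:
        set_(session_id, messages)
        return True
    except Exception:
        logger.warning("cache write failed for %s; continuing", session_id)
        return False


def broken_get(session_id: str) -> Optional[list]:
    # Simulated outage, standing in for a Redis connection error.
    raise ConnectionError("redis down")
```

With wrappers like these, a cache outage degrades latency but never correctness, since PostgreSQL remains the source of truth.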

packages/api/src/api/memory/service.py
def get_session_state(self, session_id: str) -> SessionState:
    # Cache-aside: try Redis first
    cached = self._short_term_repository.get_messages(session_id)
    if cached is not None:
        self._track_session_access(session_id)
        return SessionState(session_id=session_id, messages=cached)

    # Cache miss → load from PostgreSQL
    persisted = self._long_term_repository.get_messages(session_id)

    # Repopulate cache
    if self._short_term_repository.set_messages(session_id, persisted):
        self._track_session_access(session_id)

    return SessionState(session_id=session_id, messages=persisted)

def append_message(self, session_id: str, entry: MemoryEntry) -> None:
    # Write-through: database is source of truth
    self._long_term_repository.append_message(session_id, entry)

    # Refresh cache from DB so next read stays hot
    refreshed = self._long_term_repository.get_messages(session_id)
    self._short_term_repository.set_messages(session_id, refreshed)
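
The LRU bookkeeping is not shown in the excerpt above. A minimal sketch of how an OrderedDict can drive it follows, assuming a hypothetical `SessionLRU` class and an `evict` callback that would delete the session's key from Redis.

```python
from collections import OrderedDict
from typing import Callable, List


class SessionLRU:
    """Tracks session access order in-process and evicts the oldest on overflow."""

    def __init__(self, max_sessions: int, evict: Callable[[str], None]) -> None:
        self._max = max_sessions
        self._evict = evict  # e.g. deletes the session's key from Redis
        self._order: "OrderedDict[str, None]" = OrderedDict()

    def touch(self, session_id: str) -> None:
        # Insert if new, then mark as most recently used.
        self._order[session_id] = None
        self._order.move_to_end(session_id)
        # Evict least recently used sessions until back under the limit.
        while len(self._order) > self._max:
            oldest, _ = self._order.popitem(last=False)
            self._evict(oldest)


evicted: List[str] = []
lru = SessionLRU(max_sessions=2, evict=evicted.append)
for sid in ["a", "b", "a", "c"]:
    lru.touch(sid)
```

In this run, session "b" is evicted when "c" arrives, because "a" was touched more recently. Since the tracker lives in one process, it only reflects that process's view of access order.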

Document Ingestion

Large file uploads can't be processed inside the HTTP request — parsing, embedding, and indexing time is unpredictable. The upload flow is split into three steps and decoupled from processing via an event queue.

Step 1: the client calls initiate_upload, which generates a presigned MinIO URL and stores an upload session in Redis. Step 2: the client uploads chunks directly to MinIO and notifies the API of each chunk. Step 3: when all chunks are accounted for, complete_upload validates the count, marks the session complete, and publishes a DocumentUploadedEvent to RabbitMQ. The HTTP response returns immediately with a job ID. The pipeline consumer picks up the event, fetches the document from MinIO, and starts the ingestion pipeline. If the consumer crashes before acknowledging the message, RabbitMQ re-delivers it automatically.

packages/api/src/api/documents/service.py
from uuid import uuid4

def complete_upload(self, request: DocumentUploadCompleteRequest) -> DocumentUploadCompleteResponse:
    session = self.session_repo.get_session(request.upload_session_id)

    # Validate that every expected chunk index arrived, not just the count
    expected = set(range(1, request.total_chunks + 1))
    missing = expected - set(session.chunks_received)
    if missing:
        self.session_repo.mark_failed(request.upload_session_id, f"Missing: {sorted(missing)}")
        raise ValueError(f"Missing chunks: {sorted(missing)}")

    self.session_repo.mark_complete(request.upload_session_id)

    # Hand off to async pipeline — HTTP responds in < 100ms
    job_id = str(uuid4())
    self.rabbitmq.publish_json({
        "event_type": "DocumentUploadedEvent",
        "job_id": job_id,
        "document_key": session.bucket_key,
        "user_id": session.user_id,
        "course_id": request.course_id,
    })

    return DocumentUploadCompleteResponse(
        ingestion_job_id=job_id,
        status="queued",
    )
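
On the consuming side, the decision of when to acknowledge matters for the re-delivery guarantee. The sketch below keeps that decision in a broker-independent function; `handle_delivery`, its return values, and the `ingest` callback are hypothetical, and only the event fields come from the payload published above.

```python
import json
from typing import Callable

REQUIRED_FIELDS = {"event_type", "job_id", "document_key", "user_id"}


def handle_delivery(body: bytes, ingest: Callable[[dict], None]) -> str:
    """Return 'ack', 'reject', or 'requeue' for one delivered message."""
    try:
        event = json.loads(body)
    except ValueError:
        # Covers malformed JSON and undecodable bytes; retrying cannot help.
        return "reject"
    if not isinstance(event, dict) or not REQUIRED_FIELDS.issubset(event):
        return "reject"
    if event["event_type"] != "DocumentUploadedEvent":
        return "reject"
    try:
        ingest(event)
    except Exception:
        # Possibly transient failure: let the broker deliver it again.
        return "requeue"
    # Acknowledge only after ingestion finished.
    return "ack"
```

A real consumer would map these results onto the broker client's acknowledge and negative-acknowledge calls. Because a message may be delivered more than once, the ingestion step should be safe to repeat for the same job_id.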

Tool Registry

Tools are plugins. Adding one requires no changes to existing code — define a BaseTool subclass with a name, description, and JSON Schema for its parameters, then register it once in the composition root. From that point the LLM sees it in every request and can invoke it autonomously.

ActionAgent asks the LLM which tool to call and with what arguments, then calls registry.resolve(name) to get the concrete tool instance and runs it. The result is passed back to SynthesisService, which folds it into the final answer. The registry also exposes get_tools_for_openai(), which serializes all registered tools into the OpenAI function-calling format — that list is included in the LLM's system context on every turn.

packages/common/src/common/tools/registry.py
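class ToolRegistry:
    def __init__(self) -> None:
        # Registered tools, keyed by their stripped name
        self._tools: dict[str, BaseTool] = {}

    def register(self, tool: BaseTool) -> None:
        name = tool.name.strip()
        if name in self._tools:
            raise ValueError(f"Tool '{name}' is already registered")
        self._tools[name] = tool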

    def resolve(self, name: str) -> BaseTool:
        tool = self._tools.get(name)
        if tool is None:
            raise ToolNotFoundError(name)
        return tool

    def get_tools_for_openai(self) -> list[dict]:
        # Serialized into LLM context on every request
        return [tool.to_openai_tool() for tool in self._tools.values()]
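
For a sense of what a plugin might look like, here is a sketch of a tool and the schema it would expose. The `BaseTool` shown is a simplified stand-in written for this example (the real base class may differ), and `AddTool` is hypothetical. The returned dictionary follows the shape the OpenAI Chat Completions API accepts in its `tools` parameter.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseTool(ABC):
    """Simplified stand-in for the project's BaseTool (an assumption)."""

    name: str
    description: str
    parameters: Dict[str, Any]  # JSON Schema for the tool's arguments

    @abstractmethod
    def run(self, params: Dict[str, Any]) -> Any: ...

    def to_openai_tool(self) -> Dict[str, Any]:
        # Function-tool shape used by the Chat Completions `tools` parameter.
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }


class AddTool(BaseTool):
    name = "add"
    description = "Add two numbers."
    parameters = {
        "type": "object",
        "properties": {"a": {"type": "number"}, "b": {"type": "number"}},
        "required": ["a", "b"],
    }

    def run(self, params: Dict[str, Any]) -> Any:
        return params["a"] + params["b"]


tool = AddTool()
schema = tool.to_openai_tool()
result = tool.run({"a": 2, "b": 3})
```

Registering such a tool would then be a single `register(AddTool())` call in the composition root, with no edits to ActionAgent.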