Skip to content

Logging

Bases: LLM

Wrapper that adds logging to any LLM instance.

Logs all requests asynchronously (fire-and-forget) without blocking the main request flow. Stores metrics in a database and optionally stores request/response bodies in S3.

Example

from majordomo_llm import get_llm_instance
from majordomo_llm.logging import LoggingLLM, PostgresAdapter, S3Adapter

llm = get_llm_instance("anthropic", "claude-sonnet-4-20250514")
db = await PostgresAdapter.create(host="localhost", ...)
storage = await S3Adapter.create(bucket="my-bucket")
logged_llm = LoggingLLM(llm, db, storage)

response = await logged_llm.get_response("Hello!")

Source code in src/majordomo_llm/logging/wrapper.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
class LoggingLLM(LLM):
    """Wrapper that adds logging to any LLM instance.

    Logs all requests asynchronously (fire-and-forget) without blocking
    the main request flow. Stores metrics in a database and optionally
    stores request/response bodies in S3.

    Example:
        >>> from majordomo_llm import get_llm_instance
        >>> from majordomo_llm.logging import LoggingLLM, PostgresAdapter, S3Adapter
        >>>
        >>> llm = get_llm_instance("anthropic", "claude-sonnet-4-20250514")
        >>> db = await PostgresAdapter.create(host="localhost", ...)
        >>> storage = await S3Adapter.create(bucket="my-bucket")
        >>> logged_llm = LoggingLLM(llm, db, storage)
        >>>
        >>> response = await logged_llm.get_response("Hello!")
    """

    def __init__(
        self,
        llm: LLM,
        database: DatabaseAdapter,
        storage: StorageAdapter | None = None,
    ) -> None:
        """Initialize the logging wrapper.

        Args:
            llm: The LLM instance to wrap.
            database: Database adapter for storing metrics.
            storage: Optional storage adapter for request/response bodies.
        """
        # Mirror the wrapped instance's configuration so this wrapper is a
        # drop-in replacement for the LLM it decorates.
        super().__init__(
            provider=llm.provider,
            model=llm.model,
            input_cost=llm.input_cost,
            output_cost=llm.output_cost,
            supports_temperature_top_p=llm.supports_temperature_top_p,
            use_web_search=llm.use_web_search,
        )
        self._llm = llm
        self._database = database
        self._storage = storage
        # Strong references to in-flight logging tasks: the event loop keeps
        # only weak references, so an otherwise-unreferenced task could be
        # garbage-collected before it finishes.
        self._pending_tasks: set[asyncio.Task[None]] = set()

    async def _log_request(
        self,
        request_body: dict[str, Any],
        response_content: str | dict[str, Any] | None,
        response: Usage | None,
        status: str,
        error_message: str | None,
    ) -> None:
        """Log a request (internal, runs as fire-and-forget task).

        Uploads the request/response bodies first (when a storage adapter
        is configured) so the resulting object keys can be recorded in the
        database row.
        """
        request_id = uuid4()
        s3_request_key: str | None = None
        s3_response_key: str | None = None

        if self._storage:
            s3_request_key, s3_response_key = await self._storage.upload(
                request_id, request_body, response_content
            )

        entry = LogEntry(
            request_id=request_id,
            provider=self.provider,
            model=self.model,
            timestamp=datetime.now(UTC),
            response_time=response.response_time if response else None,
            input_tokens=response.input_tokens if response else None,
            output_tokens=response.output_tokens if response else None,
            cached_tokens=response.cached_tokens if response else None,
            input_cost=response.input_cost if response else None,
            output_cost=response.output_cost if response else None,
            total_cost=response.total_cost if response else None,
            s3_request_key=s3_request_key,
            s3_response_key=s3_response_key,
            status=status,
            error_message=error_message,
            api_key_hash=self._llm.api_key_hash,
            api_key_alias=self._llm.api_key_alias,
        )

        await self._database.insert(entry)

    def _on_task_done(self, task: asyncio.Task[None]) -> None:
        """Release a finished logging task and consume its exception.

        Logging is best-effort by design: a failed insert or upload must
        never affect the caller. Retrieving the exception here also keeps
        the event loop from emitting "Task exception was never retrieved"
        warnings when a background log write fails.
        """
        self._pending_tasks.discard(task)
        if not task.cancelled():
            # Intentionally ignored: a lost log entry is non-fatal.
            task.exception()

    def _fire_and_forget(
        self,
        request_body: dict[str, Any],
        response_content: str | dict[str, Any] | None,
        response: Usage | None,
        status: str,
        error_message: str | None,
    ) -> None:
        """Schedule logging as a background task without awaiting it."""
        task = asyncio.create_task(
            self._log_request(
                request_body=request_body,
                response_content=response_content,
                response=response,
                status=status,
                error_message=error_message,
            )
        )
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_task_done)

    async def get_response(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
    ) -> LLMResponse:
        """Get a plain text response from the LLM with logging."""
        request_body = {
            "user_prompt": user_prompt,
            "system_prompt": system_prompt,
            "temperature": temperature,
            "top_p": top_p,
        }

        try:
            response = await self._llm.get_response(
                user_prompt, system_prompt, temperature, top_p
            )
            self._fire_and_forget(
                request_body=request_body,
                response_content=response.content,
                response=response,
                status="success",
                error_message=None,
            )
            return response
        except Exception as e:
            # Log the failure, then propagate the original exception.
            self._fire_and_forget(
                request_body=request_body,
                response_content=None,
                response=None,
                status="error",
                error_message=str(e),
            )
            raise

    async def get_json_response(
        self,
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
    ) -> LLMJSONResponse:
        """Get a JSON response from the LLM with logging."""
        request_body = {
            "user_prompt": user_prompt,
            "system_prompt": system_prompt,
            "temperature": temperature,
            "top_p": top_p,
        }

        try:
            response = await self._llm.get_json_response(
                user_prompt, system_prompt, temperature, top_p
            )
            self._fire_and_forget(
                request_body=request_body,
                response_content=response.content,
                response=response,
                status="success",
                error_message=None,
            )
            return response
        except Exception as e:
            self._fire_and_forget(
                request_body=request_body,
                response_content=None,
                response=None,
                status="error",
                error_message=str(e),
            )
            raise

    async def get_structured_json_response(
        self,
        response_model: type[T],
        user_prompt: str,
        system_prompt: str | None = None,
        temperature: float = 0.3,
        top_p: float = 1.0,
    ) -> LLMStructuredResponse:
        """Get a structured response validated against a Pydantic model with logging."""
        request_body = {
            "response_model": response_model.__name__,
            "user_prompt": user_prompt,
            "system_prompt": system_prompt,
            "temperature": temperature,
            "top_p": top_p,
        }

        try:
            response = await self._llm.get_structured_json_response(
                response_model, user_prompt, system_prompt, temperature, top_p
            )
            self._fire_and_forget(
                request_body=request_body,
                # Serialize the validated model so the stored body is plain JSON.
                response_content=response.content.model_dump(),
                response=response,
                status="success",
                error_message=None,
            )
            return response
        except Exception as e:
            self._fire_and_forget(
                request_body=request_body,
                response_content=None,
                response=None,
                status="error",
                error_message=str(e),
            )
            raise

    async def flush(self) -> None:
        """Wait for all pending logging tasks to complete.

        Loops because concurrent requests may schedule new tasks while an
        earlier batch is being awaited.
        """
        while self._pending_tasks:
            await asyncio.gather(*self._pending_tasks, return_exceptions=True)

    async def close(self) -> None:
        """Wait for pending tasks and close database and storage connections."""
        await self.flush()
        await self._database.close()
        if self._storage:
            await self._storage.close()

__init__

__init__(llm, database, storage=None)

Initialize the logging wrapper.

Parameters:

Name Type Description Default
llm LLM

The LLM instance to wrap.

required
database DatabaseAdapter

Database adapter for storing metrics.

required
storage StorageAdapter | None

Optional storage adapter for request/response bodies.

None
Source code in src/majordomo_llm/logging/wrapper.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    llm: LLM,
    database: DatabaseAdapter,
    storage: StorageAdapter | None = None,
) -> None:
    """Set up the logging wrapper around *llm*.

    Args:
        llm: The LLM instance to wrap.
        database: Database adapter for storing metrics.
        storage: Optional storage adapter for request/response bodies.
    """
    # Copy the wrapped instance's configuration so the wrapper is a
    # drop-in replacement for the LLM it decorates.
    wrapped_config = dict(
        provider=llm.provider,
        model=llm.model,
        input_cost=llm.input_cost,
        output_cost=llm.output_cost,
        supports_temperature_top_p=llm.supports_temperature_top_p,
        use_web_search=llm.use_web_search,
    )
    super().__init__(**wrapped_config)
    self._llm = llm
    self._database = database
    self._storage = storage
    # Strong references keep in-flight logging tasks from being GC'd.
    self._pending_tasks: set[asyncio.Task[None]] = set()

close async

close()

Wait for pending tasks and close database and storage connections.

Source code in src/majordomo_llm/logging/wrapper.py
241
242
243
244
245
246
async def close(self) -> None:
    """Wait for pending tasks and close database and storage connections."""
    # Drain background log writes first so no task touches the
    # connections after they are closed below.
    await self.flush()
    await self._database.close()
    if self._storage:
        await self._storage.close()

flush async

flush()

Wait for all pending logging tasks to complete.

Source code in src/majordomo_llm/logging/wrapper.py
236
237
238
239
async def flush(self) -> None:
    """Wait for all pending logging tasks to complete.

    Uses ``return_exceptions=True`` so one failed log write cannot
    abort the flush of the remaining tasks.
    """
    if self._pending_tasks:
        await asyncio.gather(*self._pending_tasks, return_exceptions=True)

get_json_response async

get_json_response(
    user_prompt,
    system_prompt=None,
    temperature=0.3,
    top_p=1.0,
)

Get a JSON response from the LLM with logging.

Source code in src/majordomo_llm/logging/wrapper.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
async def get_json_response(
    self,
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
) -> LLMJSONResponse:
    """Proxy a JSON request to the wrapped LLM and log the outcome.

    The log write is scheduled fire-and-forget, so the caller never
    blocks on the database or storage backends.
    """
    logged_request = dict(
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        temperature=temperature,
        top_p=top_p,
    )

    try:
        result = await self._llm.get_json_response(
            user_prompt, system_prompt, temperature, top_p
        )
        self._fire_and_forget(
            request_body=logged_request,
            response_content=result.content,
            response=result,
            status="success",
            error_message=None,
        )
        return result
    except Exception as exc:
        # Record the failure, then propagate the original exception.
        self._fire_and_forget(
            request_body=logged_request,
            response_content=None,
            response=None,
            status="error",
            error_message=str(exc),
        )
        raise

get_response async

get_response(
    user_prompt,
    system_prompt=None,
    temperature=0.3,
    top_p=1.0,
)

Get a plain text response from the LLM with logging.

Source code in src/majordomo_llm/logging/wrapper.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def get_response(
    self,
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
) -> LLMResponse:
    """Proxy a plain-text request to the wrapped LLM and log the outcome.

    The log write is scheduled fire-and-forget, so the caller never
    blocks on the database or storage backends.
    """
    logged_request = dict(
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        temperature=temperature,
        top_p=top_p,
    )

    try:
        result = await self._llm.get_response(
            user_prompt, system_prompt, temperature, top_p
        )
        self._fire_and_forget(
            request_body=logged_request,
            response_content=result.content,
            response=result,
            status="success",
            error_message=None,
        )
        return result
    except Exception as exc:
        # Record the failure, then propagate the original exception.
        self._fire_and_forget(
            request_body=logged_request,
            response_content=None,
            response=None,
            status="error",
            error_message=str(exc),
        )
        raise

get_structured_json_response async

get_structured_json_response(
    response_model,
    user_prompt,
    system_prompt=None,
    temperature=0.3,
    top_p=1.0,
)

Get a structured response validated against a Pydantic model with logging.

Source code in src/majordomo_llm/logging/wrapper.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
async def get_structured_json_response(
    self,
    response_model: type[T],
    user_prompt: str,
    system_prompt: str | None = None,
    temperature: float = 0.3,
    top_p: float = 1.0,
) -> LLMStructuredResponse:
    """Proxy a structured, model-validated request to the wrapped LLM and log it.

    The log write is scheduled fire-and-forget, so the caller never
    blocks on the database or storage backends.
    """
    logged_request = dict(
        response_model=response_model.__name__,
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        temperature=temperature,
        top_p=top_p,
    )

    try:
        result = await self._llm.get_structured_json_response(
            response_model, user_prompt, system_prompt, temperature, top_p
        )
        self._fire_and_forget(
            request_body=logged_request,
            # Serialize the validated model so the stored body is plain JSON.
            response_content=result.content.model_dump(),
            response=result,
            status="success",
            error_message=None,
        )
        return result
    except Exception as exc:
        # Record the failure, then propagate the original exception.
        self._fire_and_forget(
            request_body=logged_request,
            response_content=None,
            response=None,
            status="error",
            error_message=str(exc),
        )
        raise

Bases: DatabaseAdapter

PostgreSQL adapter for logging LLM requests.

Source code in src/majordomo_llm/logging/adapters/postgres.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class PostgresAdapter(DatabaseAdapter):
    """PostgreSQL adapter for logging LLM requests."""

    def __init__(self, pool: asyncpg.Pool) -> None:
        self._pool = pool

    @classmethod
    async def create(
        cls,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
        min_size: int = 1,
        max_size: int = 10,
    ) -> "PostgresAdapter":
        """Build a PostgresAdapter backed by a freshly created connection pool."""
        pool_options = dict(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            min_size=min_size,
            max_size=max_size,
        )
        return cls(await asyncpg.create_pool(**pool_options))

    async def insert(self, entry: LogEntry) -> None:
        """Persist one log entry as a row in the llm_requests table."""
        # Tuple order must line up with the $1..$17 placeholders one-to-one.
        row = (
            entry.request_id,
            entry.provider,
            entry.model,
            entry.timestamp,
            entry.response_time,
            entry.input_tokens,
            entry.output_tokens,
            entry.cached_tokens,
            entry.input_cost,
            entry.output_cost,
            entry.total_cost,
            entry.s3_request_key,
            entry.s3_response_key,
            entry.status,
            entry.error_message,
            entry.api_key_hash,
            entry.api_key_alias,
        )
        async with self._pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO llm_requests (
                    request_id, provider, model, timestamp, response_time,
                    input_tokens, output_tokens, cached_tokens,
                    input_cost, output_cost, total_cost,
                    s3_request_key, s3_response_key, status, error_message,
                    api_key_hash, api_key_alias
                ) VALUES (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                    $11, $12, $13, $14, $15, $16, $17
                )
                """,
                *row,
            )

    async def close(self) -> None:
        """Close the connection pool."""
        await self._pool.close()

close async

close()

Close the connection pool.

Source code in src/majordomo_llm/logging/adapters/postgres.py
73
74
75
async def close(self) -> None:
    """Close the connection pool."""
    # NOTE: asyncpg's Pool.close() is a graceful shutdown — it waits for
    # acquired connections to be released back to the pool.
    await self._pool.close()

create async classmethod

create(
    host,
    port,
    database,
    user,
    password,
    min_size=1,
    max_size=10,
)

Create a new PostgresAdapter with a connection pool.

Source code in src/majordomo_llm/logging/adapters/postgres.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@classmethod
async def create(
    cls,
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
    min_size: int = 1,
    max_size: int = 10,
) -> "PostgresAdapter":
    """Build a PostgresAdapter backed by a freshly created connection pool."""
    pool_options = dict(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        min_size=min_size,
        max_size=max_size,
    )
    return cls(await asyncpg.create_pool(**pool_options))

insert async

insert(entry)

Insert a log entry into the database.

Source code in src/majordomo_llm/logging/adapters/postgres.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def insert(self, entry: LogEntry) -> None:
    """Persist one log entry as a row in the llm_requests table."""
    # Tuple order must line up with the $1..$17 placeholders one-to-one.
    row = (
        entry.request_id,
        entry.provider,
        entry.model,
        entry.timestamp,
        entry.response_time,
        entry.input_tokens,
        entry.output_tokens,
        entry.cached_tokens,
        entry.input_cost,
        entry.output_cost,
        entry.total_cost,
        entry.s3_request_key,
        entry.s3_response_key,
        entry.status,
        entry.error_message,
        entry.api_key_hash,
        entry.api_key_alias,
    )
    async with self._pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO llm_requests (
                request_id, provider, model, timestamp, response_time,
                input_tokens, output_tokens, cached_tokens,
                input_cost, output_cost, total_cost,
                s3_request_key, s3_response_key, status, error_message,
                api_key_hash, api_key_alias
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17
            )
            """,
            *row,
        )

Bases: DatabaseAdapter

SQLite adapter for logging LLM requests.

Provides a lightweight, zero-setup option for local development and examples. The database file and table are created automatically.

Example

db = await SqliteAdapter.create("llm_logs.db")
logged_llm = LoggingLLM(llm, db)

Source code in src/majordomo_llm/logging/adapters/sqlite.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class SqliteAdapter(DatabaseAdapter):
    """SQLite adapter for logging LLM requests.

    A lightweight, zero-setup backend intended for local development and
    the examples. Both the database file and the log table are created
    automatically.

    Example:
        >>> db = await SqliteAdapter.create("llm_logs.db")
        >>> logged_llm = LoggingLLM(llm, db)
    """

    def __init__(self, connection: aiosqlite.Connection) -> None:
        self._connection = connection

    @classmethod
    async def create(cls, database_path: str) -> "SqliteAdapter":
        """Create a new SqliteAdapter.

        Args:
            database_path: Path to the SQLite database file.
                Use ":memory:" for an in-memory database.

        Returns:
            A configured SqliteAdapter instance.
        """
        conn = await aiosqlite.connect(database_path)
        # Create the log table up front so the first insert cannot fail on
        # a missing table.
        await conn.execute(CREATE_TABLE_SQL)
        await conn.commit()
        return cls(conn)

    async def insert(self, entry: LogEntry) -> None:
        """Insert a log entry into the database."""
        # UUID and timestamp are serialized to text for SQLite storage.
        row = (
            str(entry.request_id),
            entry.provider,
            entry.model,
            entry.timestamp.isoformat(),
            entry.response_time,
            entry.input_tokens,
            entry.output_tokens,
            entry.cached_tokens,
            entry.input_cost,
            entry.output_cost,
            entry.total_cost,
            entry.s3_request_key,
            entry.s3_response_key,
            entry.status,
            entry.error_message,
            entry.api_key_hash,
            entry.api_key_alias,
        )
        await self._connection.execute(INSERT_SQL, row)
        await self._connection.commit()

    async def close(self) -> None:
        """Close the database connection."""
        await self._connection.close()

close async

close()

Close the database connection.

Source code in src/majordomo_llm/logging/adapters/sqlite.py
97
98
99
async def close(self) -> None:
    """Close the database connection."""
    # The adapter holds a single shared connection; close it once, when
    # the adapter is retired.
    await self._connection.close()

create async classmethod

create(database_path)

Create a new SqliteAdapter.

Parameters:

Name Type Description Default
database_path str

Path to the SQLite database file. Use ":memory:" for an in-memory database.

required

Returns:

Type Description
SqliteAdapter

A configured SqliteAdapter instance.

Source code in src/majordomo_llm/logging/adapters/sqlite.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@classmethod
async def create(cls, database_path: str) -> "SqliteAdapter":
    """Create a new SqliteAdapter.

    Args:
        database_path: Path to the SQLite database file.
            Use ":memory:" for an in-memory database.

    Returns:
        A configured SqliteAdapter instance.
    """
    conn = await aiosqlite.connect(database_path)
    # Create the log table up front so the first insert cannot fail on a
    # missing table.
    await conn.execute(CREATE_TABLE_SQL)
    await conn.commit()
    return cls(conn)

insert async

insert(entry)

Insert a log entry into the database.

Source code in src/majordomo_llm/logging/adapters/sqlite.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def insert(self, entry: LogEntry) -> None:
    """Insert a log entry into the database."""
    # UUID and timestamp are serialized to text for SQLite storage.
    row = (
        str(entry.request_id),
        entry.provider,
        entry.model,
        entry.timestamp.isoformat(),
        entry.response_time,
        entry.input_tokens,
        entry.output_tokens,
        entry.cached_tokens,
        entry.input_cost,
        entry.output_cost,
        entry.total_cost,
        entry.s3_request_key,
        entry.s3_response_key,
        entry.status,
        entry.error_message,
        entry.api_key_hash,
        entry.api_key_alias,
    )
    await self._connection.execute(INSERT_SQL, row)
    await self._connection.commit()

Bases: DatabaseAdapter

MySQL adapter for logging LLM requests.

Source code in src/majordomo_llm/logging/adapters/mysql.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class MySQLAdapter(DatabaseAdapter):
    """MySQL adapter for logging LLM requests."""

    def __init__(self, pool: aiomysql.Pool) -> None:
        self._pool = pool

    @classmethod
    async def create(
        cls,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
        minsize: int = 1,
        maxsize: int = 10,
    ) -> "MySQLAdapter":
        """Build a MySQLAdapter backed by a freshly created connection pool."""
        pool_options = dict(
            host=host,
            port=port,
            db=database,  # aiomysql names the schema parameter "db"
            user=user,
            password=password,
            minsize=minsize,
            maxsize=maxsize,
        )
        return cls(await aiomysql.create_pool(**pool_options))

    async def insert(self, entry: LogEntry) -> None:
        """Persist one log entry as a row in the llm_requests table."""
        # Tuple order must line up with the column list below, one-to-one.
        row = (
            str(entry.request_id),
            entry.provider,
            entry.model,
            entry.timestamp,
            entry.response_time,
            entry.input_tokens,
            entry.output_tokens,
            entry.cached_tokens,
            entry.input_cost,
            entry.output_cost,
            entry.total_cost,
            entry.s3_request_key,
            entry.s3_response_key,
            entry.status,
            entry.error_message,
            entry.api_key_hash,
            entry.api_key_alias,
        )
        async with self._pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO llm_requests (
                        request_id, provider, model, timestamp, response_time,
                        input_tokens, output_tokens, cached_tokens,
                        input_cost, output_cost, total_cost,
                        s3_request_key, s3_response_key, status, error_message,
                        api_key_hash, api_key_alias
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """,
                    row,
                )
            await conn.commit()

    async def close(self) -> None:
        """Close the connection pool."""
        # aiomysql shuts pools down in two steps: mark closed, then wait
        # for every connection to terminate.
        self._pool.close()
        await self._pool.wait_closed()

close async

close()

Close the connection pool.

Source code in src/majordomo_llm/logging/adapters/mysql.py
74
75
76
77
async def close(self) -> None:
    """Close the connection pool."""
    # aiomysql shuts pools down in two steps: mark the pool closed, then
    # wait until every connection has actually terminated.
    self._pool.close()
    await self._pool.wait_closed()

create async classmethod

create(
    host,
    port,
    database,
    user,
    password,
    minsize=1,
    maxsize=10,
)

Create a new MySQLAdapter with a connection pool.

Source code in src/majordomo_llm/logging/adapters/mysql.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@classmethod
async def create(
    cls,
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
    minsize: int = 1,
    maxsize: int = 10,
) -> "MySQLAdapter":
    """Build a MySQLAdapter backed by a freshly created connection pool."""
    pool_options = dict(
        host=host,
        port=port,
        db=database,  # aiomysql names the schema parameter "db"
        user=user,
        password=password,
        minsize=minsize,
        maxsize=maxsize,
    )
    return cls(await aiomysql.create_pool(**pool_options))

insert async

insert(entry)

Insert a log entry into the database.

Source code in src/majordomo_llm/logging/adapters/mysql.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def insert(self, entry: LogEntry) -> None:
    """Persist one log entry as a row in the llm_requests table."""
    # Tuple order must line up with the column list below, one-to-one.
    row = (
        str(entry.request_id),
        entry.provider,
        entry.model,
        entry.timestamp,
        entry.response_time,
        entry.input_tokens,
        entry.output_tokens,
        entry.cached_tokens,
        entry.input_cost,
        entry.output_cost,
        entry.total_cost,
        entry.s3_request_key,
        entry.s3_response_key,
        entry.status,
        entry.error_message,
        entry.api_key_hash,
        entry.api_key_alias,
    )
    async with self._pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                """
                INSERT INTO llm_requests (
                    request_id, provider, model, timestamp, response_time,
                    input_tokens, output_tokens, cached_tokens,
                    input_cost, output_cost, total_cost,
                    s3_request_key, s3_response_key, status, error_message,
                    api_key_hash, api_key_alias
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                row,
            )
        await conn.commit()

Bases: StorageAdapter

S3 adapter for storing request/response bodies.

Source code in src/majordomo_llm/logging/adapters/s3.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class S3Adapter(StorageAdapter):
    """S3 adapter for storing request/response bodies.

    Objects are written under ``{prefix}/{request_id}/request.json`` and
    ``{prefix}/{request_id}/response.json``. A fresh S3 client context is
    opened per upload, so no connection state is held between calls and
    :meth:`close` has nothing to release.
    """

    def __init__(
        self,
        session: aioboto3.Session,
        bucket: str,
        prefix: str = "llm-logs",
    ) -> None:
        self._session = session
        self._bucket = bucket
        self._prefix = prefix

    @classmethod
    async def create(
        cls,
        bucket: str,
        prefix: str = "llm-logs",
        region_name: str | None = None,
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
    ) -> "S3Adapter":
        """Create a new S3Adapter.

        Args:
            bucket: Target S3 bucket name.
            prefix: Key prefix under which log objects are stored.
            region_name: Optional AWS region; defaults to the session's
                normal resolution when None.
            aws_access_key_id: Optional explicit access key.
            aws_secret_access_key: Optional explicit secret key.

        Returns:
            A configured S3Adapter instance.
        """
        session = aioboto3.Session(
            region_name=region_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        return cls(session, bucket, prefix)

    async def upload(
        self,
        request_id: UUID,
        request_body: dict[str, Any],
        response_content: str | dict[str, Any] | None,
    ) -> tuple[str, str | None]:
        """Upload request and response bodies to S3.

        Returns:
            Tuple of (request_key, response_key); response_key is None
            when there is no response content to store.
        """
        request_key = f"{self._prefix}/{request_id}/request.json"
        # BUGFIX: use an explicit None check, matching the upload branch
        # below. With a truthiness test, falsy-but-present content such as
        # "" or {} left response_key as None while the branch still ran,
        # calling put_object with Key=None.
        response_key = (
            f"{self._prefix}/{request_id}/response.json"
            if response_content is not None
            else None
        )

        async with self._session.client("s3") as s3:
            await s3.put_object(
                Bucket=self._bucket,
                Key=request_key,
                Body=json.dumps(request_body, default=str),
                ContentType="application/json",
            )

            if response_key is not None:
                # dicts are serialized; strings are stored verbatim
                body = (
                    json.dumps(response_content, default=str)
                    if isinstance(response_content, dict)
                    else response_content
                )
                await s3.put_object(
                    Bucket=self._bucket,
                    Key=response_key,
                    Body=body,
                    ContentType="application/json",
                )

        return request_key, response_key

    async def close(self) -> None:
        """Close the S3 client (no-op for aioboto3 context-managed clients)."""
        pass

close async

close()

Close the S3 client (no-op for aioboto3 context-managed clients).

Source code in src/majordomo_llm/logging/adapters/s3.py
76
77
78
async def close(self) -> None:
    """Release resources held by the adapter.

    Nothing to tear down: S3 clients are opened per call via context
    managers, so no long-lived connection exists.
    """
    return None

create async classmethod

create(
    bucket,
    prefix="llm-logs",
    region_name=None,
    aws_access_key_id=None,
    aws_secret_access_key=None,
)

Create a new S3Adapter.

Source code in src/majordomo_llm/logging/adapters/s3.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@classmethod
async def create(
    cls,
    bucket: str,
    prefix: str = "llm-logs",
    region_name: str | None = None,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
) -> "S3Adapter":
    """Build an S3Adapter backed by a fresh aioboto3 session.

    Region and credentials are optional; when left as None, the session
    applies its usual default resolution.
    """
    credentials = {
        "region_name": region_name,
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
    }
    return cls(aioboto3.Session(**credentials), bucket, prefix)

upload async

upload(request_id, request_body, response_content)

Upload request and response bodies to S3.

Source code in src/majordomo_llm/logging/adapters/s3.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def upload(
    self,
    request_id: UUID,
    request_body: dict[str, Any],
    response_content: str | dict[str, Any] | None,
) -> tuple[str, str | None]:
    """Upload request and response bodies to S3.

    Returns:
        Tuple of (request_key, response_key); response_key is None when
        there is no response content to store.
    """
    request_key = f"{self._prefix}/{request_id}/request.json"
    # BUGFIX: use an explicit None check, matching the upload branch below.
    # With a truthiness test, falsy-but-present content such as "" or {}
    # left response_key as None while the branch still ran, calling
    # put_object with Key=None.
    response_key = (
        f"{self._prefix}/{request_id}/response.json"
        if response_content is not None
        else None
    )

    async with self._session.client("s3") as s3:
        await s3.put_object(
            Bucket=self._bucket,
            Key=request_key,
            Body=json.dumps(request_body, default=str),
            ContentType="application/json",
        )

        if response_key is not None:
            # dicts are serialized; strings are stored verbatim
            body = (
                json.dumps(response_content, default=str)
                if isinstance(response_content, dict)
                else response_content
            )
            await s3.put_object(
                Bucket=self._bucket,
                Key=response_key,
                Body=body,
                ContentType="application/json",
            )

    return request_key, response_key

Bases: StorageAdapter

Local file system adapter for storing request/response bodies.

Stores each request and response as separate JSON files in a directory. Useful for local development, debugging, and examples where S3 is overkill.

Files are stored as:

{base_path}/{request_id}_request.json
{base_path}/{request_id}_response.json

Example

storage = await FileStorageAdapter.create("./llm_logs")
logged_llm = LoggingLLM(llm, db, storage)

Source code in src/majordomo_llm/logging/adapters/file.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class FileStorageAdapter(StorageAdapter):
    """Local file system adapter for storing request/response bodies.

    Each request and its (optional) response are written as separate JSON
    files in one directory — handy for local development, debugging, and
    examples where S3 is overkill.

    Files are stored as:
        {base_path}/{request_id}_request.json
        {base_path}/{request_id}_response.json

    Example:
        >>> storage = await FileStorageAdapter.create("./llm_logs")
        >>> logged_llm = LoggingLLM(llm, db, storage)
    """

    def __init__(self, base_path: Path) -> None:
        self._base_path = base_path

    @classmethod
    async def create(cls, base_path: str | Path) -> "FileStorageAdapter":
        """Create a new FileStorageAdapter.

        Args:
            base_path: Directory where log files will be stored.
                Created automatically if it doesn't exist.

        Returns:
            A configured FileStorageAdapter instance.
        """
        directory = Path(base_path)
        directory.mkdir(parents=True, exist_ok=True)
        return cls(directory)

    async def upload(
        self,
        request_id: UUID,
        request_body: dict[str, Any],
        response_content: str | dict[str, Any] | None,
    ) -> tuple[str, str | None]:
        """Store request and response bodies as local JSON files."""
        request_key = f"{request_id}_request.json"
        async with aiofiles.open(self._base_path / request_key, "w") as fh:
            await fh.write(json.dumps(request_body, indent=2, default=str))

        if response_content is None:
            return request_key, None

        # dicts are serialized; strings are written verbatim
        response_key = f"{request_id}_response.json"
        if isinstance(response_content, dict):
            payload = json.dumps(response_content, indent=2, default=str)
        else:
            payload = response_content
        async with aiofiles.open(self._base_path / response_key, "w") as fh:
            await fh.write(payload)

        return request_key, response_key

    async def close(self) -> None:
        """Close the storage adapter (no-op for file storage)."""
        return None

close async

close()

Close the storage adapter (no-op for file storage).

Source code in src/majordomo_llm/logging/adapters/file.py
74
75
76
async def close(self) -> None:
    """Shut down the storage adapter.

    File storage holds no open handles between calls, so there is
    nothing to release.
    """
    return None

create async classmethod

create(base_path)

Create a new FileStorageAdapter.

Parameters:

Name Type Description Default
base_path str | Path

Directory where log files will be stored. Created automatically if it doesn't exist.

required

Returns:

Type Description
FileStorageAdapter

A configured FileStorageAdapter instance.

Source code in src/majordomo_llm/logging/adapters/file.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@classmethod
async def create(cls, base_path: str | Path) -> "FileStorageAdapter":
    """Create a new FileStorageAdapter.

    Args:
        base_path: Directory where log files will be stored.
            Created automatically if it doesn't exist.

    Returns:
        A configured FileStorageAdapter instance.
    """
    path = Path(base_path)
    path.mkdir(parents=True, exist_ok=True)
    return cls(path)

upload async

upload(request_id, request_body, response_content)

Store request and response bodies as local JSON files.

Source code in src/majordomo_llm/logging/adapters/file.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def upload(
    self,
    request_id: UUID,
    request_body: dict[str, Any],
    response_content: str | dict[str, Any] | None,
) -> tuple[str, str | None]:
    """Write the request (and optional response) as JSON files on disk.

    Returns:
        Tuple of (request_key, response_key) — file names relative to the
        base path; response_key is None when no response content was given.
    """
    request_key = f"{request_id}_request.json"
    async with aiofiles.open(self._base_path / request_key, "w") as fh:
        await fh.write(json.dumps(request_body, indent=2, default=str))

    if response_content is None:
        return request_key, None

    # dicts are serialized; strings are written verbatim
    response_key = f"{request_id}_response.json"
    if isinstance(response_content, dict):
        payload = json.dumps(response_content, indent=2, default=str)
    else:
        payload = response_content
    async with aiofiles.open(self._base_path / response_key, "w") as fh:
        await fh.write(payload)

    return request_key, response_key