feat: add api usage limits and audit storage

Implements authorizeUsage (rate limit + monthly token limit with FOR UPDATE locking), beginUsageEvent, completeUsageEvent, failUsageEvent, streamJsonlLogs, and cleanupExpiredMessageLogs in lib/audit-store.cjs, with integration tests in test/audit.integration.test.cjs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 01:52:47 -06:00
parent de6b466df7
commit ffee4a52cc
2 changed files with 632 additions and 0 deletions
+341
View File
@@ -0,0 +1,341 @@
"use strict"
const { withTransaction } = require("./db.cjs")
// Number of days to retain message logs (default 90)
const LOG_RETENTION_DAYS = parseInt(process.env.HERMES_LOG_RETENTION_DAYS || "90", 10)
// ─── Helpers ──────────────────────────────────────────────────────────────────
function tryParseJson(text) {
if (!text) return null
try {
return JSON.parse(text)
} catch (_e) {
return null
}
}
// ─── authorizeUsage ───────────────────────────────────────────────────────────
/**
* Check rate limits and monthly token limits for an API user.
* Uses FOR UPDATE locking on api_users to prevent race conditions.
*
* @param {import('pg').Pool} pool
* @param {{ id: string, requests_per_minute: number, monthly_token_limit: number }} apiUser
* @param {Date} now
* @returns {Promise<{ ok: true } | { ok: false, reason: 'rate_limited', retryAfter: number } | { ok: false, reason: 'token_limit_exceeded' }>}
*/
async function authorizeUsage(pool, apiUser, now = new Date()) {
return withTransaction(pool, async (client) => {
// Acquire row-level lock on this api_user to prevent concurrent races
const { rows: lockRows } = await client.query(
"SELECT id FROM api_users WHERE id = $1 FOR UPDATE",
[apiUser.id]
)
if (!lockRows.length) {
const err = new Error("API user not found")
err.status = 404
throw err
}
// 1. Check requests per minute
const windowStart = new Date(now.getTime() - 60 * 1000)
const { rows: rateRows } = await client.query(
`SELECT count(*) AS cnt FROM usage_events
WHERE api_user_id = $1 AND request_started_at >= $2`,
[apiUser.id, windowStart]
)
const recentCount = parseInt(rateRows[0].cnt, 10)
if (recentCount >= apiUser.requests_per_minute) {
// retryAfter: seconds until the oldest request falls out of the 60s window
// Conservatively return 60 seconds
return { ok: false, reason: "rate_limited", retryAfter: 60 }
}
// 2. Check monthly token limit
const { rows: tokenRows } = await client.query(
`SELECT coalesce(sum(total_tokens), 0) AS tokens FROM usage_events
WHERE api_user_id = $1 AND request_started_at >= date_trunc('month', $2::timestamptz)`,
[apiUser.id, now]
)
const monthlyTokens = parseInt(tokenRows[0].tokens, 10)
if (monthlyTokens >= apiUser.monthly_token_limit) {
return { ok: false, reason: "token_limit_exceeded" }
}
return { ok: true }
})
}
// ─── beginUsageEvent ─────────────────────────────────────────────────────────
/**
* Insert a new incomplete usage_events row.
*
* @param {import('pg').Pool} pool
* @param {{
* id: string,
* apiUserId: string,
* apiUserName: string,
* apiKeyId: string,
* route: 'pre'|'post',
* requestStartedAt: Date,
* model: string|null,
* promptTokens: number,
* completionTokens: number,
* totalTokens: number,
* }} input
* @returns {Promise<object>}
*/
async function beginUsageEvent(pool, input) {
const { rows } = await pool.query(
`INSERT INTO usage_events
(id, api_user_id, api_user_name, api_key_id, route, request_started_at,
model, prompt_tokens, completion_tokens, total_tokens, audit_complete)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, false)
RETURNING *`,
[
input.id,
input.apiUserId,
input.apiUserName,
input.apiKeyId,
input.route,
input.requestStartedAt,
input.model ?? null,
input.promptTokens,
input.completionTokens,
input.totalTokens,
]
)
return rows[0]
}
// ─── completeUsageEvent ───────────────────────────────────────────────────────
/**
* Update the usage event with completion data and insert a message_logs row.
*
* @param {import('pg').Pool} pool
* @param {string} id
* @param {{
* requestCompletedAt: Date,
* httpStatus: number,
* model: string|null,
* promptTokens: number,
* completionTokens: number,
* totalTokens: number,
* latencyMs: number,
* requestJson: object|null,
* requestText: string|null,
* responseJson: object|null,
* responseText: string|null,
* responseContentType: string|null,
* streaming: boolean,
* partial: boolean,
* }} input
*/
async function completeUsageEvent(pool, id, input) {
// UPDATE usage_events
await pool.query(
`UPDATE usage_events SET
request_completed_at = $1,
http_status = $2,
model = $3,
prompt_tokens = $4,
completion_tokens = $5,
total_tokens = $6,
latency_ms = $7,
audit_complete = true
WHERE id = $8`,
[
input.requestCompletedAt,
input.httpStatus,
input.model ?? null,
input.promptTokens,
input.completionTokens,
input.totalTokens,
input.latencyMs,
id,
]
)
// INSERT into message_logs with delete_after
await pool.query(
`INSERT INTO message_logs
(usage_event_id, request_json, request_text, response_json, response_text,
response_content_type, streaming, partial, delete_after)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now() + ($9 || ' days')::interval)
ON CONFLICT (usage_event_id) DO UPDATE SET
request_json = EXCLUDED.request_json,
request_text = EXCLUDED.request_text,
response_json = EXCLUDED.response_json,
response_text = EXCLUDED.response_text,
response_content_type = EXCLUDED.response_content_type,
streaming = EXCLUDED.streaming,
partial = EXCLUDED.partial,
delete_after = EXCLUDED.delete_after`,
[
id,
input.requestJson ?? null,
input.requestText ?? null,
input.responseJson ?? null,
input.responseText ?? null,
input.responseContentType ?? null,
input.streaming,
input.partial,
LOG_RETENTION_DAYS,
]
)
}
// ─── failUsageEvent ───────────────────────────────────────────────────────────
/**
* Mark a usage event as failed. Does NOT insert a message_logs row.
*
* @param {import('pg').Pool} pool
* @param {string} id
* @param {{
* requestCompletedAt: Date,
* httpStatus: number|null,
* latencyMs: number,
* errorCode: string,
* }} input
*/
async function failUsageEvent(pool, id, input) {
await pool.query(
`UPDATE usage_events SET
request_completed_at = $1,
http_status = $2,
latency_ms = $3,
error_code = $4,
audit_complete = true
WHERE id = $5`,
[
input.requestCompletedAt,
input.httpStatus ?? null,
input.latencyMs,
input.errorCode,
id,
]
)
}
// ─── streamJsonlLogs ─────────────────────────────────────────────────────────
/**
* Stream usage events + message logs as JSONL to a writable stream.
*
* @param {import('pg').Pool} pool
* @param {{ apiUserId: string|null, start: Date|null, end: Date|null }} filters
* @param {import('stream').Writable} writable
*/
async function streamJsonlLogs(pool, filters, writable) {
const conditions = ["ue.audit_complete = true"]
const params = []
let idx = 1
if (filters.apiUserId != null) {
conditions.push(`ue.api_user_id = $${idx++}`)
params.push(filters.apiUserId)
}
if (filters.start != null) {
conditions.push(`ue.request_started_at >= $${idx++}`)
params.push(filters.start)
}
if (filters.end != null) {
conditions.push(`ue.request_started_at < $${idx++}`)
params.push(filters.end)
}
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : ""
const sql = `
SELECT
ue.id,
ue.api_user_id,
ue.api_user_name,
ue.api_key_id,
ue.route,
ue.request_started_at,
ue.request_completed_at,
ue.http_status,
ue.model,
ue.prompt_tokens,
ue.completion_tokens,
ue.total_tokens,
ue.latency_ms,
ue.error_code,
ml.request_json,
ml.request_text,
ml.response_json,
ml.response_text,
ml.streaming,
ml.partial
FROM usage_events ue
LEFT JOIN message_logs ml ON ml.usage_event_id = ue.id
${whereClause}
ORDER BY ue.request_started_at ASC
`
const result = await pool.query(sql, params)
for (const row of result.rows) {
const entry = {
id: row.id,
api_user_id: row.api_user_id,
api_user_name: row.api_user_name,
api_key_id: row.api_key_id,
route: row.route,
request_started_at: row.request_started_at,
request_completed_at: row.request_completed_at,
http_status: row.http_status,
model: row.model,
prompt_tokens: row.prompt_tokens,
completion_tokens: row.completion_tokens,
total_tokens: row.total_tokens,
latency_ms: row.latency_ms,
error_code: row.error_code,
request: row.request_json ?? (row.request_text ? tryParseJson(row.request_text) : null),
response: row.response_json ?? (row.response_text ? tryParseJson(row.response_text) : null),
streaming: row.streaming,
partial: row.partial,
}
writable.write(JSON.stringify(entry) + "\n")
}
writable.end()
}
// ─── cleanupExpiredMessageLogs ────────────────────────────────────────────────
/**
* Delete message_logs rows whose delete_after has passed.
*
* @param {import('pg').Pool} pool
* @param {Date} now
* @returns {Promise<number>} count of deleted rows
*/
async function cleanupExpiredMessageLogs(pool, now = new Date()) {
const { rowCount } = await pool.query(
"DELETE FROM message_logs WHERE delete_after <= $1",
[now]
)
return rowCount
}
// ─── Exports ─────────────────────────────────────────────────────────────────
module.exports = {
authorizeUsage,
beginUsageEvent,
completeUsageEvent,
failUsageEvent,
streamJsonlLogs,
cleanupExpiredMessageLogs,
}
+291
View File
@@ -0,0 +1,291 @@
"use strict"
const test = require("node:test")
const assert = require("node:assert/strict")
const { PassThrough } = require("stream")
const crypto = require("crypto")
const { withTestDatabase } = require("./helpers/db-test.cjs")
const { runMigrations } = require("../lib/db.cjs")
const { createApiUser } = require("../lib/api-users-store.cjs")
const {
authorizeUsage,
beginUsageEvent,
completeUsageEvent,
failUsageEvent,
streamJsonlLogs,
cleanupExpiredMessageLogs,
} = require("../lib/audit-store.cjs")
test("audit store", { timeout: 30000 }, async (t) => {
await withTestDatabase(t, async ({ pool }) => {
await runMigrations(pool)
// Helper to create a minimal api_user row directly
async function createUser(overrides = {}) {
const { user } = await createApiUser(pool, {
displayName: overrides.displayName || "Test User",
allowPre: true,
allowPost: true,
requestsPerMinute: overrides.requestsPerMinute || 60,
monthlyTokenLimit: overrides.monthlyTokenLimit || 1000000,
expiresAt: null,
})
// Get the active key id
const { rows } = await pool.query(
"SELECT id FROM api_keys WHERE api_user_id = $1 AND revoked_at IS NULL",
[user.id]
)
return { user, keyId: rows[0].id }
}
await t.test("authorizeUsage: rate limit blocks second request within same minute", async () => {
const { user, keyId } = await createUser({ requestsPerMinute: 1 })
const now = new Date()
// Seed a usage_events row to simulate an existing in-flight request this minute
const eventId = crypto.randomUUID()
await pool.query(
`INSERT INTO usage_events
(id, api_user_id, api_user_name, api_key_id, route, request_started_at, prompt_tokens, completion_tokens, total_tokens, audit_complete)
VALUES ($1, $2, $3, $4, 'pre', $5, 0, 0, 0, false)`,
[eventId, user.id, user.display_name, keyId, now]
)
const result = await authorizeUsage(pool, user, now)
assert.equal(result.ok, false, "should be denied")
assert.equal(result.reason, "rate_limited", "reason should be rate_limited")
assert.ok(typeof result.retryAfter === "number", "retryAfter should be a number")
})
await t.test("authorizeUsage: monthly token limit blocks when usage exceeds limit", async () => {
const { user, keyId } = await createUser({ monthlyTokenLimit: 100 })
const now = new Date()
// Seed a usage_events row with total_tokens exceeding the limit
const eventId = crypto.randomUUID()
await pool.query(
`INSERT INTO usage_events
(id, api_user_id, api_user_name, api_key_id, route, request_started_at, prompt_tokens, completion_tokens, total_tokens, audit_complete)
VALUES ($1, $2, $3, $4, 'pre', $5, 0, 0, 101, true)`,
[eventId, user.id, user.display_name, keyId, now]
)
const result = await authorizeUsage(pool, user, now)
assert.equal(result.ok, false, "should be denied")
assert.equal(result.reason, "token_limit_exceeded", "reason should be token_limit_exceeded")
})
await t.test("authorizeUsage: allows request when under limits", async () => {
const { user } = await createUser({ requestsPerMinute: 60, monthlyTokenLimit: 1000000 })
const now = new Date()
const result = await authorizeUsage(pool, user, now)
assert.equal(result.ok, true, "should be allowed")
})
await t.test("beginUsageEvent: creates incomplete usage event", async () => {
const { user, keyId } = await createUser()
const eventId = crypto.randomUUID()
const now = new Date()
await beginUsageEvent(pool, {
id: eventId,
apiUserId: user.id,
apiUserName: user.display_name,
apiKeyId: keyId,
route: "pre",
requestStartedAt: now,
model: null,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
})
const { rows } = await pool.query(
"SELECT * FROM usage_events WHERE id = $1",
[eventId]
)
assert.equal(rows.length, 1, "event should be created")
assert.equal(rows[0].audit_complete, false, "audit_complete should be false")
assert.equal(rows[0].api_user_id, user.id)
assert.equal(rows[0].route, "pre")
})
await t.test("completeUsageEvent: stores tokens, message_logs, and marks audit_complete = true", async () => {
const { user, keyId } = await createUser({ displayName: "Marketing Automation" })
const eventId = crypto.randomUUID()
const startedAt = new Date()
await beginUsageEvent(pool, {
id: eventId,
apiUserId: user.id,
apiUserName: user.display_name,
apiKeyId: keyId,
route: "pre",
requestStartedAt: startedAt,
model: null,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
})
const requestBody = { model: "test", messages: [{ role: "user", content: "hello" }] }
const responseBody = { choices: [{ message: { content: "world" } }] }
const completedAt = new Date()
await completeUsageEvent(pool, eventId, {
requestCompletedAt: completedAt,
httpStatus: 200,
model: "test",
promptTokens: 10,
completionTokens: 5,
totalTokens: 15,
latencyMs: 123,
requestJson: requestBody,
requestText: null,
responseJson: responseBody,
responseText: null,
responseContentType: "application/json",
streaming: false,
partial: false,
})
const { rows: eventRows } = await pool.query(
"SELECT * FROM usage_events WHERE id = $1",
[eventId]
)
assert.equal(eventRows.length, 1)
assert.equal(eventRows[0].audit_complete, true, "audit_complete should be true")
assert.equal(Number(eventRows[0].total_tokens), 15)
assert.equal(Number(eventRows[0].prompt_tokens), 10)
assert.equal(Number(eventRows[0].completion_tokens), 5)
assert.equal(eventRows[0].model, "test")
const { rows: logRows } = await pool.query(
"SELECT * FROM message_logs WHERE usage_event_id = $1",
[eventId]
)
assert.equal(logRows.length, 1, "message_logs row should be created")
assert.deepEqual(logRows[0].request_json, requestBody)
assert.deepEqual(logRows[0].response_json, responseBody)
assert.equal(logRows[0].streaming, false)
assert.ok(logRows[0].delete_after, "delete_after should be set")
})
await t.test("cleanupExpiredMessageLogs: deletes expired log rows, leaves usage_events intact", async () => {
const { user, keyId } = await createUser()
const eventId = crypto.randomUUID()
const startedAt = new Date()
await beginUsageEvent(pool, {
id: eventId,
apiUserId: user.id,
apiUserName: user.display_name,
apiKeyId: keyId,
route: "post",
requestStartedAt: startedAt,
model: "gpt-4",
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
})
// Insert message_logs row with delete_after in the past
const pastDate = new Date(Date.now() - 1000)
await pool.query(
`INSERT INTO message_logs
(usage_event_id, request_json, response_json, streaming, partial, delete_after)
VALUES ($1, $2, $3, false, false, $4)`,
[eventId, JSON.stringify({ test: true }), JSON.stringify({ ok: true }), pastDate]
)
// Mark usage_event as audit_complete
await pool.query(
"UPDATE usage_events SET audit_complete = true WHERE id = $1",
[eventId]
)
const now = new Date()
const count = await cleanupExpiredMessageLogs(pool, now)
assert.ok(count >= 1, "should have deleted at least one row")
// message_logs row should be gone
const { rows: logRows } = await pool.query(
"SELECT * FROM message_logs WHERE usage_event_id = $1",
[eventId]
)
assert.equal(logRows.length, 0, "message_logs row should be deleted")
// usage_events row should still exist
const { rows: eventRows } = await pool.query(
"SELECT * FROM usage_events WHERE id = $1",
[eventId]
)
assert.equal(eventRows.length, 1, "usage_events row should remain")
})
await t.test("streamJsonlLogs: returns valid JSONL with request and response fields", async () => {
const { user, keyId } = await createUser({ displayName: "Marketing Automation" })
const eventId = crypto.randomUUID()
const startedAt = new Date()
await beginUsageEvent(pool, {
id: eventId,
apiUserId: user.id,
apiUserName: user.display_name,
apiKeyId: keyId,
route: "pre",
requestStartedAt: startedAt,
model: null,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
})
const requestBody = { model: "test", messages: [{ role: "user", content: "hello" }] }
const responseBody = { choices: [{ message: { content: "world" } }] }
await completeUsageEvent(pool, eventId, {
requestCompletedAt: new Date(),
httpStatus: 200,
model: "test",
promptTokens: 10,
completionTokens: 5,
totalTokens: 15,
latencyMs: 100,
requestJson: requestBody,
requestText: null,
responseJson: responseBody,
responseText: null,
responseContentType: "application/json",
streaming: false,
partial: false,
})
const passThrough = new PassThrough()
const chunks = []
passThrough.on("data", (chunk) => chunks.push(chunk))
await new Promise((resolve, reject) => {
passThrough.on("end", resolve)
passThrough.on("error", reject)
streamJsonlLogs(pool, { apiUserId: user.id, start: null, end: null }, passThrough)
})
const output = Buffer.concat(chunks).toString("utf8")
const lines = output
.trim()
.split("\n")
.filter((l) => l.trim())
.map((l) => JSON.parse(l))
assert.ok(lines.length >= 1, "should have at least one line")
const entry = lines.find((l) => l.id === eventId)
assert.ok(entry, "should find our event in the output")
assert.equal(entry.api_user_name, "Marketing Automation")
assert.deepEqual(entry.request, requestBody)
assert.equal(entry.response.choices[0].message.content, "world")
assert.equal(entry.streaming, false)
})
})
})