// Files — 624 lines, 21 KiB, JavaScript

"use strict"
const http = require("http")
const https = require("https")
const crypto = require("crypto")
const { createPool, runMigrations } = require("./lib/db.cjs")
const { authenticateApiKey } = require("./lib/api-users-store.cjs")
const { authorizeUsage, beginUsageEvent, completeUsageEvent, failUsageEvent } = require("./lib/audit-store.cjs")
const { openAiError } = require("./lib/http.cjs")
// ─── Environment ──────────────────────────────────────────────────────────────
// All settings are read from process.env once, when the module is loaded.
const DATABASE_URL = process.env.DATABASE_URL
// Which route this process serves; must be "pre" or "post" (checked below).
const ROUTE_KIND = process.env.HERMES_API_ROUTE_KIND
const GATEWAY_HOST = process.env.HERMES_API_GATEWAY_HOST || "0.0.0.0"
const GATEWAY_PORT = parseInt(process.env.HERMES_API_GATEWAY_PORT || "8080", 10)
const UPSTREAM_URL = process.env.HERMES_UPSTREAM_URL
// When non-empty, replaces the Authorization header on upstream requests.
const UPSTREAM_API_KEY = process.env.HERMES_UPSTREAM_API_KEY || ""
// Byte cap compared against request and response body sizes (default 10 MiB).
const AUDIT_MAX_BYTES = parseInt(process.env.HERMES_AUDIT_MAX_BYTES || "10485760", 10)
const DEFAULT_PROVIDER = normalizeProviderName(process.env.HERMES_DEFAULT_PROVIDER || "openai-codex")
const DEFAULT_THINKING_EFFORT = process.env.HERMES_DEFAULT_THINKING_EFFORT || "medium"
// Default model per canonical provider id, used when a request names no model.
const DEFAULT_MODELS = {
  anthropic: process.env.HERMES_DEFAULT_CLAUDE_MODEL || "claude-sonnet-4.6",
  "openai-codex": process.env.HERMES_DEFAULT_CODEX_MODEL || "gpt-5.4",
  "google-gemini-cli": process.env.HERMES_DEFAULT_GEMINI_MODEL || "gemini-3.5-flash",
}

// Validate required env vars
if (!DATABASE_URL) {
  console.error("startup failed: DATABASE_URL is required")
  process.exit(1)
}
if (!ROUTE_KIND || (ROUTE_KIND !== "pre" && ROUTE_KIND !== "post")) {
  console.error("startup failed: HERMES_API_ROUTE_KIND must be 'pre' or 'post'")
  process.exit(1)
}
if (!UPSTREAM_URL) {
  console.error("startup failed: HERMES_UPSTREAM_URL is required")
  process.exit(1)
}
// parseInt yields NaN for non-numeric text. A NaN port cannot be listened on,
// and a NaN byte cap makes every `size > AUDIT_MAX_BYTES` comparison false,
// which would silently disable the size limits. Reject both at startup.
if (!Number.isInteger(GATEWAY_PORT) || GATEWAY_PORT < 0 || GATEWAY_PORT > 65535) {
  console.error("startup failed: HERMES_API_GATEWAY_PORT must be an integer between 0 and 65535")
  process.exit(1)
}
if (!Number.isInteger(AUDIT_MAX_BYTES) || AUDIT_MAX_BYTES <= 0) {
  console.error("startup failed: HERMES_AUDIT_MAX_BYTES must be a positive integer")
  process.exit(1)
}
// ─── Hop-by-hop headers ───────────────────────────────────────────────────────
// Connection-scoped fields that must not be relayed by an intermediary.
// "trailer" is the actual field name; "trailers" is kept because the original
// list carried it. "proxy-connection" is non-standard but widely sent.
const HOP_BY_HOP = new Set([
  "connection",
  "keep-alive",
  "proxy-authenticate",
  "proxy-authorization",
  "proxy-connection",
  "te",
  "trailer",
  "trailers",
  "transfer-encoding",
  "upgrade",
])

/**
 * Return a copy of `headers` without hop-by-hop fields.
 *
 * Besides the fixed HOP_BY_HOP list, any field named in a `Connection` header
 * value is dropped too, since the sender has declared it connection-scoped.
 * Header names are compared case-insensitively; the original key casing and
 * values are preserved for the fields that are kept.
 *
 * @param {Object<string, string|string[]|undefined>} headers
 * @returns {Object<string, string|string[]|undefined>} new object; input is not mutated
 */
function filterHeaders(headers) {
  // Collect the field names nominated by Connection (comma-separated tokens).
  const nominated = new Set()
  for (const [name, value] of Object.entries(headers)) {
    if (name.toLowerCase() !== "connection" || value == null) continue
    const values = Array.isArray(value) ? value : [value]
    for (const entry of values) {
      for (const token of String(entry).split(",")) {
        const field = token.trim().toLowerCase()
        if (field) nominated.add(field)
      }
    }
  }

  const result = {}
  for (const [name, value] of Object.entries(headers)) {
    const lowered = name.toLowerCase()
    if (HOP_BY_HOP.has(lowered) || nominated.has(lowered)) continue
    result[name] = value
  }
  return result
}
/**
 * Build the header set sent to the upstream service.
 *
 * Starts from the hop-by-hop-filtered client headers, drops `host`, and, when
 * UPSTREAM_API_KEY is non-empty, overwrites `authorization` with that key as
 * a Bearer credential.
 *
 * @param {Object<string, string|string[]|undefined>} headers client request headers
 * @returns {Object<string, string|string[]|undefined>} headers for the upstream request
 */
function upstreamRequestHeaders(headers) {
  const forwarded = Object.fromEntries(
    Object.entries(filterHeaders(headers)).filter(([name]) => name !== "host"),
  )
  if (!UPSTREAM_API_KEY) {
    return forwarded
  }
  forwarded.authorization = `Bearer ${UPSTREAM_API_KEY}`
  return forwarded
}
// ─── OpenAI-compatible request defaults ──────────────────────────────────────
/**
 * Map a provider alias to its canonical provider id.
 *
 * The input is stringified, trimmed and lower-cased first. Recognised aliases
 * collapse to "anthropic", "openai-codex" or "google-gemini-cli"; any other
 * value is returned in its trimmed, lower-cased form (empty string for
 * null/undefined/empty input).
 *
 * @param {*} provider
 * @returns {string}
 */
function normalizeProviderName(provider) {
  const key = String(provider || "").trim().toLowerCase()
  switch (key) {
    case "claude":
    case "anthropic":
      return "anthropic"
    case "codex":
    case "chatgpt":
    case "openai":
    case "openai-codex":
      return "openai-codex"
    case "gemini":
    case "google":
    case "google-gemini-cli":
      return "google-gemini-cli"
    default:
      return key
  }
}
/**
 * Resolve the default model for a provider.
 *
 * Tries the normalized `provider`, then DEFAULT_PROVIDER, then the
 * "openai-codex" entry of DEFAULT_MODELS.
 *
 * The provider name can come from a client request body, so the lookup is
 * restricted to DEFAULT_MODELS' own keys: a plain `DEFAULT_MODELS[name]`
 * would resolve names such as "constructor" or "__proto__" through
 * Object.prototype and return a function/object instead of a model string.
 *
 * @param {*} provider provider name or alias
 * @returns {string} model identifier
 */
function defaultModelForProvider(provider) {
  const ownModel = (name) =>
    Object.prototype.hasOwnProperty.call(DEFAULT_MODELS, name) ? DEFAULT_MODELS[name] : undefined
  return (
    ownModel(normalizeProviderName(provider)) ||
    ownModel(DEFAULT_PROVIDER) ||
    DEFAULT_MODELS["openai-codex"]
  )
}
/**
 * Determine which provider a request body targets.
 *
 * The first truthy value among `body.provider`, `body.hermes_provider` and
 * `body.metadata.provider` wins; otherwise DEFAULT_PROVIDER is used. The
 * chosen value is passed through normalizeProviderName.
 *
 * @param {Object} body parsed request body
 * @returns {string} normalized provider name
 */
function providerFromRequestBody(body) {
  const candidates = [body.provider, body.hermes_provider, body.metadata?.provider]
  const chosen = candidates.find(Boolean) || DEFAULT_PROVIDER
  return normalizeProviderName(chosen)
}
/**
 * Tell whether a request's body should have gateway defaults applied.
 *
 * True only for POST requests whose path (query string ignored) is exactly
 * "/v1/chat/completions" or "/v1/responses".
 *
 * @param {{method?: string, url?: string}} req
 * @returns {boolean}
 */
function shouldNormalizeOpenAiBody(req) {
  const [pathname] = String(req.url || "").split("?")
  const isNormalizedEndpoint = ["/v1/chat/completions", "/v1/responses"].includes(pathname)
  return req.method === "POST" && isNormalizedEndpoint
}
/**
 * Apply gateway defaults to an OpenAI-style JSON request body.
 *
 * Bodies are returned untouched when the request is not one of the normalized
 * endpoints, when the body is empty, or when it is not valid JSON (`json` is
 * null in those cases). Valid JSON that is not a plain object is also returned
 * untouched, with `json` set to the parsed value.
 *
 * For a JSON object:
 *  - a missing/falsy `model` is filled with the provider's default model;
 *  - `reasoning.effort` is set from the first truthy value among
 *    `reasoning_effort`, `thinking_effort`, `reasoning.effort`,
 *    `thinking.effort`, falling back to DEFAULT_THINKING_EFFORT;
 *  - the top-level `reasoning_effort` and `thinking_effort` keys are removed.
 *
 * @param {{method?: string, url?: string}} req
 * @param {Buffer} requestBodyBuffer raw request body
 * @returns {{buffer: Buffer, json: *, model: (string|null)}}
 */
function normalizeOpenAiRequestBody(req, requestBodyBuffer) {
  const untouched = (json) => ({ buffer: requestBodyBuffer, json, model: null })

  if (!shouldNormalizeOpenAiBody(req) || !requestBodyBuffer.length) {
    return untouched(null)
  }

  let payload
  try {
    payload = JSON.parse(requestBodyBuffer.toString("utf8"))
  } catch {
    return untouched(null)
  }

  const isJsonObject = Boolean(payload) && typeof payload === "object" && !Array.isArray(payload)
  if (!isJsonObject) {
    return untouched(payload)
  }

  // Pull the two legacy effort keys out; `body` keeps every other key in its
  // original order so the re-serialised JSON stays stable.
  const { reasoning_effort: reasoningEffort, thinking_effort: thinkingEffort, ...body } = payload

  if (!body.model) {
    body.model = defaultModelForProvider(providerFromRequestBody(body))
  }

  const effort =
    reasoningEffort ||
    thinkingEffort ||
    body.reasoning?.effort ||
    body.thinking?.effort ||
    DEFAULT_THINKING_EFFORT
  const existingReasoning = body.reasoning && typeof body.reasoning === "object" ? body.reasoning : {}
  body.reasoning = { ...existingReasoning, effort }

  return {
    buffer: Buffer.from(JSON.stringify(body), "utf8"),
    json: body,
    model: body.model || null,
  }
}
// ─── Upstream request ─────────────────────────────────────────────────────────
/**
 * Stream a client request through to the upstream service.
 *
 * The client's path and query are kept, while protocol, hostname and port are
 * always taken from UPSTREAM_URL. Headers come from upstreamRequestHeaders and
 * the client request body is piped to the upstream request as it arrives.
 *
 * @param {import("http").IncomingMessage} clientReq
 * @returns {Promise<import("http").IncomingMessage>} resolves with the
 *   upstream response stream; rejects if the upstream request emits "error".
 */
function forwardToUpstream(clientReq) {
  return new Promise((resolve, reject) => {
    const base = new URL(UPSTREAM_URL)
    const target = new URL(clientReq.url, UPSTREAM_URL)
    // Pin the destination to the configured upstream regardless of what the
    // request target resolved to.
    target.protocol = base.protocol
    target.hostname = base.hostname
    target.port = base.port

    const isTls = target.protocol === "https:"
    const client = isTls ? https : http
    const upstreamReq = client.request(
      {
        hostname: target.hostname,
        port: target.port || (isTls ? 443 : 80),
        path: `${target.pathname}${target.search}`,
        method: clientReq.method,
        headers: upstreamRequestHeaders(clientReq.headers),
      },
      (upstreamRes) => resolve(upstreamRes),
    )
    upstreamReq.on("error", reject)
    // Pipe client request body to upstream
    clientReq.pipe(upstreamReq)
  })
}
// ─── Token/model extraction ───────────────────────────────────────────────────
/**
 * Parse a provider-reported token count; anything unparsable counts as 0.
 */
function toTokenCount(value) {
  return Number.parseInt(value, 10) || 0
}

/**
 * Read token counts from a `usage` object.
 *
 * Accepts both naming schemes: `prompt_tokens`/`completion_tokens` and
 * `input_tokens`/`output_tokens`. When `total_tokens` is absent or zero the
 * total is derived as prompt + completion so the three figures stay consistent.
 */
function readTokenCounts(usage) {
  const promptTokens = toTokenCount(usage.prompt_tokens) || toTokenCount(usage.input_tokens)
  const completionTokens = toTokenCount(usage.completion_tokens) || toTokenCount(usage.output_tokens)
  const totalTokens = toTokenCount(usage.total_tokens) || promptTokens + completionTokens
  return { promptTokens, completionTokens, totalTokens }
}

/**
 * Extract model and token usage from a JSON response body.
 *
 * @param {string} text response body
 * @returns {{model: (string|null), promptTokens: number, completionTokens: number, totalTokens: number}}
 *   all-zero counts and a null model when `text` is not a JSON object.
 */
function extractUsage(text) {
  try {
    const parsed = JSON.parse(text)
    return { model: parsed.model || null, ...readTokenCounts(parsed.usage || {}) }
  } catch {
    return { model: null, promptTokens: 0, completionTokens: 0, totalTokens: 0 }
  }
}

/**
 * Extract model/usage from SSE stream text.
 *
 * Scans `data:` lines for JSON carrying `model` or `usage`, either at the top
 * level of the event or nested under a `response` object (events of the form
 * `{type, response: {model, usage}}`). The last model and the last usage
 * object seen win. `[DONE]` markers and malformed lines are skipped.
 *
 * @param {string} text raw SSE text
 * @returns {{model: (string|null), promptTokens: number, completionTokens: number, totalTokens: number}}
 */
function extractUsageFromSse(text) {
  let model = null
  let counts = { promptTokens: 0, completionTokens: 0, totalTokens: 0 }
  for (const line of text.split("\n")) {
    if (!line.startsWith("data:")) continue
    // trim() also strips the "\r" left behind by CRLF line endings
    const payload = line.slice(5).trim()
    if (payload === "[DONE]") continue
    try {
      const event = JSON.parse(payload)
      const nested = event.response && typeof event.response === "object" ? event.response : null
      const eventModel = event.model || nested?.model
      if (eventModel) model = eventModel
      const usage = event.usage || nested?.usage
      if (usage) counts = readTokenCounts(usage)
    } catch {
      // ignore malformed SSE chunks
    }
  }
  return { model, ...counts }
}
// ─── Request handler ──────────────────────────────────────────────────────────
/**
 * Report a failed audit write. Audit persistence is best-effort: the failure
 * is logged but never changes what the client receives.
 */
function logAuditFailure(err) {
  console.error("audit write failed:", err && err.message ? err.message : err)
}

/**
 * Buffer a request body without ever holding more than `maxBytes`.
 *
 * @param {import("http").IncomingMessage} req
 * @param {number} maxBytes
 * @returns {Promise<Buffer|null>} the full body, or null as soon as more than
 *   `maxBytes` have arrived. Rejects if the request stream emits "error".
 */
function readRequestBody(req, maxBytes) {
  return new Promise((resolve, reject) => {
    const chunks = []
    let total = 0
    const onData = (chunk) => {
      total += chunk.length
      if (total > maxBytes) {
        // Stop retaining data. The stream is left flowing with no listener so
        // whatever the client still sends is discarded instead of buffered.
        req.off("data", onData)
        chunks.length = 0
        req.resume()
        resolve(null)
        return
      }
      chunks.push(chunk)
    }
    req.on("data", onData)
    req.on("end", () => resolve(Buffer.concat(chunks)))
    // Stays attached after an early resolve so a late stream error is absorbed
    // (rejecting an already-settled promise is a no-op).
    req.on("error", reject)
  })
}

/**
 * Send the 413 "request too large" response.
 *
 * @param {import("http").ServerResponse} res
 * @param {boolean} closeConnection when true, ask for the connection to be
 *   closed after the response — used when the body was rejected mid-upload.
 */
function sendRequestTooLarge(res, closeConnection) {
  const headers = { "Content-Type": "application/json" }
  if (closeConnection) {
    headers["Connection"] = "close"
  }
  res.writeHead(413, headers)
  res.end(JSON.stringify({ error: { message: "Request body too large", type: "request_too_large", code: "request_too_large" } }))
}

/**
 * Send a request with an already-buffered body to the upstream service.
 *
 * Path and query come from the client request; protocol, hostname and port
 * always come from UPSTREAM_URL. `content-length` is rewritten to the buffered
 * body's size because the body may have been re-serialised.
 *
 * @param {import("http").IncomingMessage} req client request
 * @param {Buffer} bodyBuffer body to send (may be empty)
 * @returns {Promise<import("http").IncomingMessage>} upstream response stream
 */
function sendBufferedUpstreamRequest(req, bodyBuffer) {
  return new Promise((resolve, reject) => {
    const upstreamBase = new URL(UPSTREAM_URL)
    const targetUrl = new URL(req.url, UPSTREAM_URL)
    targetUrl.protocol = upstreamBase.protocol
    targetUrl.hostname = upstreamBase.hostname
    targetUrl.port = upstreamBase.port
    const forwardHeaders = upstreamRequestHeaders(req.headers)
    if (bodyBuffer.length > 0) {
      forwardHeaders["content-length"] = String(bodyBuffer.length)
    }
    const options = {
      hostname: targetUrl.hostname,
      port: targetUrl.port || (targetUrl.protocol === "https:" ? 443 : 80),
      path: targetUrl.pathname + targetUrl.search,
      method: req.method,
      headers: forwardHeaders,
    }
    const transport = targetUrl.protocol === "https:" ? https : http
    const upstreamReq = transport.request(options, (uRes) => resolve(uRes))
    upstreamReq.on("error", reject)
    if (bodyBuffer.length > 0) {
      upstreamReq.write(bodyBuffer)
    }
    upstreamReq.end()
  })
}

/**
 * Handle one gateway request end to end.
 *
 * Flow: unauthenticated GET /health → Bearer API-key authentication → usage
 * authorization → request-size check → buffer and normalize the body → open a
 * usage event → forward to the upstream → relay the response to the client
 * while collecting an audit copy → complete or fail the usage event.
 *
 * Request bodies larger than AUDIT_MAX_BYTES are rejected with 413 whether the
 * size is announced via Content-Length or only discovered while reading.
 * Response bodies are always relayed in full; only the audit copy is capped.
 *
 * @param {import("http").IncomingMessage} req
 * @param {import("http").ServerResponse} res
 * @param {Object} pool database pool passed to the store helpers
 * @returns {Promise<void>} settles once the response handlers are wired up;
 *   rejects if beginUsageEvent fails (the caller turns that into a 500).
 */
async function handleRequest(req, res, pool) {
  // Health check — unauthenticated
  if (req.method === "GET" && req.url === "/health") {
    try {
      await pool.query("SELECT 1")
      res.writeHead(200, { "Content-Type": "application/json" })
      res.end(JSON.stringify({ ok: true, route: ROUTE_KIND }))
    } catch (err) {
      res.writeHead(503, { "Content-Type": "application/json" })
      res.end(JSON.stringify({ ok: false, error: "Database unavailable" }))
    }
    return
  }

  // ─── Auth ────────────────────────────────────────────────────────────────
  // Extract Bearer token
  const authHeader = req.headers["authorization"] || ""
  const match = authHeader.match(/^Bearer\s+(.+)$/i)
  if (!match) {
    openAiError(res, 401, "Authentication required", "authentication_required")
    return
  }
  const token = match[1].trim()

  // Authenticate
  let authResult
  try {
    authResult = await authenticateApiKey(pool, token, ROUTE_KIND)
  } catch (err) {
    console.error("api key authentication failed:", err && err.message ? err.message : err)
    openAiError(res, 500, "Internal server error", "internal_error")
    return
  }
  if (!authResult.ok) {
    switch (authResult.reason) {
      case "invalid":
        openAiError(res, 401, "Invalid API key", "invalid_api_key")
        return
      case "revoked":
        openAiError(res, 410, "API key has been revoked", "key_revoked")
        return
      case "expired":
        openAiError(res, 410, "API key has expired", "key_expired")
        return
      case "forbidden":
        openAiError(
          res, 403,
          `API key does not permit ${ROUTE_KIND}-Hermes access`,
          "permission_denied"
        )
        return
      default:
        openAiError(res, 401, "Authentication failed", "authentication_failed")
        return
    }
  }
  const { user, keyId } = authResult

  // ─── Usage authorization ─────────────────────────────────────────────────
  let usageAuth
  try {
    usageAuth = await authorizeUsage(pool, user)
  } catch (err) {
    console.error("usage authorization failed:", err && err.message ? err.message : err)
    openAiError(res, 500, "Internal server error", "internal_error")
    return
  }
  if (!usageAuth.ok) {
    switch (usageAuth.reason) {
      case "rate_limited":
        openAiError(res, 429, "Rate limit exceeded", "rate_limit_exceeded")
        return
      case "token_limit_exceeded":
        openAiError(res, 429, "Monthly token limit exceeded", "token_limit_exceeded")
        return
      default:
        openAiError(res, 429, "Usage limit exceeded", "usage_limit_exceeded")
        return
    }
  }

  // ─── Check request Content-Length against audit max ──────────────────────
  const contentLengthHeader = req.headers["content-length"]
  if (contentLengthHeader) {
    const contentLength = parseInt(contentLengthHeader, 10)
    if (!Number.isNaN(contentLength) && contentLength > AUDIT_MAX_BYTES) {
      sendRequestTooLarge(res, false)
      return
    }
  }

  // ─── Begin usage event ───────────────────────────────────────────────────
  const eventId = crypto.randomUUID()
  const requestStartedAt = new Date()

  // The body is buffered because it must be both normalized and logged before
  // it is forwarded. Bodies without a Content-Length (chunked uploads) are
  // held to the same limit as the header check above.
  let requestBodyBuffer
  try {
    requestBodyBuffer = await readRequestBody(req, AUDIT_MAX_BYTES)
  } catch (err) {
    openAiError(res, 400, "Failed to read request body", "bad_request")
    return
  }
  if (requestBodyBuffer === null) {
    sendRequestTooLarge(res, true)
    return
  }

  const normalizedRequest = normalizeOpenAiRequestBody(req, requestBodyBuffer)
  requestBodyBuffer = normalizedRequest.buffer

  // Extract model from request JSON if possible
  let requestModel = null
  let requestJson = null
  if (normalizedRequest.json) {
    requestModel = normalizedRequest.model
    requestJson = normalizedRequest.json
  } else {
    try {
      const parsed = JSON.parse(requestBodyBuffer.toString("utf8"))
      requestModel = parsed.model || null
      requestJson = parsed
    } catch {
      // non-JSON body
    }
  }

  await beginUsageEvent(pool, {
    id: eventId,
    apiUserId: user.id,
    apiUserName: user.display_name,
    apiKeyId: keyId,
    route: ROUTE_KIND,
    requestStartedAt,
    model: requestModel,
    promptTokens: 0,
    completionTokens: 0,
    totalTokens: 0,
  })

  // Timestamp + latency for whichever audit write closes the event.
  const finishTiming = () => {
    const requestCompletedAt = new Date()
    return { requestCompletedAt, latencyMs: requestCompletedAt.getTime() - requestStartedAt.getTime() }
  }
  // Raw request text is only logged when the body could not be stored as JSON.
  const requestTextForAudit = () => (requestJson ? null : requestBodyBuffer.toString("utf8"))

  // ─── Forward to upstream ─────────────────────────────────────────────────
  let upstreamRes
  try {
    upstreamRes = await sendBufferedUpstreamRequest(req, requestBodyBuffer)
  } catch (err) {
    // Upstream connection failed
    const { requestCompletedAt, latencyMs } = finishTiming()
    await failUsageEvent(pool, eventId, {
      requestCompletedAt,
      httpStatus: null,
      latencyMs,
      errorCode: "upstream_error",
    }).catch(logAuditFailure)
    openAiError(res, 502, "Upstream service unavailable", "upstream_error")
    return
  }

  // ─── Forward upstream response ───────────────────────────────────────────
  const upstreamStatus = upstreamRes.statusCode
  const upstreamHeaders = filterHeaders(upstreamRes.headers)
  const responseContentType = upstreamHeaders["content-type"] || ""
  const isStreaming = responseContentType.includes("text/event-stream")

  // Forward response headers to client
  res.writeHead(upstreamStatus, upstreamHeaders)

  if (isStreaming) {
    // SSE: relay chunks immediately, keep an audit copy up to AUDIT_MAX_BYTES.
    const auditChunks = []
    let auditBytes = 0
    let auditSizeExceeded = false
    let partial = false
    let completed = false

    // Usage is tracked line by line as data arrives, so the usage record at
    // the end of a long stream is still seen after the audit copy was capped.
    let pendingLine = Buffer.alloc(0)
    const usage = { model: null, promptTokens: 0, completionTokens: 0, totalTokens: 0 }
    const scanForUsage = (text) => {
      const found = extractUsageFromSse(text)
      if (found.model) usage.model = found.model
      if (found.promptTokens || found.completionTokens || found.totalTokens) {
        usage.promptTokens = found.promptTokens
        usage.completionTokens = found.completionTokens
        usage.totalTokens = found.totalTokens
      }
    }

    // The upstream keeps being read after a client disconnect so the event
    // can still be completed; it is merely flagged as partial.
    res.on("close", () => {
      if (!completed) partial = true
    })

    upstreamRes.on("data", (chunk) => {
      // Forward immediately
      res.write(chunk)

      // Buffer for audit (raw bytes; decoded once at the end so multi-byte
      // characters split across chunks are not corrupted)
      if (!auditSizeExceeded) {
        if (auditBytes + chunk.length > AUDIT_MAX_BYTES) {
          auditSizeExceeded = true
          auditChunks.length = 0 // response text is not logged once capped
        } else {
          auditChunks.push(chunk)
          auditBytes += chunk.length
        }
      }

      // Feed complete lines to the usage scanner. Splitting at "\n" is safe
      // on raw bytes: 0x0a never occurs inside a multi-byte UTF-8 sequence.
      const combined = pendingLine.length > 0 ? Buffer.concat([pendingLine, chunk]) : chunk
      const lastNewline = combined.lastIndexOf(0x0a)
      if (lastNewline === -1) {
        // Do not hold an unterminated line larger than the audit cap.
        pendingLine = combined.length > AUDIT_MAX_BYTES ? Buffer.alloc(0) : combined
        return
      }
      scanForUsage(combined.subarray(0, lastNewline).toString("utf8"))
      pendingLine = combined.subarray(lastNewline + 1)
    })

    upstreamRes.on("end", async () => {
      if (completed) return
      completed = true
      res.end()
      if (pendingLine.length > 0) {
        scanForUsage(pendingLine.toString("utf8"))
      }
      const { requestCompletedAt, latencyMs } = finishTiming()
      await completeUsageEvent(pool, eventId, {
        requestCompletedAt,
        httpStatus: upstreamStatus,
        model: usage.model || requestModel,
        promptTokens: usage.promptTokens,
        completionTokens: usage.completionTokens,
        totalTokens: usage.totalTokens,
        latencyMs,
        requestJson,
        requestText: requestTextForAudit(),
        responseJson: null,
        responseText: auditSizeExceeded ? null : Buffer.concat(auditChunks).toString("utf8"),
        responseContentType,
        streaming: true,
        partial,
      }).catch(logAuditFailure)
    })

    upstreamRes.on("error", async (err) => {
      if (completed) return
      completed = true
      res.destroy()
      const { requestCompletedAt, latencyMs } = finishTiming()
      await failUsageEvent(pool, eventId, {
        requestCompletedAt,
        httpStatus: null,
        latencyMs,
        errorCode: "upstream_stream_error",
      }).catch(logAuditFailure)
    })
  } else {
    // Non-streaming: relay chunks as they arrive (headers are already sent)
    // and keep an audit copy up to AUDIT_MAX_BYTES.
    const auditChunks = []
    let auditBytes = 0
    let bodyExceeded = false
    // Exactly one of end / error / premature client close writes the audit record.
    let finalized = false

    res.on("close", () => {
      upstreamRes.destroy()
      if (finalized) return
      // Client went away before the upstream finished: destroying the
      // upstream means "end" will not fire, so close the event here.
      finalized = true
      const { requestCompletedAt, latencyMs } = finishTiming()
      completeUsageEvent(pool, eventId, {
        requestCompletedAt,
        httpStatus: upstreamStatus,
        model: requestModel,
        promptTokens: 0,
        completionTokens: 0,
        totalTokens: 0,
        latencyMs,
        requestJson,
        requestText: requestTextForAudit(),
        responseJson: null,
        responseText: null,
        responseContentType,
        streaming: false,
        partial: true,
      }).catch(logAuditFailure)
    })

    upstreamRes.on("data", (chunk) => {
      res.write(chunk)
      if (bodyExceeded) return
      if (auditBytes + chunk.length > AUDIT_MAX_BYTES) {
        bodyExceeded = true
        auditChunks.length = 0 // nothing is logged once the cap is exceeded
        return
      }
      auditChunks.push(chunk)
      auditBytes += chunk.length
    })

    upstreamRes.on("end", async () => {
      if (finalized) return
      finalized = true
      res.end()
      const { requestCompletedAt, latencyMs } = finishTiming()

      if (bodyExceeded) {
        // Body exceeded audit max — it has been relayed in full, but neither
        // its content nor any usage figures are recorded.
        await completeUsageEvent(pool, eventId, {
          requestCompletedAt,
          httpStatus: upstreamStatus,
          model: requestModel,
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          latencyMs,
          requestJson,
          requestText: null,
          responseJson: null,
          responseText: null,
          responseContentType,
          streaming: false,
          partial: false,
        }).catch(logAuditFailure)
        return
      }

      // Extract usage
      const responseText = Buffer.concat(auditChunks).toString("utf8")
      const { model, promptTokens, completionTokens, totalTokens } = extractUsage(responseText)
      let responseJson = null
      try { responseJson = JSON.parse(responseText) } catch { /* not JSON */ }

      await completeUsageEvent(pool, eventId, {
        requestCompletedAt,
        httpStatus: upstreamStatus,
        model: model || requestModel,
        promptTokens,
        completionTokens,
        totalTokens,
        latencyMs,
        requestJson,
        requestText: requestTextForAudit(),
        responseJson,
        responseText: responseJson ? null : responseText,
        responseContentType,
        streaming: false,
        partial: false,
      }).catch(logAuditFailure)
    })

    upstreamRes.on("error", async (err) => {
      if (finalized) return
      finalized = true
      res.destroy()
      const { requestCompletedAt, latencyMs } = finishTiming()
      await failUsageEvent(pool, eventId, {
        requestCompletedAt,
        httpStatus: null,
        latencyMs,
        errorCode: "upstream_response_error",
      }).catch(logAuditFailure)
    })
  }
}
// ─── Server ───────────────────────────────────────────────────────────────────
/**
 * Start the gateway: create the database pool, run migrations, then listen on
 * GATEWAY_HOST:GATEWAY_PORT.
 *
 * @returns {Promise<void>} resolves once the server is listening; rejects if
 *   migrations fail or the port cannot be bound, so every startup failure
 *   reaches the same handler below.
 */
async function main() {
  const pool = createPool(DATABASE_URL)
  await runMigrations(pool)

  const server = http.createServer((req, res) => {
    handleRequest(req, res, pool).catch((err) => {
      console.error("unhandled request error:", err?.message ?? err)
      if (res.headersSent) {
        // A status line is already on the wire, so no error body can be sent.
        // Tear the response down rather than leaving the client waiting.
        if (!res.writableEnded) res.destroy()
        return
      }
      res.writeHead(500, { "Content-Type": "application/json" })
      res.end(JSON.stringify({ error: { message: "Internal server error", type: "internal_error", code: "internal_error" } }))
    })
  })

  // listen() reports bind failures (e.g. EADDRINUSE) through an "error" event
  // rather than by throwing; turn that into a rejection of main().
  await new Promise((resolve, reject) => {
    server.once("error", reject)
    server.listen(GATEWAY_PORT, GATEWAY_HOST, () => {
      server.off("error", reject)
      console.log(`Hermes API Gateway (${ROUTE_KIND}) — http://${GATEWAY_HOST}:${GATEWAY_PORT}/`)
      resolve()
    })
  })
}

main().catch((err) => {
  console.error("startup failed:", err?.message ?? err)
  process.exit(1)
})