612 lines
21 KiB
JavaScript
612 lines
21 KiB
JavaScript
"use strict"
|
|
|
|
const test = require("node:test")
|
|
const assert = require("node:assert/strict")
|
|
const http = require("http")
|
|
const { spawn } = require("child_process")
|
|
const path = require("path")
|
|
const { withTestDatabase } = require("./helpers/db-test.cjs")
|
|
const { runMigrations } = require("../lib/db.cjs")
|
|
const { createApiUser, revokeApiUser } = require("../lib/api-users-store.cjs")
|
|
const { beginUsageEvent } = require("../lib/audit-store.cjs")
|
|
const crypto = require("crypto")
|
|
|
|
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Start a fake upstream HTTP server on 127.0.0.1 using an OS-assigned port.
 *
 * @param {function} handler - (req, res) => void, passed to http.createServer
 * @returns {Promise<{server: object, url: string, close: function}>}
 *   Resolves once the server is listening. `url` is the base URL of the
 *   server and `close()` returns a promise that resolves when the server has
 *   stopped. Rejects if the server fails to start listening.
 */
function startFakeUpstream(handler) {
  return new Promise((resolve, reject) => {
    const server = http.createServer(handler)

    // Without this a failed bind would leave the promise pending forever.
    server.once("error", reject)

    server.listen(0, "127.0.0.1", () => {
      // Startup succeeded; the startup-only error handler is no longer needed.
      server.off("error", reject)
      const { port } = server.address()
      resolve({
        server,
        url: `http://127.0.0.1:${port}`,
        // Best-effort shutdown: always resolves, even if already closed.
        close: () => new Promise((done) => server.close(() => done())),
      })
    })
  })
}
|
|
|
|
/**
 * Ask the operating system for a TCP port that is currently free on `host`.
 *
 * A throwaway server is bound to port 0 and closed again; the port it was
 * given is returned. There is a small window in which another process could
 * take the port, but this is far less collision-prone than guessing.
 *
 * @param {string} host - interface to probe
 * @returns {Promise<number>} a port number that was free at probe time
 */
function reserveFreePort(host) {
  return new Promise((resolve, reject) => {
    const probe = http.createServer()
    probe.once("error", reject)
    probe.listen(0, host, () => {
      const { port } = probe.address()
      probe.close(() => resolve(port))
    })
  })
}

/**
 * Start the api-gateway.cjs subprocess and wait until GET /health returns a
 * JSON body with a truthy `ok` field.
 *
 * @param {object} options
 * @param {string} options.databaseUrl - passed to the child as DATABASE_URL
 * @param {string} [options.routeKind="pre"] - passed as HERMES_API_ROUTE_KIND
 * @param {string} options.upstreamUrl - passed as HERMES_UPSTREAM_URL
 * @param {number} [options.port=0] - port to listen on; 0 picks a free port
 * @param {object} [options.extraEnv={}] - extra environment variables, applied last
 * @returns {Promise<{port: number, host: string, close: function}>}
 *   `close()` kills the child and resolves once it has exited.
 *   Rejects if the child fails to spawn, exits before becoming healthy, or
 *   is not healthy within 10 seconds (in which case the child is killed).
 */
async function startGateway(options) {
  const {
    databaseUrl,
    routeKind = "pre",
    upstreamUrl,
    port = 0,
    extraEnv = {},
  } = options

  const gatewayHost = "127.0.0.1"
  const gatewayPort = port || (await reserveFreePort(gatewayHost))

  const env = {
    ...process.env,
    DATABASE_URL: databaseUrl,
    HERMES_API_ROUTE_KIND: routeKind,
    HERMES_API_GATEWAY_HOST: gatewayHost,
    HERMES_API_GATEWAY_PORT: String(gatewayPort),
    HERMES_UPSTREAM_URL: upstreamUrl,
    HERMES_LOG_RETENTION_DAYS: "1",
    ...extraEnv,
  }

  const gatewayPath = path.join(__dirname, "..", "api-gateway.cjs")
  const child = spawn(process.execPath, [gatewayPath], { env, stdio: "pipe" })

  // Keep stderr so startup failures can be reported with context.
  let stderr = ""
  child.stderr.on("data", (chunk) => { stderr += chunk.toString() })
  child.stdout.on("data", () => {}) // drain

  const hasExited = () => child.exitCode !== null || child.signalCode !== null

  await new Promise((resolve, reject) => {
    const deadline = Date.now() + 10000
    let settled = false
    let retryTimer = null

    // Settle exactly once and stop any pending poll.
    const settle = (err) => {
      if (settled) return
      settled = true
      clearTimeout(retryTimer)
      if (err) reject(err)
      else resolve()
    }

    function poll() {
      if (settled) return
      if (Date.now() > deadline) {
        child.kill()
        settle(new Error(`Gateway did not start in time. stderr: ${stderr}`))
        return
      }

      // Each probe schedules at most one follow-up attempt, no matter how
      // many of its failure events fire.
      let retried = false
      const retry = () => {
        if (retried || settled) return
        retried = true
        retryTimer = setTimeout(poll, 200)
      }

      const req = http.request(
        { hostname: gatewayHost, port: gatewayPort, path: "/health", method: "GET", timeout: 1000 },
        (res) => {
          let body = ""
          res.on("data", (c) => { body += c })
          res.on("end", () => {
            let healthy = false
            try {
              healthy = Boolean(JSON.parse(body).ok)
            } catch {
              healthy = false
            }
            if (healthy) settle()
            else retry()
          })
          res.on("error", retry)
        }
      )
      // A peer that accepts but never answers must not stall the loop.
      req.on("timeout", () => req.destroy())
      req.on("error", retry)
      req.on("close", retry)
      req.end()
    }

    child.on("error", settle)
    // Any exit before the health check passes is a startup failure.
    child.on("exit", (code) => {
      settle(new Error(`Gateway exited with code ${code}. stderr: ${stderr}`))
    })

    retryTimer = setTimeout(poll, 300)
  })

  return {
    port: gatewayPort,
    host: gatewayHost,
    close: () => new Promise((res) => {
      // If the child is already gone, "exit" will never fire again.
      if (hasExited()) {
        res(child.exitCode)
        return
      }
      child.once("exit", res)
      child.kill()
    }),
  }
}
|
|
|
|
/**
 * Shared plumbing for gatewayRequest / gatewayStreamRequest: send one HTTP
 * request and collect the whole response.
 *
 * @param {object} options - { host, port, path, method, headers, body }
 * @param {string} defaultMethod - HTTP method used when options.method is absent
 * @returns {Promise<{status: number, headers: object, chunks: Buffer[], rawBody: string}>}
 */
function sendGatewayRequest(options, defaultMethod) {
  return new Promise((resolve, reject) => {
    const {
      host,
      port,
      path: reqPath = "/v1/chat/completions",
      method = defaultMethod,
      headers = {},
      body,
    } = options

    const req = http.request({ hostname: host, port, path: reqPath, method, headers }, (res) => {
      const chunks = []
      res.on("data", (c) => chunks.push(c))
      res.on("end", () => {
        resolve({
          status: res.statusCode,
          headers: res.headers,
          chunks,
          rawBody: Buffer.concat(chunks).toString("utf8"),
        })
      })
      res.on("error", reject)
    })
    req.on("error", reject)
    // Strings are sent verbatim; anything else is serialized as JSON.
    if (body) req.write(typeof body === "string" ? body : JSON.stringify(body))
    req.end()
  })
}

/**
 * Make an HTTP request to the gateway and return
 * { status, body, headers, rawBody }.
 *
 * `body` is the JSON-parsed response when it parses, otherwise the raw text.
 * Defaults: path "/v1/chat/completions", method "GET", no headers.
 */
async function gatewayRequest(options) {
  const { status, headers, rawBody } = await sendGatewayRequest(options, "GET")
  let parsed
  try {
    parsed = JSON.parse(rawBody)
  } catch {
    parsed = rawBody
  }
  return { status, body: parsed, headers, rawBody }
}

/**
 * Make a streaming HTTP request to the gateway and collect all chunks.
 * Returns { status, chunks, rawBody, headers }.
 *
 * Defaults: path "/v1/chat/completions", method "POST", no headers.
 */
async function gatewayStreamRequest(options) {
  const { status, headers, chunks, rawBody } = await sendGatewayRequest(options, "POST")
  return { status, chunks, rawBody, headers }
}
|
|
|
|
// ─── Tests ────────────────────────────────────────────────────────────────────
|
|
|
|
// End-to-end test: spawns real api-gateway.cjs subprocesses against a test
// database schema and fake upstream HTTP servers, then checks the gateway's
// error shapes, request rewriting, and response forwarding.
test("api-gateway integration", { timeout: 60000 }, async (t) => {
  await withTestDatabase(t, async ({ pool, schemaName }) => {
    await runMigrations(pool)

    // Limits high enough that these users never trip quota checks.
    const generousLimits = {
      requestsPerMinute: 1000,
      monthlyTokenLimit: 10000000,
      expiresAt: null,
    }

    // Create test users/keys for the tests
    const { plaintextKey: preKey } = await createApiUser(pool, {
      displayName: "Pre-only user",
      allowPre: true,
      allowPost: false,
      ...generousLimits,
    })

    const { plaintextKey: postKey } = await createApiUser(pool, {
      displayName: "Post-only user",
      allowPre: false,
      allowPost: true,
      ...generousLimits,
    })

    const { user: rateLimitedUser, plaintextKey: rateLimitedKey } = await createApiUser(pool, {
      displayName: "Rate-limited user",
      allowPre: true,
      allowPost: true,
      requestsPerMinute: 1, // 1 req/min so we can trigger rate limit
      monthlyTokenLimit: 10000000,
      expiresAt: null,
    })

    // Seed one existing event to hit rate limit
    const { rows: keyRows } = await pool.query(
      "SELECT id FROM api_keys WHERE api_user_id = $1 AND revoked_at IS NULL",
      [rateLimitedUser.id]
    )
    const rateLimitedKeyId = keyRows[0].id
    await beginUsageEvent(pool, {
      id: crypto.randomUUID(),
      apiUserId: rateLimitedUser.id,
      apiUserName: rateLimitedUser.display_name,
      apiKeyId: rateLimitedKeyId,
      route: "pre",
      requestStartedAt: new Date(),
      model: null,
      promptTokens: 0,
      completionTokens: 0,
      totalTokens: 0,
    })

    // Create and revoke a user to get a revoked key
    const { user: toRevokeUser, plaintextKey: revokedKey } = await createApiUser(pool, {
      displayName: "Revoked user",
      allowPre: true,
      allowPost: true,
      ...generousLimits,
    })
    await revokeApiUser(pool, toRevokeUser.id)

    // Point the gateway subprocess at the per-test schema.
    const gatewayDatabaseUrl = new URL(process.env.TEST_DATABASE_URL)
    gatewayDatabaseUrl.searchParams.set("options", `-c search_path=${schemaName}`)
    const databaseUrl = gatewayDatabaseUrl.toString()

    // ─── Local helpers ──────────────────────────────────────────────────────

    // POST a JSON payload to a gateway; the Authorization header is only sent
    // when an API key is given.
    const postJson = (target, apiKey, payload, reqPath = "/v1/chat/completions") => {
      const headers = { "Content-Type": "application/json" }
      if (apiKey) headers["Authorization"] = `Bearer ${apiKey}`
      return gatewayRequest({
        host: target.host,
        port: target.port,
        path: reqPath,
        method: "POST",
        headers,
        body: JSON.stringify(payload),
      })
    }

    // Run `run(gateway)` against a freshly started gateway and always stop it.
    const withGateway = async (gatewayOptions, run) => {
      const gateway = await startGateway({ databaseUrl, ...gatewayOptions })
      try {
        return await run(gateway)
      } finally {
        await gateway.close()
      }
    }

    // Fake upstream that records the last JSON request body it received and
    // echoes the requested model back.
    const startCapturingUpstream = async () => {
      const captured = { body: null }
      const upstream = await startFakeUpstream((req, res) => {
        let raw = ""
        req.setEncoding("utf8")
        req.on("data", (c) => { raw += c })
        req.on("end", () => {
          captured.body = JSON.parse(raw)
          res.writeHead(200, { "Content-Type": "application/json" })
          res.end(JSON.stringify({ model: captured.body.model, usage: {} }))
        })
      })
      return { upstream, captured }
    }

    const emptyChat = { model: "gpt-4o", messages: [] }

    // Start a fake upstream that returns a simple JSON response
    const jsonUpstream = await startFakeUpstream((req, res) => {
      let body = ""
      req.on("data", (c) => { body += c })
      req.on("end", () => {
        res.writeHead(200, { "Content-Type": "application/json" })
        res.end(JSON.stringify({
          id: "chatcmpl-test",
          model: "gpt-4o",
          choices: [{ message: { role: "assistant", content: "Hello!" } }],
          usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 },
        }))
      })
    })

    let gw = null
    try {
      // Start the pre-gateway
      gw = await startGateway({
        databaseUrl,
        routeKind: "pre",
        upstreamUrl: jsonUpstream.url,
      })

      // ─── Test 1: Missing bearer token → 401 ─────────────────────────────────
      await t.test("missing bearer token → 401 with OpenAI error shape", async () => {
        const { status, body } = await postJson(gw, null, emptyChat)
        assert.equal(status, 401)
        assert.ok(body.error, "should have error field")
        assert.ok(body.error.message, "should have error.message")
        assert.ok(body.error.type, "should have error.type")
        assert.ok(body.error.code, "should have error.code")
        assert.equal(body.error.code, "authentication_required")
      })

      // ─── Test 2: Invalid bearer token → 401 ────────────────────────────────
      await t.test("invalid bearer token → 401 with code: invalid_api_key", async () => {
        const { status, body } = await postJson(gw, "hms_notarealkey1234567890", emptyChat)
        assert.equal(status, 401)
        assert.deepEqual(body, {
          error: {
            message: "Invalid API key",
            type: "invalid_api_key",
            code: "invalid_api_key",
          },
        })
      })

      // ─── Test 3: Valid pre-only key on post gateway → 403 ──────────────────
      await t.test("pre-only key on post gateway → 403 with code: permission_denied", async () => {
        await withGateway({ routeKind: "post", upstreamUrl: jsonUpstream.url }, async (postGw) => {
          const { status, body } = await postJson(postGw, preKey, emptyChat)
          assert.equal(status, 403)
          assert.deepEqual(body, {
            error: {
              message: "API key does not permit post-Hermes access",
              type: "permission_denied",
              code: "permission_denied",
            },
          })
        })
      })

      // ─── Test 4: Revoked key → 410 ──────────────────────────────────────────
      await t.test("revoked key → 410 with code: key_revoked", async () => {
        const { status, body } = await postJson(gw, revokedKey, emptyChat)
        assert.equal(status, 410)
        assert.deepEqual(body, {
          error: {
            message: "API key has been revoked",
            type: "key_revoked",
            code: "key_revoked",
          },
        })
      })

      // ─── Test 5: Rate-limited user → 429 ───────────────────────────────────
      await t.test("rate-limited user → 429 with code: rate_limit_exceeded", async () => {
        const { status, body } = await postJson(gw, rateLimitedKey, emptyChat)
        assert.equal(status, 429)
        assert.deepEqual(body, {
          error: {
            message: "Rate limit exceeded",
            type: "rate_limit_exceeded",
            code: "rate_limit_exceeded",
          },
        })
      })

      // ─── Test 6: Non-streaming response forwarded ───────────────────────────
      await t.test("non-streaming response: upstream returns 200 JSON, gateway forwards it", async () => {
        const { status, body } = await postJson(gw, preKey, {
          model: "gpt-4o",
          messages: [{ role: "user", content: "hi" }],
        })
        assert.equal(status, 200)
        assert.equal(body.model, "gpt-4o")
        assert.ok(body.choices, "should have choices")
        assert.ok(body.usage, "should have usage")
      })

      await t.test("OpenAI-compatible requests get default model and medium reasoning effort", async () => {
        const { upstream: captureUpstream, captured } = await startCapturingUpstream()
        try {
          await withGateway({
            routeKind: "pre",
            upstreamUrl: captureUpstream.url,
            extraEnv: {
              HERMES_DEFAULT_PROVIDER: "openai-codex",
              HERMES_DEFAULT_CODEX_MODEL: "gpt-5.4",
            },
          }, async (defaultsGw) => {
            const { status } = await postJson(defaultsGw, preKey, {
              messages: [{ role: "user", content: "hi" }],
            })
            assert.equal(status, 200)
            assert.equal(captured.body.model, "gpt-5.4")
            assert.deepEqual(captured.body.reasoning, { effort: "medium" })
          })
        } finally {
          await captureUpstream.close()
        }
      })

      await t.test("request model and thinking_effort override gateway defaults", async () => {
        const { upstream: captureUpstream, captured } = await startCapturingUpstream()
        try {
          await withGateway({ routeKind: "pre", upstreamUrl: captureUpstream.url }, async (defaultsGw) => {
            const { status } = await postJson(defaultsGw, preKey, {
              model: "claude-sonnet-4.6",
              thinking_effort: "high",
              input: "hi",
            }, "/v1/responses")
            assert.equal(status, 200)
            assert.equal(captured.body.model, "claude-sonnet-4.6")
            assert.deepEqual(captured.body.reasoning, { effort: "high" })
            assert.equal(captured.body.thinking_effort, undefined)
          })
        } finally {
          await captureUpstream.close()
        }
      })

      await t.test("configured upstream API key replaces the client API key", async () => {
        let receivedAuthorization = null
        const authenticatedUpstream = await startFakeUpstream((req, res) => {
          receivedAuthorization = req.headers.authorization
          req.resume()
          req.on("end", () => {
            res.writeHead(200, { "Content-Type": "application/json" })
            res.end(JSON.stringify({ model: "test", usage: {} }))
          })
        })
        const internalKey = "internal-hermes-api-key"
        try {
          await withGateway({
            routeKind: "post",
            upstreamUrl: authenticatedUpstream.url,
            extraEnv: { HERMES_UPSTREAM_API_KEY: internalKey },
          }, async (postGw) => {
            const { status } = await postJson(postGw, postKey, { model: "test", messages: [] })
            assert.equal(status, 200)
            assert.equal(receivedAuthorization, `Bearer ${internalKey}`)
          })
        } finally {
          await authenticatedUpstream.close()
        }
      })

      // ─── Test 7: SSE streaming forwarded ───────────────────────────────────
      await t.test("SSE streaming: upstream streams text/event-stream, gateway forwards chunks", async () => {
        // Start a streaming upstream
        const streamUpstream = await startFakeUpstream((req, res) => {
          res.writeHead(200, {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
          })
          const chunks = [
            { id: "chatcmpl-1", model: "gpt-4o", choices: [{ delta: { role: "assistant" } }] },
            { id: "chatcmpl-1", model: "gpt-4o", choices: [{ delta: { content: "Hello" } }] },
            { id: "chatcmpl-1", model: "gpt-4o", choices: [{ delta: { content: "!" } }] },
            { id: "chatcmpl-1", model: "gpt-4o", choices: [{ finish_reason: "stop", delta: {} }], usage: { prompt_tokens: 5, completion_tokens: 2, total_tokens: 7 } },
          ]
          let i = 0
          // Emit one event every 10 ms, then the [DONE] sentinel.
          function sendNext() {
            if (i >= chunks.length) {
              res.write("data: [DONE]\n\n")
              res.end()
              return
            }
            res.write(`data: ${JSON.stringify(chunks[i++])}\n\n`)
            setTimeout(sendNext, 10)
          }
          sendNext()
        })

        try {
          await withGateway({ routeKind: "pre", upstreamUrl: streamUpstream.url }, async (streamGw) => {
            const { status, rawBody, headers } = await gatewayStreamRequest({
              host: streamGw.host,
              port: streamGw.port,
              path: "/v1/chat/completions",
              method: "POST",
              headers: {
                "Content-Type": "application/json",
                "Authorization": `Bearer ${preKey}`,
              },
              body: JSON.stringify({ model: "gpt-4o", messages: [{ role: "user", content: "hi" }], stream: true }),
            })

            assert.equal(status, 200)
            assert.ok(headers["content-type"] && headers["content-type"].includes("text/event-stream"),
              "should forward text/event-stream content type")
            assert.ok(rawBody.includes("data:"), "should have SSE data lines")
            assert.ok(rawBody.includes("[DONE]"), "should include [DONE] sentinel")
          })
        } finally {
          await streamUpstream.close()
        }
      })

      // ─── Test 8: Upstream connection refused → 502 ──────────────────────────
      await t.test("upstream failure (connection refused) → 502", async () => {
        const badUpstreamUrl = "http://127.0.0.1:1" // port 1 should be refused
        await withGateway({ routeKind: "pre", upstreamUrl: badUpstreamUrl }, async (badGw) => {
          const { status, body } = await postJson(badGw, preKey, emptyChat)
          assert.equal(status, 502)
          assert.ok(body.error, "should have error field")
          assert.equal(body.error.code, "upstream_error")
        })
      })
    } finally {
      // ─── Cleanup ────────────────────────────────────────────────────────────
      // Runs on failure too, so no gateway child process or listening server
      // is left behind.
      if (gw) await gw.close()
      await jsonUpstream.close()
    }
  })
})
|