fix(server): finalize dangling tool calls when persisting agent chat messages (#21276)
## Problem
Interrupting an AI chat turn mid tool-call batch permanently bricks the
thread. Every subsequent message fails with:
> Tool results are missing for tool calls toolu_…, toolu_…
## Root cause
When the model fires a parallel batch of tool calls, it streams all the
calls first, then results come back one by one. If the stream is aborted
(user hits stop, credit cutoff, etc.) after only some have resolved, the
AI SDK's `onFinish` still fires with the partial assistant message —
including tool parts left in `input-available` state (a tool call with
no result).
`addMessage` persists that message verbatim. On the next turn the
history is rebuilt and `streamText` validates it: every `tool-call` must
be cleared by a `tool-result` before the next user message, or it throws
`MissingToolResultsError` (`ai/dist`, the `MissingToolResultsError`
check). The orphaned calls are now in the DB, so the thread fails on
every turn from then on.
## Fix
Enforce the invariant at the single write chokepoint. Every chat message
is persisted through `AgentChatService.addMessage`, so
`finalizeDanglingToolParts` runs there once: any tool part still in
`input-available` is rewritten to `output-error` ("Tool execution was
interrupted.") before mapping to DB rows.
`output-error` converts to a real `tool-result`, so the persisted turn
is always self-consistent and the next request is valid. Interrupted
calls are kept (not dropped) and surfaced as errored rather than
perpetually "running" — honest, since a partially-executed call may have
committed side effects the model should be able to reconcile.
One guard at one point covers every abort source — no read-side
patching, no migration, no schema change.
## Caching impact
None on the happy path. A completed turn has no `input-available` parts,
so the helper is a no-op and the persisted bytes (and therefore the
cached prefix) are identical to before. For an interrupted turn, the
finalized content is deterministic and written once, so it caches
cleanly on the following request and stays stable across later turns —
there is no scenario where this invalidates an existing cache entry. Net
effect: turns a hard failure into a normally-cached continuation.
## Testing
- New unit test covering finalize / no-op cases (7 cases, passing)
- `oxlint --type-aware` + `oxfmt` clean on changed files
This commit is contained in:
+130
@@ -0,0 +1,130 @@
|
||||
import { convertToModelMessages, type UIMessage } from 'ai';
|
||||
import { type ExtendedUIMessagePart } from 'twenty-shared/ai';
|
||||
|
||||
import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity';
|
||||
import { finalizeDanglingToolParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/finalize-dangling-tool-parts.util';
|
||||
import { mapDBPartToUIMessagePart } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartToUIMessagePart';
|
||||
import { mapUIMessagePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts';
|
||||
|
||||
const toolPart = (
|
||||
state: string,
|
||||
overrides: Record<string, unknown>,
|
||||
): ExtendedUIMessagePart =>
|
||||
({
|
||||
type: 'tool-execute_tool',
|
||||
input: { name: 'nav item' },
|
||||
state,
|
||||
...overrides,
|
||||
}) as unknown as ExtendedUIMessagePart;
|
||||
|
||||
const persistAndReload = (
|
||||
parts: ExtendedUIMessagePart[],
|
||||
): ExtendedUIMessagePart[] =>
|
||||
mapUIMessagePartsToDBParts(parts, 'message-1', 'workspace-1')
|
||||
.map((dbPart) => mapDBPartToUIMessagePart(dbPart as AgentMessagePartEntity))
|
||||
.filter((part): part is ExtendedUIMessagePart => part !== null);
|
||||
|
||||
const buildThread = (assistantParts: ExtendedUIMessagePart[]): UIMessage[] =>
|
||||
[
|
||||
{
|
||||
id: 'u1',
|
||||
role: 'user',
|
||||
parts: [{ type: 'text', text: 'create 3 items' }],
|
||||
},
|
||||
{ id: 'a1', role: 'assistant', parts: assistantParts },
|
||||
{
|
||||
id: 'u2',
|
||||
role: 'user',
|
||||
parts: [{ type: 'text', text: 'now revert them' }],
|
||||
},
|
||||
] as unknown as UIMessage[];
|
||||
|
||||
const unresolvedToolCallIds = async (
|
||||
messages: UIMessage[],
|
||||
): Promise<string[]> => {
|
||||
const modelMessages = await convertToModelMessages(messages);
|
||||
const pending = new Set<string>();
|
||||
|
||||
for (const message of modelMessages) {
|
||||
if (!Array.isArray(message.content)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const content of message.content) {
|
||||
if (typeof content !== 'object') {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (content.type === 'tool-call' && content.providerExecuted !== true) {
|
||||
pending.add(content.toolCallId);
|
||||
}
|
||||
|
||||
if (content.type === 'tool-result') {
|
||||
pending.delete(content.toolCallId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [...pending];
|
||||
};
|
||||
|
||||
describe('finalizeDanglingToolParts round-trip', () => {
|
||||
const interruptedBatch: ExtendedUIMessagePart[] = [
|
||||
{ type: 'text', text: 'Creating items…' } as ExtendedUIMessagePart,
|
||||
toolPart('output-available', {
|
||||
toolCallId: 'done_1',
|
||||
output: { success: true },
|
||||
}),
|
||||
toolPart('input-available', { toolCallId: 'pending_1' }),
|
||||
toolPart('input-available', { toolCallId: 'pending_2' }),
|
||||
toolPart('input-streaming', { toolCallId: 'streaming_1' }),
|
||||
];
|
||||
|
||||
it('reproduces the bug: input-available calls are left unresolved (input-streaming is not)', async () => {
|
||||
expect(await unresolvedToolCallIds(buildThread(interruptedBatch))).toEqual([
|
||||
'pending_1',
|
||||
'pending_2',
|
||||
]);
|
||||
});
|
||||
|
||||
it('resolves every tool call after finalize + persistence round-trip', async () => {
|
||||
const reloaded = persistAndReload(
|
||||
finalizeDanglingToolParts(interruptedBatch),
|
||||
);
|
||||
|
||||
expect(await unresolvedToolCallIds(buildThread(reloaded))).toEqual([]);
|
||||
});
|
||||
|
||||
it('drops the input-streaming part through the round-trip', async () => {
|
||||
const reloaded = persistAndReload(
|
||||
finalizeDanglingToolParts(interruptedBatch),
|
||||
);
|
||||
|
||||
expect(
|
||||
reloaded.some(
|
||||
(part) =>
|
||||
(part as { toolCallId?: string }).toolCallId === 'streaming_1',
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('preserves the completed tool result through the round-trip', async () => {
|
||||
const reloaded = persistAndReload(
|
||||
finalizeDanglingToolParts(interruptedBatch),
|
||||
);
|
||||
const toolResults = (await convertToModelMessages(buildThread(reloaded)))
|
||||
.filter((message) => message.role === 'tool')
|
||||
.flatMap((message) =>
|
||||
Array.isArray(message.content) ? message.content : [],
|
||||
);
|
||||
|
||||
expect(toolResults).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
toolCallId: 'done_1',
|
||||
output: expect.objectContaining({ value: { success: true } }),
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
});
|
||||
+88
@@ -0,0 +1,88 @@
|
||||
import { type ExtendedUIMessagePart } from 'twenty-shared/ai';
|
||||
|
||||
import { finalizeDanglingToolParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/finalize-dangling-tool-parts.util';
|
||||
|
||||
const buildToolPart = (
|
||||
state: string,
|
||||
overrides: Record<string, unknown> = {},
|
||||
): ExtendedUIMessagePart =>
|
||||
({
|
||||
type: 'tool-execute_tool',
|
||||
toolCallId: 'call_1',
|
||||
input: { foo: 'bar' },
|
||||
state,
|
||||
...overrides,
|
||||
}) as ExtendedUIMessagePart;
|
||||
|
||||
describe('finalizeDanglingToolParts', () => {
|
||||
it('returns an empty array unchanged', () => {
|
||||
expect(finalizeDanglingToolParts([])).toEqual([]);
|
||||
});
|
||||
|
||||
it('rewrites an input-available tool part to an interrupted output-error', () => {
|
||||
const result = finalizeDanglingToolParts([
|
||||
buildToolPart('input-available'),
|
||||
]);
|
||||
|
||||
expect(result[0]).toEqual({
|
||||
type: 'tool-execute_tool',
|
||||
toolCallId: 'call_1',
|
||||
input: { foo: 'bar' },
|
||||
state: 'output-error',
|
||||
errorText: 'Tool execution was interrupted.',
|
||||
});
|
||||
});
|
||||
|
||||
it('leaves a completed tool part untouched', () => {
|
||||
const part = buildToolPart('output-available', { output: { ok: true } });
|
||||
|
||||
expect(finalizeDanglingToolParts([part])).toEqual([part]);
|
||||
});
|
||||
|
||||
it('leaves an errored tool part untouched', () => {
|
||||
const part = buildToolPart('output-error', { errorText: 'boom' });
|
||||
|
||||
expect(finalizeDanglingToolParts([part])).toEqual([part]);
|
||||
});
|
||||
|
||||
it('drops an input-streaming tool part with incomplete arguments', () => {
|
||||
const part = buildToolPart('input-streaming');
|
||||
|
||||
expect(finalizeDanglingToolParts([part])).toEqual([]);
|
||||
});
|
||||
|
||||
it('keeps surrounding parts when dropping an input-streaming part', () => {
|
||||
const text = { type: 'text', text: 'hello' } as ExtendedUIMessagePart;
|
||||
const streaming = buildToolPart('input-streaming');
|
||||
|
||||
expect(finalizeDanglingToolParts([text, streaming])).toEqual([text]);
|
||||
});
|
||||
|
||||
it('leaves non-tool parts untouched', () => {
|
||||
const parts = [
|
||||
{ type: 'text', text: 'hello' },
|
||||
{ type: 'reasoning', text: 'thinking' },
|
||||
] as ExtendedUIMessagePart[];
|
||||
|
||||
expect(finalizeDanglingToolParts(parts)).toEqual(parts);
|
||||
});
|
||||
|
||||
it('finalizes only the dangling parts in a mixed batch', () => {
|
||||
const completed = buildToolPart('output-available', {
|
||||
toolCallId: 'call_done',
|
||||
output: { ok: true },
|
||||
});
|
||||
const dangling = buildToolPart('input-available', {
|
||||
toolCallId: 'call_pending',
|
||||
});
|
||||
|
||||
const result = finalizeDanglingToolParts([completed, dangling]);
|
||||
|
||||
expect(result[0]).toEqual(completed);
|
||||
expect(result[1]).toMatchObject({
|
||||
toolCallId: 'call_pending',
|
||||
state: 'output-error',
|
||||
errorText: 'Tool execution was interrupted.',
|
||||
});
|
||||
});
|
||||
});
|
||||
+19
@@ -0,0 +1,19 @@
|
||||
import { isToolUIPart } from 'ai';
|
||||
import { type ExtendedUIMessagePart } from 'twenty-shared/ai';
|
||||
|
||||
const INTERRUPTED_TOOL_ERROR_TEXT = 'Tool execution was interrupted.';
|
||||
|
||||
export const finalizeDanglingToolParts = (
|
||||
parts: ExtendedUIMessagePart[],
|
||||
): ExtendedUIMessagePart[] =>
|
||||
parts
|
||||
.filter((part) => !(isToolUIPart(part) && part.state === 'input-streaming'))
|
||||
.map((part) =>
|
||||
isToolUIPart(part) && part.state === 'input-available'
|
||||
? ({
|
||||
...part,
|
||||
state: 'output-error',
|
||||
errorText: INTERRUPTED_TOOL_ERROR_TEXT,
|
||||
} as ExtendedUIMessagePart)
|
||||
: part,
|
||||
);
|
||||
+8
-5
@@ -14,6 +14,7 @@ import {
|
||||
AgentMessageStatus,
|
||||
} from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
|
||||
import { AgentTurnEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-turn.entity';
|
||||
import { finalizeDanglingToolParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/finalize-dangling-tool-parts.util';
|
||||
import { mapUIMessagePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts';
|
||||
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';
|
||||
import {
|
||||
@@ -232,15 +233,17 @@ export class AgentChatService {
|
||||
|
||||
if (uiMessage.parts && uiMessage.parts.length > 0) {
|
||||
const dbParts = mapUIMessagePartsToDBParts(
|
||||
uiMessage.parts,
|
||||
finalizeDanglingToolParts(uiMessage.parts),
|
||||
savedMessageId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.messagePartRepository.insert(
|
||||
workspaceId,
|
||||
dbParts as QueryDeepPartialEntity<AgentMessagePartEntity>[],
|
||||
);
|
||||
if (dbParts.length > 0) {
|
||||
await this.messagePartRepository.insert(
|
||||
workspaceId,
|
||||
dbParts as QueryDeepPartialEntity<AgentMessagePartEntity>[],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user