Compare commits

...

10 Commits

Author SHA1 Message Date
Félix Malfait f5b2c71943 Fix 2026-03-31 18:28:16 +02:00
Félix Malfait 74e6e20921 Merge branch 'feat/agent-chat-message-queue' of github.com:twentyhq/twenty into pr/19118 2026-03-31 17:57:07 +02:00
Félix Malfait c3ee312d26 Fix client SDK generated types: make turnId nullable
The DTO exposes turnId as nullable for queued messages, update
generated schema to match.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 17:45:52 +02:00
Félix Malfait 6cdca784fd Merge branch 'main' into feat/agent-chat-message-queue 2026-03-31 17:31:36 +02:00
Félix Malfait 9573e11879 Fix front typecheck and update client SDK generated types
- Cast status to union type in mapDBMessagesToUIMessages
- Update client SDK generated schema/types to include status field

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 09:57:51 +02:00
Félix Malfait 717989d36d Fix circular dependency and improve code quality
- Extract STREAM_AGENT_CHAT_JOB_NAME and StreamAgentChatJobData to
  separate types file to break circular dependency between
  stream-agent-chat.job.ts and agent-chat-streaming.service.ts
- Optimize flushNextQueuedMessage to check queued messages first
  (lightweight query) before loading full conversation
- Add missing state field to reasoning parts in server-side mapper
- Use entity status value in controller response instead of string literal
- Narrow ExtendedUIMessage.status to 'queued' | 'sent' union type

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 09:45:09 +02:00
Félix Malfait e6cfa46998 fix: typecheck errors for status field and reasoning part
- Add optional status field to ExtendedUIMessage in twenty-shared
- Remove invalid providerOptions from reasoning part mapper

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 09:34:03 +02:00
Félix Malfait d4acf77338 fix: migration schema prefix, review fixes
- Add "core" schema prefix to migration SQL (fixes CI)
- Remove unused modelId param from queueMessage
- Scope promoteQueuedMessage to QUEUED status only
- Use AgentMessageStatus enum instead of string literals
- Create proper mapDBPartsToUIMessageParts server-side mapper
  (fixes lossy mapping that dropped tool calls, reasoning, etc.)
- Merge getQueuedMessages + getMessagesForThread into single query
- Pass hasTitle from job data to avoid redundant thread lookup
- Remove modelId from frontend queue POST body

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 09:28:18 +02:00
Félix Malfait 220b32b19b feat: server-side message queue for agent chat
Replace localStorage-based frontend queue with server-side persistence.
When a user sends a message while streaming, the message is stored in
the database with status='queued'. When streaming finishes, the worker
automatically picks up the next queued message and starts processing it.

- Add `status` column (queued/sent) and make `turnId` nullable on AgentMessage
- Add REST endpoints POST/:threadId/queue and DELETE/:threadId/queue/:messageId
- Add auto-flush logic in stream job's finally block
- Frontend reads queued messages from GraphQL instead of localStorage
- Delete queued messages via REST DELETE endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 22:40:50 +02:00
Félix Malfait f8d92f9626 feat: queue messages sent while AI is streaming
When the user sends a message while the AI is still streaming a
response, instead of silently dropping it, queue it per-thread.
The queue is persisted to localStorage so it survives page refreshes.
When streaming completes, the next queued message is automatically
sent.

- New Jotai state: agentChatQueuedMessagesByThreadIdState (localStorage-backed)
- New UI component: AIChatQueuedMessages showing queued items with remove buttons
- Modified handleSendMessage to push to queue when streaming is active
- Added flushNextQueuedMessage to auto-send after stream completion

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 22:28:16 +02:00
23 changed files with 613 additions and 50 deletions
+23
View File
@@ -0,0 +1,23 @@
{
"version": "0.0.1",
"configurations": [
{
"name": "twenty-front",
"runtimeExecutable": "npx",
"runtimeArgs": ["nx", "start", "twenty-front"],
"port": 3001
},
{
"name": "twenty-server",
"runtimeExecutable": "npx",
"runtimeArgs": ["nx", "start", "twenty-server"],
"port": 3000
},
{
"name": "twenty-worker",
"runtimeExecutable": "npx",
"runtimeArgs": ["nx", "run", "twenty-server:worker"],
"port": 0
}
]
}
@@ -2787,10 +2787,12 @@ type AgentChatThread {
type AgentMessage {
id: UUID!
threadId: UUID!
turnId: UUID!
turnId: UUID
agentId: UUID
role: String!
status: String!
parts: [AgentMessagePart!]!
processedAt: DateTime
createdAt: DateTime!
}
@@ -2448,10 +2448,12 @@ export interface AgentChatThread {
export interface AgentMessage {
id: Scalars['UUID']
threadId: Scalars['UUID']
turnId: Scalars['UUID']
turnId?: Scalars['UUID']
agentId?: Scalars['UUID']
role: Scalars['String']
status: Scalars['String']
parts: AgentMessagePart[]
processedAt?: Scalars['DateTime']
createdAt: Scalars['DateTime']
__typename: 'AgentMessage'
}
@@ -5598,7 +5600,9 @@ export interface AgentMessageGenqlSelection{
turnId?: boolean | number
agentId?: boolean | number
role?: boolean | number
status?: boolean | number
parts?: AgentMessagePartGenqlSelection
processedAt?: boolean | number
createdAt?: boolean | number
__typename?: boolean | number
__scalar?: boolean | number
@@ -5543,9 +5543,15 @@ export default {
"role": [
1
],
"status": [
1
],
"parts": [
296
],
"processedAt": [
4
],
"createdAt": [
4
],
@@ -171,9 +171,11 @@ export type AgentMessage = {
createdAt: Scalars['DateTime'];
id: Scalars['UUID'];
parts: Array<AgentMessagePart>;
processedAt?: Maybe<Scalars['DateTime']>;
role: Scalars['String'];
status: Scalars['String'];
threadId: Scalars['UUID'];
turnId: Scalars['UUID'];
turnId?: Maybe<Scalars['UUID']>;
};
export type AgentMessagePart = {
@@ -6281,7 +6283,7 @@ export type GetChatMessagesQueryVariables = Exact<{
}>;
export type GetChatMessagesQuery = { __typename?: 'Query', chatMessages: Array<{ __typename?: 'AgentMessage', id: string, threadId: string, turnId: string, role: string, createdAt: string, parts: Array<{ __typename?: 'AgentMessagePart', id: string, messageId: string, orderIndex: number, type: string, textContent?: string | null, reasoningContent?: string | null, toolName?: string | null, toolCallId?: string | null, toolInput?: any | null, toolOutput?: any | null, state?: string | null, errorMessage?: string | null, errorDetails?: any | null, sourceUrlSourceId?: string | null, sourceUrlUrl?: string | null, sourceUrlTitle?: string | null, sourceDocumentSourceId?: string | null, sourceDocumentMediaType?: string | null, sourceDocumentTitle?: string | null, sourceDocumentFilename?: string | null, fileMediaType?: string | null, fileFilename?: string | null, fileUrl?: string | null, fileId?: string | null, providerMetadata?: any | null, createdAt: string }> }> };
export type GetChatMessagesQuery = { __typename?: 'Query', chatMessages: Array<{ __typename?: 'AgentMessage', id: string, threadId: string, turnId?: string | null, role: string, status: string, createdAt: string, parts: Array<{ __typename?: 'AgentMessagePart', id: string, messageId: string, orderIndex: number, type: string, textContent?: string | null, reasoningContent?: string | null, toolName?: string | null, toolCallId?: string | null, toolInput?: any | null, toolOutput?: any | null, state?: string | null, errorMessage?: string | null, errorDetails?: any | null, sourceUrlSourceId?: string | null, sourceUrlUrl?: string | null, sourceUrlTitle?: string | null, sourceDocumentSourceId?: string | null, sourceDocumentMediaType?: string | null, sourceDocumentTitle?: string | null, sourceDocumentFilename?: string | null, fileMediaType?: string | null, fileFilename?: string | null, fileUrl?: string | null, fileId?: string | null, providerMetadata?: any | null, createdAt: string }> }> };
export type GetChatThreadsQueryVariables = Exact<{
paging?: InputMaybe<CursorPaging>;
@@ -8119,7 +8121,7 @@ export const FindManySkillsDocument = {"kind":"Document","definitions":[{"kind":
export const FindOneAgentDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"FindOneAgent"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"id"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"UUID"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"findOneAgent"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"ObjectValue","fields":[{"kind":"ObjectField","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"id"}}}]}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"AgentFields"}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"AgentFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"Agent"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}},{"kind":"Field","name":{"kind":"Name","value":"label"}},{"kind":"Field","name":{"kind":"Name","value":"description"}},{"kind":"Field","name":{"kind":"Name","value":"icon"}},{"kind":"Field","name":{"kind":"Name","value":"prompt"}},{"kind":"Field","name":{"kind":"Name","value":"modelId"}},{"kind":"Field","name":{"kind":"Name","value":"responseFormat"}},{"kind":"Field","name":{"kind":"Name","value":"roleId"}},{"kind":"Field","name":{"kind":"Name","value":"isCustom"}},{"kind":"Field","name":{"kind":"Name","value":"modelConfiguration"}},{"kind":"Field","name":{"kind":"Name","value":"evaluationInputs"}},{"kind":"Field","name":{"kind":"Name","value":"applicationId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}}]} as unknown as DocumentNode<FindOneAgentQuery, FindOneAgentQueryVariables>;
export const FindOneSkillDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"FindOneSkill"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"id"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"UUID"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"skill"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"id"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"SkillFields"}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"SkillFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"Skill"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}},{"kind":"Field","name":{"kind":"Name","value":"label"}},{"kind":"Field","name":{"kind":"Name","value":"description"}},{"kind":"Field","name":{"kind":"Name","value":"icon"}},{"kind":"Field","name":{"kind":"Name","value":"content"}},{"kind":"Field","name":{"kind":"Name","value":"isCustom"}},{"kind":"Field","name":{"kind":"Name","value":"isActive"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}}]} as unknown as DocumentNode<FindOneSkillQuery, FindOneSkillQueryVariables>;
export const GetAgentTurnsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetAgentTurns"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"agentId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"UUID"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"agentTurns"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"agentId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"agentId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"threadId"}},{"kind":"Field","name":{"kind":"Name","value":"agentId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"evaluations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"score"}},{"kind":"Field","name":{"kind":"Name","value":"comment"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}},{"kind":"Field","name":{"kind":"Name","value":"messages"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"role"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"parts"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"type"}},{"kind":"Field","name":{"kind":"Name","value":"textContent"}},{"kind":"Field","name":{"kind":"Name","value":"reasoningContent"}},{"kind":"Field","name":{"kind":"Name","value":"toolName"}},{"kind":"Field","name":{"kind":"Name","value":"toolCallId"}},{"kind":"Field","name":{"kind":"Name","value":"toolInput"}},{"kind":"Field","name":{"kind":"Name","value":"toolOutput"}},{"kind":"Field","name":{"kind":"Name","value":"errorMessage"}},{"kind":"Field","name":{"kind":"Name","value":"state"}},{"kind":"Field","name":{"kind":"Name","value":"errorDetails"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlSourceId"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlUrl"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlTitle"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentSourceId"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentMediaType"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentTitle"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentFilename"}},{"kind":"Field","name":{"kind":"Name","value":"fileMediaType"}},{"kind":"Field","name":{"kind":"Name","value":"fileFilename"}},{"kind":"Field","name":{"kind":"Name","value":"fileUrl"}},{"kind":"Field","name":{"kind":"Name","value":"providerMetadata"}}]}}]}}]}}]}}]} as unknown as DocumentNode<GetAgentTurnsQuery, GetAgentTurnsQueryVariables>;
export const GetChatMessagesDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetChatMessages"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"threadId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"UUID"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"chatMessages"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"threadId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"threadId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"threadId"}},{"kind":"Field","name":{"kind":"Name","value":"turnId"}},{"kind":"Field","name":{"kind":"Name","value":"role"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"parts"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"messageId"}},{"kind":"Field","name":{"kind":"Name","value":"orderIndex"}},{"kind":"Field","name":{"kind":"Name","value":"type"}},{"kind":"Field","name":{"kind":"Name","value":"textContent"}},{"kind":"Field","name":{"kind":"Name","value":"reasoningContent"}},{"kind":"Field","name":{"kind":"Name","value":"toolName"}},{"kind":"Field","name":{"kind":"Name","value":"toolCallId"}},{"kind":"Field","name":{"kind":"Name","value":"toolInput"}},{"kind":"Field","name":{"kind":"Name","value":"toolOutput"}},{"kind":"Field","name":{"kind":"Name","value":"state"}},{"kind":"Field","name":{"kind":"Name","value":"errorMessage"}},{"kind":"Field","name":{"kind":"Name","value":"errorDetails"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlSourceId"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlUrl"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlTitle"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentSourceId"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentMediaType"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentTitle"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentFilename"}},{"kind":"Field","name":{"kind":"Name","value":"fileMediaType"}},{"kind":"Field","name":{"kind":"Name","value":"fileFilename"}},{"kind":"Field","name":{"kind":"Name","value":"fileUrl"}},{"kind":"Field","name":{"kind":"Name","value":"fileId"}},{"kind":"Field","name":{"kind":"Name","value":"providerMetadata"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]}}]}}]} as unknown as DocumentNode<GetChatMessagesQuery, GetChatMessagesQueryVariables>;
export const GetChatMessagesDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetChatMessages"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"threadId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"UUID"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"chatMessages"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"threadId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"threadId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"threadId"}},{"kind":"Field","name":{"kind":"Name","value":"turnId"}},{"kind":"Field","name":{"kind":"Name","value":"role"}},{"kind":"Field","name":{"kind":"Name","value":"status"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"parts"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"messageId"}},{"kind":"Field","name":{"kind":"Name","value":"orderIndex"}},{"kind":"Field","name":{"kind":"Name","value":"type"}},{"kind":"Field","name":{"kind":"Name","value":"textContent"}},{"kind":"Field","name":{"kind":"Name","value":"reasoningContent"}},{"kind":"Field","name":{"kind":"Name","value":"toolName"}},{"kind":"Field","name":{"kind":"Name","value":"toolCallId"}},{"kind":"Field","name":{"kind":"Name","value":"toolInput"}},{"kind":"Field","name":{"kind":"Name","value":"toolOutput"}},{"kind":"Field","name":{"kind":"Name","value":"state"}},{"kind":"Field","name":{"kind":"Name","value":"errorMessage"}},{"kind":"Field","name":{"kind":"Name","value":"errorDetails"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlSourceId"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlUrl"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrlTitle"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentSourceId"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentMediaType"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentTitle"}},{"kind":"Field","name":{"kind":"Name","value":"sourceDocumentFilename"}},{"kind":"Field","name":{"kind":"Name","value":"fileMediaType"}},{"kind":"Field","name":{"kind":"Name","value":"fileFilename"}},{"kind":"Field","name":{"kind":"Name","value":"fileUrl"}},{"kind":"Field","name":{"kind":"Name","value":"fileId"}},{"kind":"Field","name":{"kind":"Name","value":"providerMetadata"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]}}]}}]} as unknown as DocumentNode<GetChatMessagesQuery, GetChatMessagesQueryVariables>;
export const GetChatThreadsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetChatThreads"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"paging"}},"type":{"kind":"NamedType","name":{"kind":"Name","value":"CursorPaging"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"chatThreads"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"paging"},"value":{"kind":"Variable","name":{"kind":"Name","value":"paging"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"edges"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"node"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"title"}},{"kind":"Field","name":{"kind":"Name","value":"totalInputTokens"}},{"kind":"Field","name":{"kind":"Name","value":"totalOutputTokens"}},{"kind":"Field","name":{"kind":"Name","value":"contextWindowTokens"}},{"kind":"Field","name":{"kind":"Name","value":"conversationSize"}},{"kind":"Field","name":{"kind":"Name","value":"totalInputCredits"}},{"kind":"Field","name":{"kind":"Name","value":"totalOutputCredits"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}},{"kind":"Field","name":{"kind":"Name","value":"cursor"}}]}},{"kind":"Field","name":{"kind":"Name","value":"pageInfo"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"endCursor"}},{"kind":"Field","name":{"kind":"Name","value":"hasNextPage"}}]}}]}}]}}]} as unknown as DocumentNode<GetChatThreadsQuery, GetChatThreadsQueryVariables>;
export const GetToolIndexDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetToolIndex"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"getToolIndex"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"name"}},{"kind":"Field","name":{"kind":"Name","value":"description"}},{"kind":"Field","name":{"kind":"Name","value":"category"}},{"kind":"Field","name":{"kind":"Name","value":"objectName"}}]}}]}}]} as unknown as DocumentNode<GetToolIndexQuery, GetToolIndexQueryVariables>;
export const GetToolInputSchemaDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetToolInputSchema"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"toolName"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"getToolInputSchema"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"toolName"},"value":{"kind":"Variable","name":{"kind":"Name","value":"toolName"}}}]}]}}]} as unknown as DocumentNode<GetToolInputSchemaQuery, GetToolInputSchemaQueryVariables>;
@@ -189,23 +189,22 @@ export const AIChatMessage = ({
<AIChatErrorRenderer error={error} />
)}
</StyledMessageContainer>
{agentChatMessage.parts.length > 0 &&
agentChatMessage.metadata?.createdAt && (
<StyledMessageFooter className="message-footer">
<StyledMessageTimestamp>
{beautifyPastDateRelativeToNow(
agentChatMessage.metadata?.createdAt,
localeCatalog,
)}
</StyledMessageTimestamp>
<LightCopyIconButton
copyText={
agentChatMessage.parts.find((part) => part.type === 'text')
?.text ?? ''
}
/>
</StyledMessageFooter>
)}
{agentChatMessage.parts.length > 0 && (
<StyledMessageFooter className="message-footer">
<StyledMessageTimestamp>
{beautifyPastDateRelativeToNow(
agentChatMessage.metadata?.createdAt ?? new Date(),
localeCatalog,
)}
</StyledMessageTimestamp>
<LightCopyIconButton
copyText={
agentChatMessage.parts.find((part) => part.type === 'text')
?.text ?? ''
}
/>
</StyledMessageFooter>
)}
</StyledMessageBubble>
);
};
@@ -0,0 +1,100 @@
import { styled } from '@linaria/react';
import { AGENT_CHAT_REFETCH_MESSAGES_EVENT_NAME } from '@/ai/constants/AgentChatRefetchMessagesEventName';
import { agentChatQueuedMessagesComponentFamilyState } from '@/ai/states/agentChatQueuedMessagesComponentFamilyState';
import { currentAIChatThreadState } from '@/ai/states/currentAIChatThreadState';
import { REST_API_BASE_URL } from '@/apollo/constant/rest-api-base-url';
import { getTokenPair } from '@/apollo/utils/getTokenPair';
import { dispatchBrowserEvent } from '@/browser-event/utils/dispatchBrowserEvent';
import { useAtomComponentFamilyStateValue } from '@/ui/utilities/state/jotai/hooks/useAtomComponentFamilyStateValue';
import { useAtomStateValue } from '@/ui/utilities/state/jotai/hooks/useAtomStateValue';
import { isDefined } from 'twenty-shared/utils';
import { IconX } from 'twenty-ui/display';
import { LightIconButton } from 'twenty-ui/input';
import { themeCssVariables } from 'twenty-ui/theme-constants';
const StyledQueueContainer = styled.div`
display: flex;
flex-direction: column;
gap: ${themeCssVariables.spacing[1]};
padding: 0 ${themeCssVariables.spacing[3]};
`;
const StyledQueueLabel = styled.div`
color: ${themeCssVariables.font.color.light};
font-size: ${themeCssVariables.font.size.xs};
padding-left: ${themeCssVariables.spacing[1]};
`;
const StyledQueuedItem = styled.div`
align-items: center;
background: ${themeCssVariables.background.tertiary};
border-radius: ${themeCssVariables.border.radius.sm};
color: ${themeCssVariables.font.color.secondary};
display: flex;
font-size: ${themeCssVariables.font.size.md};
gap: ${themeCssVariables.spacing[2]};
justify-content: space-between;
padding: ${themeCssVariables.spacing[1]} ${themeCssVariables.spacing[2]};
`;
const StyledQueuedText = styled.span`
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
`;
export const AIChatQueuedMessages = () => {
const currentAIChatThread = useAtomStateValue(currentAIChatThreadState);
const queuedMessages = useAtomComponentFamilyStateValue(
agentChatQueuedMessagesComponentFamilyState,
{ threadId: currentAIChatThread },
);
if (!isDefined(currentAIChatThread) || queuedMessages.length === 0) {
return null;
}
const handleRemove = (messageId: string) => {
const tokenPair = getTokenPair();
if (!isDefined(tokenPair)) {
return;
}
fetch(
`${REST_API_BASE_URL}/agent-chat/${currentAIChatThread}/queue/${messageId}`,
{
method: 'DELETE',
headers: {
Authorization: `Bearer ${tokenPair.accessOrWorkspaceAgnosticToken.token}`,
},
},
)
.then(() => {
dispatchBrowserEvent(AGENT_CHAT_REFETCH_MESSAGES_EVENT_NAME);
})
.catch(() => {});
};
return (
<StyledQueueContainer>
<StyledQueueLabel>{queuedMessages.length} Queued</StyledQueueLabel>
{queuedMessages.map((message) => {
const textPart = message.parts?.find((part) => part.type === 'text');
const displayText = textPart && 'text' in textPart ? textPart.text : '';
return (
<StyledQueuedItem key={message.id}>
<StyledQueuedText>{displayText}</StyledQueuedText>
<LightIconButton
Icon={IconX}
onClick={() => handleRemove(message.id)}
size="small"
/>
</StyledQueuedItem>
);
})}
</StyledQueueContainer>
);
};
@@ -10,6 +10,7 @@ import { currentAIChatThreadState } from '@/ai/states/currentAIChatThreadState';
import { threadIdCreatedFromDraftState } from '@/ai/states/threadIdCreatedFromDraftState';
import { useAtomStateValue } from '@/ui/utilities/state/jotai/hooks/useAtomStateValue';
import { AIChatQueuedMessages } from '@/ai/components/AIChatQueuedMessages';
import { AIChatTabMessageList } from '@/ai/components/AIChatTabMessageList';
const StyledContainer = styled.div<{ isDraggingFile: boolean }>`
@@ -51,6 +52,7 @@ export const AIChatTab = () => {
{!isDraggingFile && (
<>
<AIChatTabMessageList />
<AIChatQueuedMessages />
<AIChatEditorSection key={editorSectionKey} />
</>
)}
@@ -6,6 +6,7 @@ import { AGENT_CHAT_UNKNOWN_THREAD_ID } from '@/ai/constants/AgentChatUnknownThr
import { AGENT_CHAT_NEW_THREAD_DRAFT_KEY } from '@/ai/states/agentChatDraftsByThreadIdState';
import { agentChatFetchedMessagesComponentFamilyState } from '@/ai/states/agentChatFetchedMessagesComponentFamilyState';
import { agentChatMessagesLoadingState } from '@/ai/states/agentChatMessagesLoadingState';
import { agentChatQueuedMessagesComponentFamilyState } from '@/ai/states/agentChatQueuedMessagesComponentFamilyState';
import { currentAIChatThreadState } from '@/ai/states/currentAIChatThreadState';
import { skipMessagesSkeletonUntilLoadedState } from '@/ai/states/skipMessagesSkeletonUntilLoadedState';
import { mapDBMessagesToUIMessages } from '@/ai/utils/mapDBMessagesToUIMessages';
@@ -42,6 +43,11 @@ export const AgentChatMessagesFetchEffect = () => {
{ threadId: currentAIChatThread },
);
const setAgentChatQueuedMessages = useSetAtomComponentFamilyState(
agentChatQueuedMessagesComponentFamilyState,
{ threadId: currentAIChatThread },
);
const handleFirstLoad = useCallback(
(_data: GetChatMessagesQuery) => {
setSkipMessagesSkeletonUntilLoaded(false);
@@ -52,9 +58,14 @@ export const AgentChatMessagesFetchEffect = () => {
const handleDataLoaded = useCallback(
(data: GetChatMessagesQuery) => {
const uiMessages = mapDBMessagesToUIMessages(data.chatMessages ?? []);
setAgentChatFetchedMessages(uiMessages);
setAgentChatFetchedMessages(
uiMessages.filter((message) => message.status !== 'queued'),
);
setAgentChatQueuedMessages(
uiMessages.filter((message) => message.status === 'queued'),
);
},
[setAgentChatFetchedMessages],
[setAgentChatFetchedMessages, setAgentChatQueuedMessages],
);
const handleLoadingChange = useCallback(
@@ -7,6 +7,7 @@ export const GET_CHAT_MESSAGES = gql`
threadId
turnId
role
status
createdAt
parts {
id
@@ -15,12 +15,14 @@ import {
agentChatDraftsByThreadIdState,
} from '@/ai/states/agentChatDraftsByThreadIdState';
import { agentChatInputState } from '@/ai/states/agentChatInputState';
import { AGENT_CHAT_REFETCH_MESSAGES_EVENT_NAME } from '@/ai/constants/AgentChatRefetchMessagesEventName';
import { useAgentChatModelId } from '@/ai/hooks/useAgentChatModelId';
import { REST_API_BASE_URL } from '@/apollo/constant/rest-api-base-url';
import { getTokenPair } from '@/apollo/utils/getTokenPair';
import { renewToken } from '@/auth/services/AuthService';
import { tokenPairState } from '@/auth/states/tokenPairState';
import { useListenToBrowserEvent } from '@/browser-event/hooks/useListenToBrowserEvent';
import { dispatchBrowserEvent } from '@/browser-event/utils/dispatchBrowserEvent';
import { useAtomState } from '@/ui/utilities/state/jotai/hooks/useAtomState';
import { useAtomStateValue } from '@/ui/utilities/state/jotai/hooks/useAtomStateValue';
import { useSetAtomState } from '@/ui/utilities/state/jotai/hooks/useSetAtomState';
@@ -239,7 +241,8 @@ export const useAgentChat = (
});
const isStreaming = status === 'streaming';
const isLoading = isStreaming || agentChatSelectedFiles.length > 0;
const isBusy = isStreaming || status === 'submitted';
const isLoading = isBusy || agentChatSelectedFiles.length > 0;
const handleSendMessage = useCallback(async () => {
const draftKey =
@@ -254,7 +257,50 @@ export const useAgentChat = (
).trim()
: store.get(agentChatInputState.atom).trim();
if (contentToSend === '' || isLoading) {
if (contentToSend === '') {
return;
}
if (isBusy) {
const threadId = store.get(currentAIChatThreadState.atom);
if (!isDefined(threadId) || !isValidUuid(threadId)) {
return;
}
const tokenPair = getTokenPair();
if (!isDefined(tokenPair)) {
return;
}
// Queue message on the server
fetch(`${REST_API_BASE_URL}/agent-chat/${threadId}/queue`, {
method: 'POST',
headers: {
Authorization: `Bearer ${tokenPair.accessOrWorkspaceAgnosticToken.token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
text: contentToSend,
}),
})
.then(() => {
// Refetch messages to show the queued message in the UI
dispatchBrowserEvent(AGENT_CHAT_REFETCH_MESSAGES_EVENT_NAME);
})
.catch(() => {});
setAgentChatInput('');
setAgentChatDraftsByThreadId((prev) => ({
...prev,
[draftKey]: '',
}));
return;
}
if (isLoading) {
return;
}
@@ -295,6 +341,7 @@ export const useAgentChat = (
}, [
store,
isLoading,
isBusy,
ensureThreadIdForSend,
setAgentChatInput,
getBrowsingContext,
@@ -0,0 +1,10 @@
import { AgentChatComponentInstanceContext } from '@/ai/states/AgentChatComponentInstanceContext';
import { createAtomComponentFamilyState } from '@/ui/utilities/state/jotai/utils/createAtomComponentFamilyState';
import { type ExtendedUIMessage } from 'twenty-shared/ai';
export const agentChatQueuedMessagesComponentFamilyState =
createAtomComponentFamilyState<ExtendedUIMessage[], { threadId: string }>({
key: 'agentChatQueuedMessagesComponentFamilyState',
defaultValue: [],
componentInstanceContext: AgentChatComponentInstanceContext,
});
@@ -8,6 +8,7 @@ export const mapDBMessagesToUIMessages = (
return dbMessages.map((dbMessage) => ({
id: dbMessage.id,
role: dbMessage.role as ExtendedUIMessage['role'],
status: dbMessage.status as 'queued' | 'sent',
parts: dbMessage.parts.map(mapDBPartToUIMessagePart),
metadata: {
createdAt: dbMessage.createdAt,
@@ -0,0 +1,39 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class AddStatusToAgentMessage1774776000000
implements MigrationInterface
{
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TYPE "core"."agentMessage_status_enum" AS ENUM ('queued', 'sent')`,
);
await queryRunner.query(
`ALTER TABLE "core"."agentMessage" ADD COLUMN "status" "core"."agentMessage_status_enum" NOT NULL DEFAULT 'sent'`,
);
await queryRunner.query(
`ALTER TABLE "core"."agentMessage" ALTER COLUMN "turnId" DROP NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "core"."agentMessage" ADD COLUMN "processedAt" TIMESTAMPTZ`,
);
await queryRunner.query(
`UPDATE "core"."agentMessage" SET "processedAt" = "createdAt" WHERE "status" = 'sent'`,
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "core"."agentMessage" DROP COLUMN "processedAt"`,
);
await queryRunner.query(
`DELETE FROM "core"."agentMessage" WHERE "turnId" IS NULL`,
);
await queryRunner.query(
`ALTER TABLE "core"."agentMessage" ALTER COLUMN "turnId" SET NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "core"."agentMessage" DROP COLUMN "status"`,
);
await queryRunner.query(`DROP TYPE "core"."agentMessage_status_enum"`);
}
}
@@ -13,8 +13,8 @@ export class AgentMessageDTO {
@Field(() => UUIDScalarType)
threadId: string;
@Field(() => UUIDScalarType)
turnId: string;
@Field(() => UUIDScalarType, { nullable: true })
turnId: string | null;
@Field(() => UUIDScalarType, { nullable: true })
agentId: string | null;
@@ -22,9 +22,16 @@ export class AgentMessageDTO {
@Field()
role: 'system' | 'user' | 'assistant';
@Field()
status: 'queued' | 'sent';
@Field(() => [AgentMessagePartDTO])
parts: AgentMessagePartDTO[];
@IsDateString()
@Field(() => Date, { nullable: true })
processedAt: Date | null;
@IsDateString()
@Field()
createdAt: Date;
@@ -20,6 +20,11 @@ export enum AgentMessageRole {
ASSISTANT = 'assistant',
}
export enum AgentMessageStatus {
QUEUED = 'queued',
SENT = 'sent',
}
@Entity('agentMessage')
export class AgentMessageEntity {
@PrimaryGeneratedColumn('uuid')
@@ -35,15 +40,16 @@ export class AgentMessageEntity {
@JoinColumn({ name: 'threadId' })
thread: Relation<AgentChatThreadEntity>;
@Column('uuid')
@Column({ type: 'uuid', nullable: true })
@Index()
turnId: string;
turnId: string | null;
@ManyToOne(() => AgentTurnEntity, {
onDelete: 'CASCADE',
nullable: true,
})
@JoinColumn({ name: 'turnId' })
turn: Relation<AgentTurnEntity>;
turn: Relation<AgentTurnEntity> | null;
@Column({ type: 'uuid', nullable: true })
@Index()
@@ -52,9 +58,19 @@ export class AgentMessageEntity {
@Column({ type: 'enum', enum: AgentMessageRole })
role: AgentMessageRole;
@Column({
type: 'enum',
enum: AgentMessageStatus,
default: AgentMessageStatus.SENT,
})
status: AgentMessageStatus;
@OneToMany(() => AgentMessagePartEntity, (part) => part.message)
parts: Relation<AgentMessagePartEntity[]>;
@Column({ type: 'timestamptz', nullable: true })
processedAt: Date | null;
@CreateDateColumn({ type: 'timestamptz' })
createdAt: Date;
}
@@ -0,0 +1,76 @@
import { type ExtendedUIMessagePart } from 'twenty-shared/ai';
import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity';
export const mapDBPartToUIMessagePart = (
part: AgentMessagePartEntity,
): ExtendedUIMessagePart | null => {
switch (part.type) {
case 'text':
return {
type: 'text',
text: part.textContent ?? '',
};
case 'reasoning':
return {
type: 'reasoning',
text: part.reasoningContent ?? '',
state: (part.state as 'streaming' | 'done') ?? 'done',
};
case 'file':
return {
type: 'file',
mediaType: part.fileFilename?.endsWith('.png')
? 'image/png'
: 'application/octet-stream',
filename: part.fileFilename ?? '',
url: '',
};
case 'source-url':
return {
type: 'source-url',
sourceId: part.sourceUrlSourceId ?? '',
url: part.sourceUrlUrl ?? '',
title: part.sourceUrlTitle ?? '',
providerMetadata: part.providerMetadata ?? undefined,
};
case 'source-document':
return {
type: 'source-document',
sourceId: part.sourceDocumentSourceId ?? '',
mediaType: part.sourceDocumentMediaType ?? '',
title: part.sourceDocumentTitle ?? '',
filename: part.sourceDocumentFilename ?? '',
providerMetadata: part.providerMetadata ?? undefined,
};
case 'step-start':
return {
type: 'step-start',
};
case 'data-routing-status':
return null;
default: {
if (part.type.includes('tool-') && part.toolCallId) {
return {
type: part.type,
toolCallId: part.toolCallId,
input: part.toolInput ?? {},
output: part.toolOutput,
errorText: part.errorMessage ?? '',
state: part.state,
} as ExtendedUIMessagePart;
}
return null;
}
}
};
export const mapDBPartsToUIMessageParts = (
parts: AgentMessagePartEntity[],
): ExtendedUIMessagePart[] => {
return parts
.sort((a, b) => a.orderIndex - b.orderIndex)
.map(mapDBPartToUIMessagePart)
.filter((part): part is ExtendedUIMessagePart => part !== null);
};
@@ -45,6 +45,7 @@ import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/en
import { getCancelChannel } from 'src/engine/metadata-modules/ai/ai-chat/utils/get-cancel-channel.util';
import { AgentChatResumableStreamService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service';
import { AgentChatStreamingService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service';
import { AgentChatService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat.service';
import { AiModelRegistryService } from 'src/engine/metadata-modules/ai/ai-models/services/ai-model-registry.service';
@Controller('rest/agent-chat')
@@ -58,6 +59,7 @@ export class AgentChatController {
constructor(
private readonly agentStreamingService: AgentChatStreamingService,
private readonly resumableStreamService: AgentChatResumableStreamService,
private readonly agentChatService: AgentChatService,
private readonly billingService: BillingService,
private readonly twentyConfigService: TwentyConfigService,
private readonly aiModelRegistryService: AiModelRegistryService,
@@ -180,4 +182,56 @@ export class AgentChatController {
return { success: true };
}
@Post(':threadId/queue')
@UseGuards(SettingsPermissionGuard(PermissionFlagType.AI))
async queueMessage(
@Param('threadId') threadId: string,
@Body() body: { text: string },
@AuthUserWorkspaceId() userWorkspaceId: string,
) {
const thread = await this.threadRepository.findOne({
where: { id: threadId, userWorkspaceId },
});
if (!isDefined(thread)) {
throw new AgentException(
'Thread not found',
AgentExceptionCode.AGENT_EXECUTION_FAILED,
);
}
const message = await this.agentChatService.queueMessage({
threadId,
text: body.text,
});
return { id: message.id, threadId, status: message.status };
}
@Delete(':threadId/queue/:messageId')
@UseGuards(SettingsPermissionGuard(PermissionFlagType.AI))
async deleteQueuedMessage(
@Param('threadId') threadId: string,
@Param('messageId') messageId: string,
@AuthUserWorkspaceId() userWorkspaceId: string,
) {
const thread = await this.threadRepository.findOne({
where: { id: threadId, userWorkspaceId },
});
if (!isDefined(thread)) {
throw new AgentException(
'Thread not found',
AgentExceptionCode.AGENT_EXECUTION_FAILED,
);
}
const deleted = await this.agentChatService.deleteQueuedMessage(
messageId,
threadId,
);
return { success: deleted };
}
}
@@ -0,0 +1,21 @@
import type {
ExtendedUIMessage,
ExtendedUIMessagePart,
} from 'twenty-shared/ai';
import type { BrowsingContextType } from 'src/engine/metadata-modules/ai/ai-agent/types/browsingContext.type';
export const STREAM_AGENT_CHAT_JOB_NAME = 'StreamAgentChatJob';
export type StreamAgentChatJobData = {
threadId: string;
streamId: string;
userWorkspaceId: string;
workspaceId: string;
messages: ExtendedUIMessage[];
browsingContext: BrowsingContextType | null;
modelId?: string;
lastUserMessageText: string;
lastUserMessageParts: ExtendedUIMessagePart[];
hasTitle: boolean;
};
@@ -16,7 +16,7 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { AgentMessageRole } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
import type { BrowsingContextType } from 'src/engine/metadata-modules/ai/ai-agent/types/browsingContext.type';
import { AgentChatStreamingService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service';
import { computeCostBreakdown } from 'src/engine/metadata-modules/ai/ai-billing/utils/compute-cost-breakdown.util';
import { convertDollarsToBillingCredits } from 'src/engine/metadata-modules/ai/ai-billing/utils/convert-dollars-to-billing-credits.util';
import { extractCacheCreationTokens } from 'src/engine/metadata-modules/ai/ai-billing/utils/extract-cache-creation-tokens.util';
@@ -27,20 +27,12 @@ import { AgentChatService } from 'src/engine/metadata-modules/ai/ai-chat/service
import { ChatExecutionService } from 'src/engine/metadata-modules/ai/ai-chat/services/chat-execution.service';
import { getCancelChannel } from 'src/engine/metadata-modules/ai/ai-chat/utils/get-cancel-channel.util';
export const STREAM_AGENT_CHAT_JOB_NAME = 'StreamAgentChatJob';
import {
STREAM_AGENT_CHAT_JOB_NAME,
type StreamAgentChatJobData,
} from './stream-agent-chat-job.types';
export type StreamAgentChatJobData = {
threadId: string;
streamId: string;
userWorkspaceId: string;
workspaceId: string;
messages: ExtendedUIMessage[];
browsingContext: BrowsingContextType | null;
modelId?: string;
lastUserMessageText: string;
lastUserMessageParts: ExtendedUIMessagePart[];
hasTitle: boolean;
};
export { STREAM_AGENT_CHAT_JOB_NAME, type StreamAgentChatJobData };
@Processor({ queueName: MessageQueue.aiStreamQueue, scope: Scope.REQUEST })
export class StreamAgentChatJob {
@@ -55,6 +47,7 @@ export class StreamAgentChatJob {
private readonly chatExecutionService: ChatExecutionService,
private readonly resumableStreamService: AgentChatResumableStreamService,
private readonly cancelSubscriberService: AgentChatCancelSubscriberService,
private readonly agentChatStreamingService: AgentChatStreamingService,
) {}
@Process(STREAM_AGENT_CHAT_JOB_NAME)
@@ -105,6 +98,20 @@ export class StreamAgentChatJob {
})
.execute()
.catch(() => {});
// Auto-flush the next queued message for this thread
await this.agentChatStreamingService
.flushNextQueuedMessage(
data.threadId,
data.userWorkspaceId,
data.workspaceId,
data.hasTitle,
)
.catch((error) => {
this.logger.error(
`Failed to flush queued message for thread ${data.threadId}: ${error instanceof Error ? error.message : String(error)}`,
);
});
}
}
@@ -150,7 +157,7 @@ export class StreamAgentChatJob {
}: {
workspace: WorkspaceEntity;
data: StreamAgentChatJobData;
userMessagePromise: Promise<{ turnId: string }>;
userMessagePromise: Promise<{ turnId: string | null }>;
titlePromise: Promise<string | null>;
abortSignal: AbortSignal;
}): Promise<void> {
@@ -358,7 +365,7 @@ export class StreamAgentChatJob {
};
lastStepConversationSize: number;
modelConfig: AIModelConfig;
userMessagePromise: Promise<{ turnId: string }>;
userMessagePromise: Promise<{ turnId: string | null }>;
}): Promise<void> {
if (responseMessage.parts.length === 0) {
return;
@@ -369,7 +376,7 @@ export class StreamAgentChatJob {
await this.agentChatService.addMessage({
threadId,
uiMessage: responseMessage,
turnId: userMessage.turnId,
turnId: userMessage.turnId ?? undefined,
});
await this.threadRepository.update(threadId, {
@@ -16,11 +16,14 @@ import {
AgentExceptionCode,
} from 'src/engine/metadata-modules/ai/ai-agent/agent.exception';
import { type BrowsingContextType } from 'src/engine/metadata-modules/ai/ai-agent/types/browsingContext.type';
import { AgentMessageStatus } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
import { mapDBPartsToUIMessageParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartsToUIMessageParts';
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';
import {
STREAM_AGENT_CHAT_JOB_NAME,
type StreamAgentChatJobData,
} from 'src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat.job';
} from 'src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat-job.types';
import { AgentChatService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat.service';
import { AgentChatResumableStreamService } from './agent-chat-resumable-stream.service';
@@ -47,6 +50,7 @@ export class AgentChatStreamingService {
@InjectMessageQueue(MessageQueue.aiStreamQueue)
private readonly messageQueueService: MessageQueueService,
private readonly resumableStreamService: AgentChatResumableStreamService,
private readonly agentChatService: AgentChatService,
) {}
async streamAgentChat({
@@ -125,6 +129,71 @@ export class AgentChatStreamingService {
}
}
async flushNextQueuedMessage(
threadId: string,
userWorkspaceId: string,
workspaceId: string,
hasTitle: boolean,
): Promise<void> {
// Lightweight check: only query queued messages first
const queuedMessages =
await this.agentChatService.getQueuedMessages(threadId);
const nextQueued = queuedMessages[0];
if (!nextQueued) {
return;
}
const textPart = nextQueued.parts?.find((part) => part.type === 'text');
const messageText = textPart?.textContent ?? '';
if (messageText === '') {
await this.agentChatService.deleteQueuedMessage(nextQueued.id, threadId);
return;
}
await this.agentChatService.promoteQueuedMessage(nextQueued.id, threadId);
// Only load full conversation when we actually need to stream
const allMessages = await this.agentChatService.getMessagesForThread(
threadId,
userWorkspaceId,
);
// Build conversation context from sent messages (using proper mapper)
const uiMessages = allMessages
.filter((message) => message.status !== AgentMessageStatus.QUEUED)
.map((message) => ({
id: message.id,
role: message.role as 'user' | 'assistant' | 'system',
parts: mapDBPartsToUIMessageParts(message.parts ?? []),
createdAt: message.createdAt,
}));
const streamId = generateId();
await this.messageQueueService.add<StreamAgentChatJobData>(
STREAM_AGENT_CHAT_JOB_NAME,
{
threadId,
streamId,
userWorkspaceId,
workspaceId,
messages: uiMessages,
browsingContext: null,
lastUserMessageText: messageText,
lastUserMessageParts: [{ type: 'text', text: messageText }],
hasTitle,
},
);
await this.threadRepository.update(threadId, {
activeStreamId: streamId,
});
}
private async waitForResumableStream(
streamId: string,
): Promise<
@@ -10,6 +10,7 @@ import { AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-
import {
AgentMessageEntity,
AgentMessageRole,
AgentMessageStatus,
} from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
import { AgentTurnEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-turn.entity';
import { mapUIMessagePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts';
@@ -91,6 +92,7 @@ export class AgentChatService {
turnId: actualTurnId,
role: uiMessage.role as AgentMessageRole,
agentId: agentId ?? null,
processedAt: new Date(),
});
const savedMessage = await this.messageRepository.save(message);
@@ -124,11 +126,74 @@ export class AgentChatService {
return this.messageRepository.find({
where: { threadId },
order: { createdAt: 'ASC' },
order: { processedAt: { direction: 'ASC', nulls: 'LAST' } },
relations: ['parts', 'parts.file'],
});
}
async queueMessage({
threadId,
text,
}: {
threadId: string;
text: string;
}): Promise<AgentMessageEntity> {
const message = this.messageRepository.create({
threadId,
turnId: null,
role: AgentMessageRole.USER,
agentId: null,
status: AgentMessageStatus.QUEUED,
});
const savedMessage = await this.messageRepository.save(message);
const part = this.messagePartRepository.create({
messageId: savedMessage.id,
orderIndex: 0,
type: 'text',
textContent: text,
});
await this.messagePartRepository.save(part);
return savedMessage;
}
async getQueuedMessages(threadId: string): Promise<AgentMessageEntity[]> {
return this.messageRepository.find({
where: {
threadId,
status: AgentMessageStatus.QUEUED,
},
order: { createdAt: 'ASC' },
relations: ['parts'],
});
}
async deleteQueuedMessage(
messageId: string,
threadId: string,
): Promise<boolean> {
const result = await this.messageRepository.delete({
id: messageId,
threadId,
status: AgentMessageStatus.QUEUED,
});
return (result.affected ?? 0) > 0;
}
async promoteQueuedMessage(
messageId: string,
threadId: string,
): Promise<void> {
await this.messageRepository.update(
{ id: messageId, threadId, status: AgentMessageStatus.QUEUED },
{ status: AgentMessageStatus.SENT, processedAt: new Date() },
);
}
async generateTitleIfNeeded(
threadId: string,
messageContent: string,
@@ -23,4 +23,5 @@ type Metadata = {
export type ExtendedUIMessage = UIMessage<Metadata, DataMessagePart> & {
threadId?: Nullable<string>;
status?: 'queued' | 'sent';
};