Compare commits

...

2 Commits

Author SHA1 Message Date
neo773 a73020eb45 wip 2026-06-05 21:39:54 +05:30
neo773 0a4dd19472 feat(messaging): add integration testing framework for sync jobs
Open the BullMQ driver's return-value channel with a backward-compatible
TResult generic so jobs can return typed results, and add an MSW-based
integration harness that drives messaging sync jobs end-to-end against
mocked Gmail/Microsoft Graph APIs with real DB assertions.

Covers full pipeline (folder sync, list fetch, import), folder discovery
and import policy, token refresh and insufficient-permissions, rate-limit
throttling, sync cursor, failed-channel recovery, and stale-sync recovery
across the Gmail and Microsoft drivers.
2026-06-05 16:11:02 +05:30
30 changed files with 1575 additions and 57 deletions
+8 -5
View File
@@ -14,13 +14,13 @@ IS_WORKSPACE_CREATION_LIMITED_TO_SERVER_ADMINS=false
ENTERPRISE_VALIDITY_TOKEN=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkZXYtc3Vic2NyaXB0aW9uLWlkIiwic3RhdHVzIjoidmFsaWQiLCJpYXQiOjE3NzMzMDg4MzMsImV4cCI6NDg5NzUxMTIzM30.qhfrW_SV2Y86fWtWXsALlAVhxmMxylUUIefN0fki10Q2NTGGqFVXZrNn2WacJY37yq3m5y4WgwZw34ua6E0ff_YUXsrlY5OHJWHT9DMqKCRn-JujHJnnYp3VHLncy5CvxH5r9mfPFp-5AWe1pYeR1T63sTiejH3sfDrNE357SB7KVti8LCcnsJxEtXB2tRnvyvdun7A-GKoKYEIam-16ZRKKFs6GaWo8ObHdfm8yBt6uK4DZSGPWb644QyWh9FtDxbzJ0ti54DuHSlErLgIp1NNEsMA0MK7zFY7StRaOdt72rxE1ZHwN7e6HhweTU4ORVUPfYkjDFLB2fF7Pa7Kvdg
AUTH_GOOGLE_ENABLED=false
MESSAGING_PROVIDER_GMAIL_ENABLED=false
AUTH_GOOGLE_ENABLED=true
MESSAGING_PROVIDER_GMAIL_ENABLED=true
IS_IMAP_SMTP_CALDAV_ENABLED=true
IS_IMAP_SMTP_CALDAV_CONNECTION_TEST_ENABLED=false
CALENDAR_PROVIDER_GOOGLE_ENABLED=false
MESSAGING_PROVIDER_MICROSOFT_ENABLED=false
CALENDAR_PROVIDER_MICROSOFT_ENABLED=false
CALENDAR_PROVIDER_GOOGLE_ENABLED=true
MESSAGING_PROVIDER_MICROSOFT_ENABLED=true
CALENDAR_PROVIDER_MICROSOFT_ENABLED=true
AUTH_GOOGLE_CALLBACK_URL=http://localhost:3000/auth/google/redirect
AUTH_GOOGLE_APIS_CALLBACK_URL=http://localhost:3000/auth/google-apis/get-access-token
@@ -34,3 +34,6 @@ IS_WORKSPACE_CREATION_V2_ENABLED=true
SHOULD_SEED_STANDARD_RECORD_PAGE_LAYOUTS=true
LOGIC_FUNCTION_TYPE=LOCAL
IS_CONFIG_VARIABLES_IN_DB_ENABLED=false
AUTH_GOOGLE_CLIENT_ID=mock-google-client-id
AUTH_GOOGLE_CLIENT_SECRET=mock-google-client-secret
@@ -33,6 +33,9 @@ const jestConfig: JestConfigWithTsJest = {
],
testRegex: '\\.integration-spec\\.ts$',
modulePathIgnorePatterns: ['<rootDir>/dist'],
transformIgnorePatterns: [
'/node_modules/(?!(msw|@mswjs|until-async|@bundled-es-modules|@open-draft|strict-event-emitter|headers-polyfill|outvariant|is-node-process|tough-cookie|path-to-regexp|statuses|cookie|digest-fetch|md5|email-reply-parser)/)',
],
globalSetup: '<rootDir>/test/integration/utils/setup-test.ts',
globalTeardown: '<rootDir>/test/integration/utils/teardown-test.ts',
testTimeout: 20000,
@@ -92,9 +92,9 @@ export class BullMQDriver
]);
}
work<T>(
work<T, TResult = void>(
queueName: MessageQueue,
handler: (job: MessageQueueJob<T>) => Promise<void>,
handler: (job: MessageQueueJob<T>) => Promise<TResult>,
options?: MessageQueueWorkerOptions,
) {
const workerOptions = {
@@ -110,7 +110,7 @@ export class BullMQDriver
this.workerMap[queueName] = new Worker(
queueName,
async (job) =>
async (job): Promise<TResult> =>
Sentry.withIsolationScope(async () => {
applyWorkspaceSentryContextFromJobData(job.data);
@@ -124,13 +124,19 @@ export class BullMQDriver
this.logger.log(
`Processing job ${job.id} with name ${job.name} on queue ${queueName}${workspaceSuffix}`,
);
await handler({ data: job.data, id: job.id ?? '', name: job.name });
const result = await handler({
data: job.data,
id: job.id ?? '',
name: job.name,
});
const timeEnd = performance.now();
const executionTime = timeEnd - timeStart;
this.logger.log(
`Job ${job.id} with name ${job.name} processed on queue ${queueName} in ${executionTime.toFixed(2)}ms${workspaceSuffix}`,
);
return result;
}),
workerOptions,
);
@@ -217,12 +223,12 @@ export class BullMQDriver
);
}
async add<T>(
async add<T, TResult = void>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
): Promise<TResult | void> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
@@ -8,15 +8,21 @@ import { type MessageQueueWorkerOptions } from 'src/engine/core-modules/message-
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
export interface MessageQueueDriver {
add<T extends MessageQueueJobData>(
add<T extends MessageQueueJobData, TResult = void>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void>;
work<T extends MessageQueueJobData>(
): Promise<TResult | void>;
work<T extends MessageQueueJobData, TResult = void>(
queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
handler: ({
data,
id,
}: {
data: T;
id: string;
}) => Promise<TResult> | TResult,
options?: MessageQueueWorkerOptions,
): void;
addCron<T extends MessageQueueJobData | undefined>({
@@ -12,17 +12,21 @@ import { type MessageQueue } from 'src/engine/core-modules/message-queue/message
export class SyncDriver implements MessageQueueDriver {
private readonly logger = new Logger(SyncDriver.name);
private workersMap: {
[queueName: string]: (job: MessageQueueJob) => Promise<void> | void;
[queueName: string]: (job: MessageQueueJob) => Promise<unknown> | unknown;
} = {};
constructor() {}
async add<T extends MessageQueueJobData>(
async add<T extends MessageQueueJobData, TResult = void>(
queueName: MessageQueue,
jobName: string,
data: T,
): Promise<void> {
await this.processJob(queueName, { id: '', name: jobName, data });
): Promise<TResult | void> {
return await this.processJob<T, TResult>(queueName, {
id: '',
name: jobName,
data,
});
}
async addCron<T extends MessageQueueJobData | undefined>({
@@ -48,26 +52,30 @@ export class SyncDriver implements MessageQueueDriver {
this.logger.log(`Removing '${queueName}' cron job with SyncDriver`);
}
work<T extends MessageQueueJobData>(
work<T extends MessageQueueJobData, TResult = void>(
queueName: MessageQueue,
handler: (job: MessageQueueJob<T>) => Promise<void> | void,
handler: (job: MessageQueueJob<T>) => Promise<TResult> | TResult,
): void {
this.logger.log(`Registering handler for queue: ${queueName}`);
this.workersMap[queueName] = handler;
this.workersMap[queueName] = handler as (
job: MessageQueueJob,
) => Promise<unknown> | unknown;
}
async processJob<T extends MessageQueueJobData>(
async processJob<T extends MessageQueueJobData, TResult = unknown>(
queueName: string,
job: MessageQueueJob<T>,
) {
): Promise<TResult | undefined> {
const worker = this.workersMap[queueName];
if (worker) {
await worker(job);
} else {
if (process.env.NODE_ENV !== 'test') {
this.logger.error(`No handler found for job: ${queueName}`);
}
return (await worker(job)) as TResult;
}
if (process.env.NODE_ENV !== 'test') {
this.logger.error(`No handler found for job: ${queueName}`);
}
return undefined;
}
}
@@ -22,6 +22,7 @@ import { type MessageQueue } from 'src/engine/core-modules/message-queue/message
import { QUEUE_WORKER_OPTIONS } from 'src/engine/core-modules/message-queue/message-queue-worker-options.constant';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
import { shouldCaptureException } from 'src/engine/utils/global-exception-handler.util';
import { isDefined } from 'twenty-shared/utils';
interface ProcessorGroup {
instance: object;
@@ -138,16 +139,24 @@ export class MessageQueueExplorer implements OnModuleInit {
options?: MessageQueueWorkerOptions,
) {
queue.work(async (job) => {
let result: unknown;
for (const processorGroup of processorGroupCollection) {
await this.handleProcessor(processorGroup, job);
const groupResult = await this.handleProcessor(processorGroup, job);
if (isDefined(groupResult)) {
result = groupResult;
}
}
return result;
}, options);
}
private async handleProcessor(
{ instance, host, processMethodNames, isRequestScoped }: ProcessorGroup,
job: MessageQueueJob<MessageQueueJobData>,
) {
): Promise<unknown> {
const filteredProcessMethodNames = processMethodNames.filter(
(processMethodName) => {
const metadata = this.metadataAccessor.getProcessMetadata(
@@ -161,7 +170,7 @@ export class MessageQueueExplorer implements OnModuleInit {
// Return early if no matching methods found
if (filteredProcessMethodNames.length === 0) {
return;
return undefined;
}
if (isRequestScoped) {
@@ -186,13 +195,13 @@ export class MessageQueueExplorer implements OnModuleInit {
contextId,
);
await this.invokeProcessMethods(
return await this.invokeProcessMethods(
contextInstance,
filteredProcessMethodNames,
job,
);
} else {
await this.invokeProcessMethods(
return await this.invokeProcessMethods(
instance,
filteredProcessMethodNames,
job,
@@ -204,11 +213,13 @@ export class MessageQueueExplorer implements OnModuleInit {
instance: object,
processMethodNames: string[],
job: MessageQueueJob<MessageQueueJobData>,
) {
): Promise<unknown> {
let result: unknown;
for (const processMethodName of processMethodNames) {
try {
// @ts-expect-error legacy noImplicitAny
await instance[processMethodName].call(instance, job.data);
result = await instance[processMethodName].call(instance, job.data);
} catch (err) {
if (shouldCaptureException(err)) {
this.exceptionHandlerService.captureExceptions([err]);
@@ -216,5 +227,7 @@ export class MessageQueueExplorer implements OnModuleInit {
throw err;
}
}
return result;
}
}
@@ -27,12 +27,12 @@ export class MessageQueueService {
}
}
add<T extends MessageQueueJobData>(
add<T extends MessageQueueJobData, TResult = void>(
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
return this.driver.add(this.queueName, jobName, data, options);
): Promise<TResult | void> {
return this.driver.add<T, TResult>(this.queueName, jobName, data, options);
}
addCron<T extends MessageQueueJobData | undefined>({
@@ -69,10 +69,10 @@ export class MessageQueueService {
});
}
work<T extends MessageQueueJobData>(
handler: (job: MessageQueueJob<T>) => Promise<void> | void,
work<T extends MessageQueueJobData, TResult = void>(
handler: (job: MessageQueueJob<T>) => Promise<TResult> | TResult,
options?: MessageQueueWorkerOptions,
): void {
this.driver.work(this.queueName, handler, options);
this.driver.work<T, TResult>(this.queueName, handler, options);
}
}
@@ -22,6 +22,11 @@ export type MessagingMessageListFetchJobData = {
workspaceId: string;
};
export type MessagingMessageListFetchJobResult = {
messagesToImport: number;
messagesToDelete: number;
};
@Processor({
queueName: MessageQueue.messagingQueue,
scope: Scope.REQUEST,
@@ -37,7 +42,9 @@ export class MessagingMessageListFetchJob {
) {}
@Process(MessagingMessageListFetchJob.name)
async handle(data: MessagingMessageListFetchJobData): Promise<void> {
async handle(
data: MessagingMessageListFetchJobData,
): Promise<MessagingMessageListFetchJobResult> {
const { messageChannelId, workspaceId } = data;
await this.messagingMonitoringService.track({
@@ -48,7 +55,7 @@ export class MessagingMessageListFetchJob {
const authContext = buildSystemAuthContext(workspaceId);
await this.globalWorkspaceOrmManager.executeInWorkspaceContext(
return await this.globalWorkspaceOrmManager.executeInWorkspaceContext<MessagingMessageListFetchJobResult>(
async () => {
const messageChannel = await this.messageChannelRepository.findOne({
where: {
@@ -65,14 +72,14 @@ export class MessagingMessageListFetchJob {
workspaceId,
});
return;
return { messagesToImport: 0, messagesToDelete: 0 };
}
if (
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED
) {
return;
return { messagesToImport: 0, messagesToDelete: 0 };
}
try {
@@ -83,10 +90,11 @@ export class MessagingMessageListFetchJob {
messageChannelId: messageChannel.id,
});
await this.messagingMessageListFetchService.processMessageListFetch(
messageChannel,
workspaceId,
);
const fetchResult =
await this.messagingMessageListFetchService.processMessageListFetch(
messageChannel,
workspaceId,
);
await this.messagingMonitoringService.track({
eventName: 'message_list_fetch.completed',
@@ -94,6 +102,8 @@ export class MessagingMessageListFetchJob {
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
return fetchResult;
} catch (error) {
await this.messageImportErrorHandlerService.handleDriverException(
error,
@@ -101,6 +111,8 @@ export class MessagingMessageListFetchJob {
messageChannel,
workspaceId,
);
return { messagesToImport: 0, messagesToDelete: 0 };
}
},
authContext,
@@ -21,6 +21,7 @@ import { type MessageChannelMessageAssociationWorkspaceEntity } from 'src/module
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
import { SyncMessageFoldersService } from 'src/modules/messaging/message-folder-manager/services/sync-message-folders.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { type MessagingMessageListFetchJobResult } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
import {
MessageImportExceptionHandlerService,
@@ -56,10 +57,10 @@ export class MessagingMessageListFetchService {
public async processMessageListFetch(
messageChannel: MessageChannelEntity,
workspaceId: string,
) {
): Promise<MessagingMessageListFetchJobResult> {
const authContext = buildSystemAuthContext(workspaceId);
await this.globalWorkspaceOrmManager.executeInWorkspaceContext(
return await this.globalWorkspaceOrmManager.executeInWorkspaceContext<MessagingMessageListFetchJobResult>(
async () => {
try {
const pendingGroupEmailActionsProcessed =
@@ -96,7 +97,7 @@ export class MessagingMessageListFetchService {
`WorkspaceId: ${workspaceId}, MessageChannelId: ${messageChannel.id} - Message channel not found`,
);
return;
return { messagesToImport: 0, messagesToDelete: 0 };
}
const messageFolders =
@@ -249,7 +250,10 @@ export class MessagingMessageListFetchService {
workspaceId,
);
return;
return {
messagesToImport: totalMessagesToImportCount,
messagesToDelete: allMessageExternalIdsToDelete.length,
};
}
this.logger.debug(
@@ -269,6 +273,11 @@ export class MessagingMessageListFetchService {
freshMessageChannel.connectedAccount,
workspaceId,
);
return {
messagesToImport: totalMessagesToImportCount,
messagesToDelete: allMessageExternalIdsToDelete.length,
};
} catch (error) {
await this.messageImportErrorHandlerService.handleDriverException(
error,
@@ -276,6 +285,8 @@ export class MessagingMessageListFetchService {
messageChannel,
workspaceId,
);
return { messagesToImport: 0, messagesToDelete: 0 };
}
},
authContext,
@@ -0,0 +1,95 @@
import {
MessageChannelSyncStage,
MessageFolderImportPolicy,
} from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
type MessagingMessageListFetchJobResult,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { setupGmailMock } from 'test/integration/messaging/utils/gmail-mock.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const runListFetch = (channelId: string) =>
enqueueJobAndAwait<
MessagingMessageListFetchJobData,
MessagingMessageListFetchJobResult
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
messageChannelId: channelId,
workspaceId: WORKSPACE_ID,
});
const getSyncStateByFolderName = async (channelId: string) => {
const folders = await queryMessageFolders(channelId);
return Object.fromEntries(
folders.map((folder) => [folder.name, folder.isSynced]),
);
};
describe('Gmail folder discovery (integration)', () => {
const gmail = setupGmailMock({
inbox: [],
labels: [
{ id: 'INBOX', name: 'INBOX' },
{ id: 'SENT', name: 'SENT' },
{ id: 'Label_Work', name: 'Work' },
],
});
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
beforeAll(async () => {
jest.useRealTimers();
channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
messageFolderImportPolicy: MessageFolderImportPolicy.SELECTED_FOLDERS,
});
}, 60000);
afterAll(async () => {
await channel.cleanup();
});
it('rediscovers a folder appended after the first sync and leaves it unsynced under the selected-folders policy', async () => {
await runListFetch(channel.channelId);
const afterFirstSync = await pollUntil(
() => getSyncStateByFolderName(channel.channelId),
(state) => Object.keys(state).length === 3,
{ timeoutMs: 30_000 },
);
expect(afterFirstSync).toEqual({ INBOX: false, SENT: false, Work: false });
gmail.labels.add({ id: 'Label_Archive', name: 'Archive' });
await getCoreRepository<MessageChannelEntity>(MessageChannelEntity).update(
{ id: channel.channelId },
{ syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED },
);
await runListFetch(channel.channelId);
const afterSecondSync = await pollUntil(
() => getSyncStateByFolderName(channel.channelId),
(state) => Object.keys(state).length === 4,
{ timeoutMs: 30_000 },
);
expect(afterSecondSync).toEqual({
INBOX: false,
SENT: false,
Work: false,
Archive: false,
});
}, 60000);
});
@@ -0,0 +1,75 @@
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
import { findManyOperationFactory } from 'test/integration/graphql/utils/find-many-operation-factory.util';
import { makeGraphqlAPIRequest } from 'test/integration/graphql/utils/make-graphql-api-request.util';
import {
getGmailMessageSubject,
gmailMessage,
setupGmailMock,
} from 'test/integration/messaging/utils/gmail-mock.util';
import { connectMessagingAccount } from 'test/integration/messaging/utils/connect-messaging-account.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const findImportedSubjects = async (subjects: string[]): Promise<string[]> => {
const response = await makeGraphqlAPIRequest(
findManyOperationFactory({
objectMetadataSingularName: 'message',
objectMetadataPluralName: 'messages',
gqlFields: `id
subject`,
filter: { subject: { in: subjects } },
}),
);
return response.body.data.messages.edges
.map((edge: { node: { subject: string } }) => edge.node.subject)
.sort();
};
describe('Gmail message list fetch job (integration)', () => {
const inbox = [gmailMessage(), gmailMessage()];
setupGmailMock({ inbox, handle: 'connected-account@apple.dev' });
let channel: Awaited<ReturnType<typeof connectMessagingAccount>>;
beforeAll(async () => {
// The real BullMQ worker runs asynchronously, so polling uses real timers.
jest.useRealTimers();
channel = await connectMessagingAccount(ConnectedAccountProvider.GOOGLE);
}, 60000);
afterAll(async () => {
await channel.cleanup();
});
it('runs the full sync pipeline on the real worker: folders synced, messages imported', async () => {
// The cron schedules pending channels and enqueues their list-fetch onto the worker.
await enqueueJobAndAwait(
MessageQueue.cronQueue,
MessagingMessageListFetchCronJob,
{},
);
const expectedSubjects = inbox.map(getGmailMessageSubject);
const importedSubjects = await pollUntil(
() => findImportedSubjects(expectedSubjects),
(subjects) => subjects.length === expectedSubjects.length,
{ timeoutMs: 30_000 },
);
expect(importedSubjects).toEqual([...expectedSubjects].sort());
const folders = await queryMessageFolders(channel.channelId);
expect(folders.map((folder) => folder.name).sort()).toEqual([
'INBOX',
'SENT',
]);
}, 60000);
});
@@ -0,0 +1,87 @@
import {
MessageChannelSyncStage,
MessageChannelSyncStatus,
} from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import { MessagingRelaunchFailedMessageChannelJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const relaunchFailedChannel = (channelId: string) =>
enqueueJobAndAwait(
MessageQueue.messagingQueue,
MessagingRelaunchFailedMessageChannelJob,
{ workspaceId: WORKSPACE_ID, messageChannelId: channelId },
);
describe('Messaging failed-channel recovery (integration)', () => {
beforeAll(() => {
jest.useRealTimers();
});
it('recovers a FAILED_UNKNOWN channel and clears its throttle state', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncStage: MessageChannelSyncStage.FAILED,
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
throttleFailureCount: 3,
throttleRetryAfter: new Date(Date.now() + 60_000),
syncStageStartedAt: new Date(),
});
try {
await relaunchFailedChannel(channel.channelId);
const recovered = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) =>
channelState?.syncStatus === MessageChannelSyncStatus.ACTIVE,
{ timeoutMs: 30_000 },
);
expect(recovered.syncStage).toBe(
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
);
expect(recovered.syncStatus).toBe(MessageChannelSyncStatus.ACTIVE);
expect(recovered.throttleFailureCount).toBe(0);
expect(recovered.throttleRetryAfter).toBeNull();
expect(recovered.syncStageStartedAt).toBeNull();
} finally {
await channel.cleanup();
}
}, 60000);
it('leaves a FAILED_INSUFFICIENT_PERMISSIONS channel untouched', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncStage: MessageChannelSyncStage.FAILED,
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
});
try {
await relaunchFailedChannel(channel.channelId);
// No-op recovery: grace window for the worker to process, then assert no change.
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
() => false,
{ timeoutMs: 3_000 },
);
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
expect(channelAfter.syncStatus).toBe(
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
);
} finally {
await channel.cleanup();
}
}, 60000);
});
@@ -0,0 +1,103 @@
import { http, HttpResponse } from 'msw';
import {
MessageChannelSyncStage,
MessageChannelSyncStatus,
} from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { setupGmailMock } from 'test/integration/messaging/utils/gmail-mock.util';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const RETRY_AFTER_ISO = '2099-12-31T10:30:00.000Z';
const rateLimitedMessageList = () =>
http.get('*/gmail/v1/users/me/messages', () =>
HttpResponse.json(
{
error: {
code: 429,
message: 'Rate Limit Exceeded',
errors: [
{
reason: 'rateLimitExceeded',
message: `Rate Limit Exceeded. Retry after ${RETRY_AFTER_ISO}`,
},
],
},
},
{ status: 429 },
),
);
const runListFetch = (channelId: string) =>
enqueueJobAndAwait(
MessageQueue.messagingQueue,
MessagingMessageListFetchJob,
{ messageChannelId: channelId, workspaceId: WORKSPACE_ID },
);
describe('Messaging rate-limit throttling (integration)', () => {
const gmail = setupGmailMock({ inbox: [], handle: 'tim@apple.dev' });
beforeAll(() => {
jest.useRealTimers();
});
it('records the throttle backoff window on a 429 and keeps the channel active', async () => {
const channel = await seedMessageChannel({ workspaceId: WORKSPACE_ID });
try {
gmail.use(rateLimitedMessageList());
await runListFetch(channel.channelId);
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) => channelState.throttleRetryAfter !== null,
{ timeoutMs: 30_000 },
);
expect(channelAfter.throttleFailureCount).toBe(1);
expect(channelAfter.throttleRetryAfter).toBe(RETRY_AFTER_ISO);
expect(channelAfter.syncStatus).not.toBe(
MessageChannelSyncStatus.FAILED_UNKNOWN,
);
} finally {
await channel.cleanup();
}
}, 60000);
it('fails the channel as unknown once the throttle attempts are exhausted', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
throttleFailureCount: MESSAGING_THROTTLE_MAX_ATTEMPTS,
});
try {
gmail.use(rateLimitedMessageList());
await runListFetch(channel.channelId);
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) =>
channelState.syncStatus === MessageChannelSyncStatus.FAILED_UNKNOWN,
{ timeoutMs: 30_000 },
);
expect(channelAfter.syncStatus).toBe(
MessageChannelSyncStatus.FAILED_UNKNOWN,
);
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
} finally {
await channel.cleanup();
}
}, 60000);
});
@@ -0,0 +1,83 @@
import { MessageChannelSyncStage } from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const STALE_STARTED_AT = new Date(Date.now() - 31 * 60 * 1000);
const RECENT_STARTED_AT = new Date(Date.now() - 60 * 1000);
describe('Messaging stale-sync recovery (integration)', () => {
beforeAll(() => {
jest.useRealTimers();
});
it('resets stale ongoing channels to pending and leaves recent ones running', async () => {
const staleListFetch = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
syncStageStartedAt: STALE_STARTED_AT,
});
const staleImport = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
syncStageStartedAt: STALE_STARTED_AT,
});
const recentListFetch = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
syncStageStartedAt: RECENT_STARTED_AT,
});
try {
await enqueueJobAndAwait(
MessageQueue.messagingQueue,
MessagingOngoingStaleJob,
{ workspaceId: WORKSPACE_ID },
);
const staleListFetchAfter = await pollUntil(
() =>
queryMessageChannel(
staleListFetch.connectedAccountId,
staleListFetch.channelId,
),
(channelState) =>
channelState.syncStage ===
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
{ timeoutMs: 30_000 },
);
expect(staleListFetchAfter.syncStage).toBe(
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
);
expect(staleListFetchAfter.syncStageStartedAt).toBeNull();
const staleImportAfter = await queryMessageChannel(
staleImport.connectedAccountId,
staleImport.channelId,
);
expect(staleImportAfter.syncStage).toBe(
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
);
expect(staleImportAfter.syncStageStartedAt).toBeNull();
const recentListFetchAfter = await queryMessageChannel(
recentListFetch.connectedAccountId,
recentListFetch.channelId,
);
expect(recentListFetchAfter.syncStage).toBe(
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
);
} finally {
await staleListFetch.cleanup();
await staleImport.cleanup();
await recentListFetch.cleanup();
}
}, 60000);
});
@@ -0,0 +1,81 @@
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
type MessagingMessageListFetchJobResult,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import {
gmailMessage,
setupGmailMock,
} from 'test/integration/messaging/utils/gmail-mock.util';
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const HISTORY_ID = '987654321';
const runListFetch = (channelId: string) =>
enqueueJobAndAwait<
MessagingMessageListFetchJobData,
MessagingMessageListFetchJobResult
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
messageChannelId: channelId,
workspaceId: WORKSPACE_ID,
});
describe('Messaging sync cursor (integration)', () => {
// No history.list handler is registered, so MSW's 'error' strategy fails the test if the
// sync wrongly takes the incremental path instead of a full sync.
setupGmailMock({
inbox: [
gmailMessage({ historyId: HISTORY_ID }),
gmailMessage({ historyId: HISTORY_ID }),
],
handle: 'tim@apple.dev',
});
beforeAll(() => {
jest.useRealTimers();
});
const runAndAwaitCursor = async (channelId: string) => {
await runListFetch(channelId);
return pollUntil(
() => getMessageChannel(channelId),
(channel) => channel.syncCursor === HISTORY_ID,
{ timeoutMs: 30_000 },
);
};
it('runs a full sync and seeds the cursor when the cursor is null', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncCursor: null,
});
try {
const channelAfter = await runAndAwaitCursor(channel.channelId);
expect(channelAfter.syncCursor).toBe(HISTORY_ID);
} finally {
await channel.cleanup();
}
}, 60000);
it('runs a full sync when the cursor is an empty string', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
syncCursor: '',
});
try {
const channelAfter = await runAndAwaitCursor(channel.channelId);
expect(channelAfter.syncCursor).toBe(HISTORY_ID);
} finally {
await channel.cleanup();
}
}, 60000);
});
@@ -0,0 +1,109 @@
import {
MessageChannelSyncStage,
MessageChannelSyncStatus,
} from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ConnectedAccountEntity } from 'src/engine/metadata-modules/connected-account/entities/connected-account.entity';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
type MessagingMessageListFetchJobResult,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import {
gmailMessage,
INVALID_REFRESH_TOKEN_PREFIX,
setupGmailMock,
} from 'test/integration/messaging/utils/gmail-mock.util';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
// Older than the 55-minute access-token validity window, so the stored access token is treated
// as expired and a refresh round-trip fires.
const EXPIRED_CREDENTIALS_AT = new Date(Date.now() - 56 * 60 * 1000);
const runListFetch = (channelId: string) =>
enqueueJobAndAwait<
MessagingMessageListFetchJobData,
MessagingMessageListFetchJobResult
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
messageChannelId: channelId,
workspaceId: WORKSPACE_ID,
});
const getConnectedAccount = (connectedAccountId: string) =>
getCoreRepository(ConnectedAccountEntity).findOneByOrFail({
id: connectedAccountId,
});
describe('Messaging token refresh (integration)', () => {
setupGmailMock({ inbox: [gmailMessage()], handle: 'tim@apple.dev' });
beforeAll(() => {
jest.useRealTimers();
});
it('refreshes an expired access token and completes the sync', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
lastCredentialsRefreshedAt: EXPIRED_CREDENTIALS_AT,
});
try {
await runListFetch(channel.channelId);
const connectedAccount = await pollUntil(
() => getConnectedAccount(channel.connectedAccountId),
(account) =>
(account.lastCredentialsRefreshedAt?.getTime() ?? 0) >
EXPIRED_CREDENTIALS_AT.getTime(),
{ timeoutMs: 30_000 },
);
expect(connectedAccount.authFailedAt).toBeNull();
expect(
connectedAccount.lastCredentialsRefreshedAt?.getTime(),
).toBeGreaterThan(EXPIRED_CREDENTIALS_AT.getTime());
} finally {
await channel.cleanup();
}
}, 60000);
it('marks the channel as insufficient-permissions when the refresh token is declined', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
lastCredentialsRefreshedAt: EXPIRED_CREDENTIALS_AT,
refreshTokenPlaintext: `${INVALID_REFRESH_TOKEN_PREFIX}-tim`,
});
try {
await runListFetch(channel.channelId);
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) =>
channelState.syncStatus ===
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
{ timeoutMs: 30_000 },
);
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
expect(channelAfter.syncStatus).toBe(
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
);
const connectedAccount = await getConnectedAccount(
channel.connectedAccountId,
);
expect(connectedAccount.authFailedAt).not.toBeNull();
} finally {
await channel.cleanup();
}
}, 60000);
});
@@ -0,0 +1,61 @@
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
type MessagingMessageListFetchJobResult,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { setupMicrosoftMock } from 'test/integration/messaging/utils/microsoft-mock.util';
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
describe('Microsoft folder discovery (integration)', () => {
setupMicrosoftMock({ inbox: [] });
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
const getFolderNames = async (channelId: string): Promise<string[]> => {
const folders = await queryMessageFolders(channelId);
return folders
.map((folder) => folder.name)
.filter((name): name is string => name !== null)
.sort();
};
beforeAll(async () => {
jest.useRealTimers();
channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
provider: ConnectedAccountProvider.MICROSOFT,
});
}, 60000);
afterAll(async () => {
await channel.cleanup();
});
it('discovers Microsoft mail folders through the Graph delta sync', async () => {
await enqueueJobAndAwait<
MessagingMessageListFetchJobData,
MessagingMessageListFetchJobResult
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
messageChannelId: channel.channelId,
workspaceId: WORKSPACE_ID,
});
const folderNames = await pollUntil(
() => getFolderNames(channel.channelId),
(names) => names.length === 2,
{ timeoutMs: 30_000 },
);
expect(folderNames).toEqual(['Inbox', 'Sent Items']);
}, 60000);
});
@@ -0,0 +1,79 @@
import gql from 'graphql-tag';
import request from 'supertest';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { makeMetadataAPIRequest } from 'test/integration/metadata/suites/utils/make-metadata-api-request.util';
import {
deleteConnectedAccount,
queryMessageChannels,
} from 'test/integration/messaging/utils/query-messaging.util';
type ConnectMessagingAccountResult = {
channelId: string;
connectedAccountId: string;
cleanup: () => Promise<void>;
};
const OAUTH_CALLBACK_PATH: Partial<Record<ConnectedAccountProvider, string>> = {
[ConnectedAccountProvider.GOOGLE]: '/auth/google-apis/get-access-token',
[ConnectedAccountProvider.MICROSOFT]: '/auth/microsoft-apis/get-access-token',
};
// Connects a messaging account through the real OAuth callback the providers redirect to:
// mints a transient token for the authed user, drives the callback with a code (the HTTP
// boundary is mocked), and lets the provider's auth service create a fresh connected account
// and message channel. Returns the channel and a cleanup that removes the account through the
// same mutation the product uses.
export const connectMessagingAccount = async (
provider: ConnectedAccountProvider,
): Promise<ConnectMessagingAccountResult> => {
const callbackPath = OAUTH_CALLBACK_PATH[provider];
if (!callbackPath) {
throw new Error(`Unsupported OAuth provider: ${provider}`);
}
const channelIdsBeforeConnect = new Set(
(await queryMessageChannels()).map((channel) => channel.id),
);
const transientTokenResponse = await makeMetadataAPIRequest({
query: gql`
mutation GenerateTransientToken {
generateTransientToken {
transientToken {
token
}
}
}
`,
});
const state = JSON.stringify({
transientToken:
transientTokenResponse.body.data.generateTransientToken.transientToken
.token,
messageVisibility: 'SHARE_EVERYTHING',
skipMessageChannelConfiguration: true,
});
const callbackResponse = await request(`http://localhost:${APP_PORT}`)
.get(callbackPath)
.query({ code: 'mock-authorization-code', state });
const connectedChannel = (await queryMessageChannels()).find(
(channel) => !channelIdsBeforeConnect.has(channel.id),
);
if (!connectedChannel) {
throw new Error(
`OAuth connect for ${provider} created no message channel (callback redirected to ${callbackResponse.headers.location})`,
);
}
return {
channelId: connectedChannel.id,
connectedAccountId: connectedChannel.connectedAccountId,
cleanup: () => deleteConnectedAccount(connectedChannel.connectedAccountId),
};
};
@@ -0,0 +1,5 @@
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
export const getMessageChannel = (channelId: string) =>
getCoreRepository(MessageChannelEntity).findOneByOrFail({ id: channelId });
@@ -0,0 +1,221 @@
import { randomUUID } from 'node:crypto';
import { type gmail_v1 } from 'googleapis';
import { http, HttpResponse, type RequestHandler } from 'msw';
import { setupHttpMock } from 'test/integration/utils/http-mock';
export const INVALID_REFRESH_TOKEN_PREFIX = 'invalid-refresh-token';
const GOOGLE_OAUTH_SCOPES = [
'email',
'profile',
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/calendar.events',
'https://www.googleapis.com/auth/profile.emails.read',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.compose',
].join(' ');
export const gmailMessage = (
overrides: Partial<gmail_v1.Schema$Message> = {},
): gmail_v1.Schema$Message => {
const id = overrides.id ?? `gmail-msg-${randomUUID()}`;
return {
id,
threadId: id,
historyId: '987654321',
internalDate: '1700000000000',
labelIds: ['INBOX'],
payload: {
mimeType: 'text/plain',
headers: [
{ name: 'From', value: 'sender@example.com' },
{ name: 'To', value: 'recipient@example.com' },
{ name: 'Subject', value: `Subject ${id}` },
{ name: 'Message-ID', value: `<${id}@example.com>` },
{ name: 'Date', value: 'Wed, 15 Nov 2023 00:00:00 +0000' },
],
body: {
data: Buffer.from(`body ${id}`).toString('base64'),
size: 10,
},
},
...overrides,
};
};
export const getGmailMessageSubject = (
message: gmail_v1.Schema$Message,
): string =>
message.payload?.headers?.find((header) => header.name === 'Subject')
?.value ?? '';
const DEFAULT_LABELS: gmail_v1.Schema$Label[] = [
{ id: 'INBOX', name: 'INBOX', type: 'system' },
{ id: 'SENT', name: 'SENT', type: 'system' },
];
export type GmailLabelStore = {
add: (label: gmail_v1.Schema$Label) => void;
remove: (labelId: string) => void;
reset: () => void;
list: () => gmail_v1.Schema$Label[];
};
const createGmailLabelStore = (
initialLabels: gmail_v1.Schema$Label[],
): GmailLabelStore => {
let labels = [...initialLabels];
return {
add: (label) => {
labels = [...labels, label];
},
remove: (labelId) => {
labels = labels.filter((label) => label.id !== labelId);
},
reset: () => {
labels = [...initialLabels];
},
list: () => labels,
};
};
const buildBatchMultipartResponse = (
inbox: gmail_v1.Schema$Message[],
): { body: string; contentType: string } => {
const boundary = 'batch_boundary';
const subResponses = inbox
.map((message) =>
[
`--${boundary}`,
'Content-Type: application/http',
'',
'HTTP/1.1 200 OK',
'Content-Type: application/json; charset=UTF-8',
'',
JSON.stringify(message),
].join('\r\n'),
)
.join('\r\n');
return {
body: `${subResponses}\r\n--${boundary}--`,
contentType: `multipart/mixed; boundary=${boundary}`,
};
};
const gmailHandlers = ({
inbox,
labelStore,
handle,
}: {
inbox: gmail_v1.Schema$Message[];
labelStore: GmailLabelStore;
handle: string;
}): RequestHandler[] => [
http.post('https://oauth2.googleapis.com/token', async ({ request }) => {
const body = new URLSearchParams(await request.text());
if (
(body.get('refresh_token') ?? '').startsWith(INVALID_REFRESH_TOKEN_PREFIX)
) {
return HttpResponse.json(
{ error: 'invalid_grant', error_description: 'Token has been revoked' },
{ status: 400 },
);
}
return HttpResponse.json({
access_token: 'mock-access-token',
expires_in: 3600,
scope: 'https://www.googleapis.com/auth/gmail.readonly',
token_type: 'Bearer',
});
}),
// --- OAuth connect flow (passport code exchange + connect-path availability checks) ---
http.post('https://www.googleapis.com/oauth2/v4/token', () =>
HttpResponse.json({
access_token: 'mock-access-token',
refresh_token: 'mock-refresh-token',
expires_in: 3600,
scope: GOOGLE_OAUTH_SCOPES,
token_type: 'Bearer',
}),
),
http.get('https://www.googleapis.com/oauth2/v3/userinfo', () =>
HttpResponse.json({
sub: 'google-user-id',
email: handle,
email_verified: true,
name: 'Jane Austen',
given_name: 'Jane',
family_name: 'Austen',
}),
),
http.get('https://www.googleapis.com/oauth2/v3/tokeninfo', () =>
HttpResponse.json({ scope: GOOGLE_OAUTH_SCOPES, email: handle }),
),
http.get('https://gmail.googleapis.com/gmail/v1/users/me/profile', () =>
HttpResponse.json({ emailAddress: handle, messagesTotal: 0 }),
),
http.get(
'https://www.googleapis.com/calendar/v3/calendars/primary/events',
() => HttpResponse.json({ items: [] }),
),
http.get('*/gmail/v1/users/me/settings/sendAs', () => {
const sendAs: gmail_v1.Schema$SendAs[] = [
{ sendAsEmail: handle, isPrimary: true },
];
return HttpResponse.json<gmail_v1.Schema$ListSendAsResponse>({ sendAs });
}),
http.get('*/gmail/v1/users/me/labels', () =>
HttpResponse.json<gmail_v1.Schema$ListLabelsResponse>({
labels: labelStore.list(),
}),
),
http.get('*/gmail/v1/users/me/messages', () => {
const messages: gmail_v1.Schema$Message[] = inbox.map((message) => ({
id: message.id,
threadId: message.threadId,
}));
return HttpResponse.json<gmail_v1.Schema$ListMessagesResponse>({
messages,
resultSizeEstimate: inbox.length,
});
}),
http.get('*/gmail/v1/users/me/messages/:messageId', ({ params }) =>
HttpResponse.json<gmail_v1.Schema$Message>({
id: String(params.messageId),
threadId: String(params.messageId),
historyId: inbox[0]?.historyId ?? '987654321',
}),
),
http.post('*/batch', () => {
const { body, contentType } = buildBatchMultipartResponse(inbox);
return new HttpResponse(body, { headers: { 'Content-Type': contentType } });
}),
];
export const setupGmailMock = ({
inbox,
labels = DEFAULT_LABELS,
handle = 'me@example.com',
}: {
inbox: gmail_v1.Schema$Message[];
labels?: gmail_v1.Schema$Label[];
handle?: string;
}): { labels: GmailLabelStore; use: (...handlers: unknown[]) => void } => {
const labelStore = createGmailLabelStore(labels);
const httpMock = setupHttpMock(
...gmailHandlers({ inbox, labelStore, handle }),
);
return { labels: labelStore, use: httpMock.use };
};
@@ -0,0 +1,104 @@
import { randomUUID } from 'node:crypto';
import {
type MailFolder,
type Message,
} from '@microsoft/microsoft-graph-types';
import { http, HttpResponse, type RequestHandler } from 'msw';
import { setupHttpMock } from 'test/integration/utils/http-mock';
export const microsoftMessage = (overrides: Partial<Message> = {}): Message => {
const id = overrides.id ?? `ms-msg-${randomUUID()}`;
return {
id,
subject: `Subject ${id}`,
internetMessageId: `<${id}@example.com>`,
receivedDateTime: '2023-11-15T00:00:00Z',
from: { emailAddress: { address: 'sender@example.com' } },
toRecipients: [{ emailAddress: { address: 'recipient@example.com' } }],
ccRecipients: [],
bccRecipients: [],
body: { contentType: 'text', content: `body ${id}` },
...overrides,
};
};
const DEFAULT_FOLDERS: MailFolder[] = [
{ id: 'inbox', displayName: 'Inbox' },
{ id: 'sentitems', displayName: 'Sent Items' },
];
export type MicrosoftFolderStore = {
add: (folder: MailFolder) => void;
remove: (folderId: string) => void;
reset: () => void;
list: () => MailFolder[];
};
const createMicrosoftFolderStore = (
initialFolders: MailFolder[],
): MicrosoftFolderStore => {
let folders = [...initialFolders];
return {
add: (folder) => {
folders = [...folders, folder];
},
remove: (folderId) => {
folders = folders.filter((folder) => folder.id !== folderId);
},
reset: () => {
folders = [...initialFolders];
},
list: () => folders,
};
};
const microsoftHandlers = ({
inbox,
folderStore,
}: {
inbox: Message[];
folderStore: MicrosoftFolderStore;
}): RequestHandler[] => [
http.get('*/me/mailFolders', () =>
HttpResponse.json<{ value: MailFolder[] }>({ value: folderStore.list() }),
),
http.get('*/messages/delta', () =>
HttpResponse.json<{ value: Message[]; '@odata.deltaLink': string }>({
value: inbox.map((message) => ({ id: message.id })),
'@odata.deltaLink':
'https://graph.microsoft.com/beta/me/mailfolders/inbox/messages/delta?$deltatoken=mock-delta-token',
}),
),
http.post('*/$batch', () =>
HttpResponse.json<{
responses: { id: string; status: number; body: Message }[];
}>({
responses: inbox.map((message, index) => ({
id: (index + 1).toString(),
status: 200,
body: message,
})),
}),
),
];
export const setupMicrosoftMock = ({
inbox,
folders = DEFAULT_FOLDERS,
}: {
inbox: Message[];
folders?: MailFolder[];
}): {
folders: MicrosoftFolderStore;
use: (...handlers: unknown[]) => void;
} => {
const folderStore = createMicrosoftFolderStore(folders);
const httpMock = setupHttpMock(...microsoftHandlers({ inbox, folderStore }));
return { folders: folderStore, use: httpMock.use };
};
@@ -0,0 +1,108 @@
// TODO: cleanup see if we can reuse fragments from frontend
import gql from 'graphql-tag';
import { makeMetadataAPIRequest } from 'test/integration/metadata/suites/utils/make-metadata-api-request.util';
export type MessageFolderDto = {
id: string;
name: string | null;
isSynced: boolean;
};
export type MessageChannelDto = {
id: string;
connectedAccountId: string;
syncStatus: string;
syncStage: string;
syncStageStartedAt: string | null;
throttleFailureCount: number;
throttleRetryAfter: string | null;
};
const MESSAGE_CHANNEL_FIELDS = gql`
fragment TestMessageChannelFields on MessageChannel {
id
connectedAccountId
syncStatus
syncStage
syncStageStartedAt
throttleFailureCount
throttleRetryAfter
}
`;
export const queryMessageChannels = async (): Promise<MessageChannelDto[]> => {
const response = await makeMetadataAPIRequest({
query: gql`
query MessageChannelsForTest {
myMessageChannels {
...TestMessageChannelFields
}
}
${MESSAGE_CHANNEL_FIELDS}
`,
});
return response.body.data.myMessageChannels;
};
export const deleteConnectedAccount = async (
connectedAccountId: string,
): Promise<void> => {
await makeMetadataAPIRequest({
query: gql`
mutation DeleteConnectedAccountForTest($id: UUID!) {
deleteConnectedAccount(id: $id) {
id
}
}
`,
variables: { id: connectedAccountId },
});
};
export const queryMessageFolders = async (
messageChannelId: string,
): Promise<MessageFolderDto[]> => {
const response = await makeMetadataAPIRequest({
query: gql`
query MessageFoldersForTest($messageChannelId: UUID) {
myMessageFolders(messageChannelId: $messageChannelId) {
id
name
isSynced
}
}
`,
variables: { messageChannelId },
});
return response.body.data.myMessageFolders;
};
export const queryMessageChannel = async (
connectedAccountId: string,
messageChannelId: string,
): Promise<MessageChannelDto> => {
const response = await makeMetadataAPIRequest({
query: gql`
query MessageChannelForTest($connectedAccountId: UUID) {
myMessageChannels(connectedAccountId: $connectedAccountId) {
...TestMessageChannelFields
}
}
${MESSAGE_CHANNEL_FIELDS}
`,
variables: { connectedAccountId },
});
const channel = response.body.data.myMessageChannels.find(
(candidate: MessageChannelDto) => candidate.id === messageChannelId,
);
if (!channel) {
throw new Error(`Message channel ${messageChannelId} not found`);
}
return channel;
};
@@ -0,0 +1,107 @@
import { randomUUID } from 'node:crypto';
import {
ConnectedAccountProvider,
MessageChannelPendingGroupEmailsAction,
MessageChannelSyncStage,
MessageChannelSyncStatus,
MessageChannelType,
MessageChannelVisibility,
MessageFolderImportPolicy,
} from 'twenty-shared/types';
import { ConnectedAccountEntity } from 'src/engine/metadata-modules/connected-account/entities/connected-account.entity';
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
import { CONNECTED_ACCOUNT_DATA_SEED_IDS } from 'src/engine/workspace-manager/dev-seeder/data/constants/connected-account-data-seeds.constant';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
import { mintEncryptedToken } from 'test/integration/utils/mint-encrypted-token.util';
type SeedMessageChannelResult = {
channelId: string;
connectedAccountId: string;
handle: string;
cleanup: () => Promise<void>;
};
// Reuses a dev-seeded connected account, sets fresh encrypted tokens + provider on it, and
// creates a brand new association-free channel scheduled for list-fetch. Provider-agnostic:
// the channel/account shape is identical for Gmail and Microsoft. A unique channelId keeps
// each run isolated without a DB reset.
export const seedMessageChannel = async ({
workspaceId,
provider = ConnectedAccountProvider.GOOGLE,
handle = 'tim@apple.dev',
connectedAccountId = CONNECTED_ACCOUNT_DATA_SEED_IDS.JANE,
messageFolderImportPolicy = MessageFolderImportPolicy.ALL_FOLDERS,
syncStatus = MessageChannelSyncStatus.NOT_SYNCED,
syncStage = MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED,
syncCursor = null,
syncStageStartedAt = null,
throttleRetryAfter = null,
throttleFailureCount = 0,
lastCredentialsRefreshedAt = new Date(),
refreshTokenPlaintext = 'mock-refresh-token',
}: {
workspaceId: string;
provider?: ConnectedAccountProvider;
handle?: string;
connectedAccountId?: string;
messageFolderImportPolicy?: MessageFolderImportPolicy;
syncStatus?: MessageChannelSyncStatus;
syncStage?: MessageChannelSyncStage;
syncCursor?: string | null;
syncStageStartedAt?: Date | null;
throttleRetryAfter?: Date | null;
throttleFailureCount?: number;
lastCredentialsRefreshedAt?: Date | null;
refreshTokenPlaintext?: string;
}): Promise<SeedMessageChannelResult> => {
const connectedAccountRepository = getCoreRepository(ConnectedAccountEntity);
const messageChannelRepository = getCoreRepository(MessageChannelEntity);
const channelId = randomUUID();
await connectedAccountRepository.update(
{ id: connectedAccountId },
{
provider,
accessToken: mintEncryptedToken('mock-access-token', workspaceId),
refreshToken: mintEncryptedToken(refreshTokenPlaintext, workspaceId),
lastCredentialsRefreshedAt,
authFailedAt: null,
},
);
await messageChannelRepository.save({
id: channelId,
visibility: MessageChannelVisibility.SHARE_EVERYTHING,
handle,
type: MessageChannelType.EMAIL,
messageFolderImportPolicy,
pendingGroupEmailsAction: MessageChannelPendingGroupEmailsAction.NONE,
isSyncEnabled: true,
syncCursor,
syncStatus,
syncStage,
syncStageStartedAt,
throttleRetryAfter,
throttleFailureCount,
connectedAccountId,
workspaceId,
});
const cleanup = async () => {
await messageChannelRepository.delete({ id: channelId });
await connectedAccountRepository.update(
{ id: connectedAccountId },
{
accessToken: null,
refreshToken: null,
lastCredentialsRefreshedAt: null,
authFailedAt: null,
},
);
};
return { channelId, connectedAccountId, handle, cleanup };
};
@@ -18,6 +18,7 @@ import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handl
import { ExceptionHandlerMockService } from 'src/engine/core-modules/exception-handler/mocks/exception-handler-mock.service';
import { MockedUnhandledExceptionFilter } from 'src/engine/core-modules/exception-handler/mocks/mock-unhandled-exception.filter';
import { SyncDriver } from 'src/engine/core-modules/message-queue/drivers/sync.driver';
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces/message-queue-module-options.interface';
import { JobsModule } from 'src/engine/core-modules/message-queue/jobs.module';
import { QUEUE_DRIVER } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
@@ -66,9 +67,15 @@ export const createApp = async (
getCurrentDriver: () => ({
validate: async () => ({ success: true }),
}),
})
.overrideProvider(QUEUE_DRIVER)
.useValue(syncDriver);
});
// Default to the in-band SyncDriver; opt into the real Redis-backed BullMQ driver (and the
// in-process workers the explorer spins up) with MESSAGE_QUEUE_TYPE=bull-mq.
if (process.env.MESSAGE_QUEUE_TYPE !== MessageQueueDriverType.BullMQ) {
moduleBuilder = moduleBuilder
.overrideProvider(QUEUE_DRIVER)
.useValue(syncDriver);
}
if (config.moduleBuilderHook) {
moduleBuilder = config.moduleBuilderHook(moduleBuilder);
@@ -0,0 +1,23 @@
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { type MessageQueueJobData } from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
export const enqueueJobAndAwait = async <
TData extends MessageQueueJobData,
TResult,
>(
queue: MessageQueue,
job: { name: string },
data: TData,
): Promise<TResult> => {
const messageQueueService = global.app.get<MessageQueueService>(
getQueueToken(queue),
{ strict: false },
);
return (await messageQueueService.add<TData, TResult>(
job.name,
data,
)) as TResult;
};
@@ -0,0 +1,11 @@
import { type EntityClassOrSchema } from '@nestjs/typeorm/dist/interfaces/entity-class-or-schema.type';
import { getRepositoryToken } from '@nestjs/typeorm';
import { type ObjectLiteral, type Repository } from 'typeorm';
export const getCoreRepository = <Entity extends ObjectLiteral>(
target: EntityClassOrSchema,
): Repository<Entity> =>
global.app.get<Repository<Entity>>(getRepositoryToken(target), {
strict: false,
});
@@ -0,0 +1,50 @@
import { http, passthrough, type RequestHandler } from 'msw';
import { setupServer } from 'msw/node';
// MSW patches the whole process, so it also sees supertest's inbound calls to our own app.
// Let those through. Keep the matchers colon-free: `http://localhost:*` makes path-to-regexp
// parse `:` as a param and crashes beforeAll (msw#2202).
const localhostPassthroughHandlers = [
http.all('http://127.0.0.1*', () => passthrough()),
http.all('http://localhost*', () => passthrough()),
];
// One setupServer per test process — multiple instances re-patch node's request modules
// (msw#821). resetHandlers() restores exactly these initial handlers.
const server = setupServer(...localhostPassthroughHandlers);
export type HttpMock = {
use: (...handlers: unknown[]) => void;
};
// Opt-in: a suite calls this at describe scope to wire MSW's lifecycle (listen/reset/close)
// onto the shared server with `onUnhandledRequest: 'error'`, so any un-mocked outbound call
// fails loudly instead of hitting the real network. `baseHandlers` are re-applied per test as
// the suite's happy-path; per-test variation goes through the returned `use`.
// Handlers are typed loosely because msw's RequestHandler isn't portably nameable in an
// exported signature under our tsconfig; callers always pass RequestHandler values.
export const setupHttpMock = (...baseHandlers: unknown[]): HttpMock => {
const applyBaseHandlers = () => {
if (baseHandlers.length > 0) {
server.use(...(baseHandlers as RequestHandler[]));
}
};
// Apply the base handlers in beforeAll as well as beforeEach: suites that connect an account
// in their own beforeAll need the handlers active before the first test starts.
beforeAll(() => {
server.listen({ onUnhandledRequest: 'error' });
applyBaseHandlers();
});
beforeEach(applyBaseHandlers);
afterEach(() => server.resetHandlers());
afterAll(() => server.close());
return {
use: (...handlers: unknown[]) =>
server.use(...(handlers as RequestHandler[])),
};
};
@@ -0,0 +1,17 @@
import { type EncryptedString } from 'src/engine/core-modules/secret-encryption/branded-strings/encrypted-string.type';
import { type PlaintextString } from 'src/engine/core-modules/secret-encryption/branded-strings/plaintext-string.type';
import { SecretEncryptionService } from 'src/engine/core-modules/secret-encryption/secret-encryption.service';
import { type EnvironmentConfigDriver } from 'src/engine/core-modules/twenty-config/drivers/environment-config.driver';
// SecretEncryptionService is a class-token provider, unreachable from a spec via global.app
// (module-realm split). It only needs `get(key)`, so build it by hand off process.env —
// same APP_SECRET the app uses, so the pipeline decrypts the result for free.
export const mintEncryptedToken = (
plaintext: string,
workspaceId: string,
): EncryptedString =>
new SecretEncryptionService({
get: (key: string) => process.env[key],
} as EnvironmentConfigDriver).encryptVersioned(plaintext as PlaintextString, {
workspaceId,
});
@@ -0,0 +1,24 @@
import { setTimeout as sleep } from 'node:timers/promises';
// Polls `read` until `predicate` passes or the timeout elapses, then returns the last value
// (so the caller's assertion produces a meaningful diff on timeout). Used to await the
// eventual outcome of jobs running on the real BullMQ worker.
export const pollUntil = async <TValue>(
read: () => Promise<TValue>,
predicate: (value: TValue) => boolean,
{
timeoutMs = 30_000,
intervalMs = 250,
}: { timeoutMs?: number; intervalMs?: number } = {},
): Promise<TValue> => {
const startedAt = Date.now();
let value = await read();
while (!predicate(value) && Date.now() - startedAt < timeoutMs) {
await sleep(intervalMs);
value = await read();
}
return value;
};
@@ -1,3 +1,4 @@
import nodeFetch from 'node-fetch';
import { type JestConfigWithTsJest } from 'ts-jest';
import 'tsconfig-paths/register';
@@ -6,6 +7,11 @@ import { rawDataSource } from 'src/database/typeorm/raw/raw.datasource';
import { createApp } from './create-app';
export default async (_: unknown, projectConfig: JestConfigWithTsJest) => {
// Route the app's global fetch through node-fetch (node:http) so MSW — which patches the
// shared node:http builtin — intercepts clients that bottom out at global fetch (e.g. the
// Microsoft Graph client). Native undici fetch is realm-local and escapes MSW.
globalThis.fetch = nodeFetch as unknown as typeof globalThis.fetch;
const app = await createApp({});
if (!projectConfig.globals) {