Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a73020eb45 | |||
| 0a4dd19472 |
@@ -14,13 +14,13 @@ IS_WORKSPACE_CREATION_LIMITED_TO_SERVER_ADMINS=false
|
||||
|
||||
ENTERPRISE_VALIDITY_TOKEN=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkZXYtc3Vic2NyaXB0aW9uLWlkIiwic3RhdHVzIjoidmFsaWQiLCJpYXQiOjE3NzMzMDg4MzMsImV4cCI6NDg5NzUxMTIzM30.qhfrW_SV2Y86fWtWXsALlAVhxmMxylUUIefN0fki10Q2NTGGqFVXZrNn2WacJY37yq3m5y4WgwZw34ua6E0ff_YUXsrlY5OHJWHT9DMqKCRn-JujHJnnYp3VHLncy5CvxH5r9mfPFp-5AWe1pYeR1T63sTiejH3sfDrNE357SB7KVti8LCcnsJxEtXB2tRnvyvdun7A-GKoKYEIam-16ZRKKFs6GaWo8ObHdfm8yBt6uK4DZSGPWb644QyWh9FtDxbzJ0ti54DuHSlErLgIp1NNEsMA0MK7zFY7StRaOdt72rxE1ZHwN7e6HhweTU4ORVUPfYkjDFLB2fF7Pa7Kvdg
|
||||
|
||||
AUTH_GOOGLE_ENABLED=false
|
||||
MESSAGING_PROVIDER_GMAIL_ENABLED=false
|
||||
AUTH_GOOGLE_ENABLED=true
|
||||
MESSAGING_PROVIDER_GMAIL_ENABLED=true
|
||||
IS_IMAP_SMTP_CALDAV_ENABLED=true
|
||||
IS_IMAP_SMTP_CALDAV_CONNECTION_TEST_ENABLED=false
|
||||
CALENDAR_PROVIDER_GOOGLE_ENABLED=false
|
||||
MESSAGING_PROVIDER_MICROSOFT_ENABLED=false
|
||||
CALENDAR_PROVIDER_MICROSOFT_ENABLED=false
|
||||
CALENDAR_PROVIDER_GOOGLE_ENABLED=true
|
||||
MESSAGING_PROVIDER_MICROSOFT_ENABLED=true
|
||||
CALENDAR_PROVIDER_MICROSOFT_ENABLED=true
|
||||
|
||||
AUTH_GOOGLE_CALLBACK_URL=http://localhost:3000/auth/google/redirect
|
||||
AUTH_GOOGLE_APIS_CALLBACK_URL=http://localhost:3000/auth/google-apis/get-access-token
|
||||
@@ -34,3 +34,6 @@ IS_WORKSPACE_CREATION_V2_ENABLED=true
|
||||
SHOULD_SEED_STANDARD_RECORD_PAGE_LAYOUTS=true
|
||||
|
||||
LOGIC_FUNCTION_TYPE=LOCAL
|
||||
IS_CONFIG_VARIABLES_IN_DB_ENABLED=false
|
||||
AUTH_GOOGLE_CLIENT_ID=mock-google-client-id
|
||||
AUTH_GOOGLE_CLIENT_SECRET=mock-google-client-secret
|
||||
|
||||
@@ -33,6 +33,9 @@ const jestConfig: JestConfigWithTsJest = {
|
||||
],
|
||||
testRegex: '\\.integration-spec\\.ts$',
|
||||
modulePathIgnorePatterns: ['<rootDir>/dist'],
|
||||
transformIgnorePatterns: [
|
||||
'/node_modules/(?!(msw|@mswjs|until-async|@bundled-es-modules|@open-draft|strict-event-emitter|headers-polyfill|outvariant|is-node-process|tough-cookie|path-to-regexp|statuses|cookie|digest-fetch|md5|email-reply-parser)/)',
|
||||
],
|
||||
globalSetup: '<rootDir>/test/integration/utils/setup-test.ts',
|
||||
globalTeardown: '<rootDir>/test/integration/utils/teardown-test.ts',
|
||||
testTimeout: 20000,
|
||||
|
||||
+12
-6
@@ -92,9 +92,9 @@ export class BullMQDriver
|
||||
]);
|
||||
}
|
||||
|
||||
work<T>(
|
||||
work<T, TResult = void>(
|
||||
queueName: MessageQueue,
|
||||
handler: (job: MessageQueueJob<T>) => Promise<void>,
|
||||
handler: (job: MessageQueueJob<T>) => Promise<TResult>,
|
||||
options?: MessageQueueWorkerOptions,
|
||||
) {
|
||||
const workerOptions = {
|
||||
@@ -110,7 +110,7 @@ export class BullMQDriver
|
||||
|
||||
this.workerMap[queueName] = new Worker(
|
||||
queueName,
|
||||
async (job) =>
|
||||
async (job): Promise<TResult> =>
|
||||
Sentry.withIsolationScope(async () => {
|
||||
applyWorkspaceSentryContextFromJobData(job.data);
|
||||
|
||||
@@ -124,13 +124,19 @@ export class BullMQDriver
|
||||
this.logger.log(
|
||||
`Processing job ${job.id} with name ${job.name} on queue ${queueName}${workspaceSuffix}`,
|
||||
);
|
||||
await handler({ data: job.data, id: job.id ?? '', name: job.name });
|
||||
const result = await handler({
|
||||
data: job.data,
|
||||
id: job.id ?? '',
|
||||
name: job.name,
|
||||
});
|
||||
const timeEnd = performance.now();
|
||||
const executionTime = timeEnd - timeStart;
|
||||
|
||||
this.logger.log(
|
||||
`Job ${job.id} with name ${job.name} processed on queue ${queueName} in ${executionTime.toFixed(2)}ms${workspaceSuffix}`,
|
||||
);
|
||||
|
||||
return result;
|
||||
}),
|
||||
workerOptions,
|
||||
);
|
||||
@@ -217,12 +223,12 @@ export class BullMQDriver
|
||||
);
|
||||
}
|
||||
|
||||
async add<T>(
|
||||
async add<T, TResult = void>(
|
||||
queueName: MessageQueue,
|
||||
jobName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void> {
|
||||
): Promise<TResult | void> {
|
||||
if (!this.queueMap[queueName]) {
|
||||
throw new Error(
|
||||
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
|
||||
|
||||
+10
-4
@@ -8,15 +8,21 @@ import { type MessageQueueWorkerOptions } from 'src/engine/core-modules/message-
|
||||
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
|
||||
export interface MessageQueueDriver {
|
||||
add<T extends MessageQueueJobData>(
|
||||
add<T extends MessageQueueJobData, TResult = void>(
|
||||
queueName: MessageQueue,
|
||||
jobName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void>;
|
||||
work<T extends MessageQueueJobData>(
|
||||
): Promise<TResult | void>;
|
||||
work<T extends MessageQueueJobData, TResult = void>(
|
||||
queueName: MessageQueue,
|
||||
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
|
||||
handler: ({
|
||||
data,
|
||||
id,
|
||||
}: {
|
||||
data: T;
|
||||
id: string;
|
||||
}) => Promise<TResult> | TResult,
|
||||
options?: MessageQueueWorkerOptions,
|
||||
): void;
|
||||
addCron<T extends MessageQueueJobData | undefined>({
|
||||
|
||||
+22
-14
@@ -12,17 +12,21 @@ import { type MessageQueue } from 'src/engine/core-modules/message-queue/message
|
||||
export class SyncDriver implements MessageQueueDriver {
|
||||
private readonly logger = new Logger(SyncDriver.name);
|
||||
private workersMap: {
|
||||
[queueName: string]: (job: MessageQueueJob) => Promise<void> | void;
|
||||
[queueName: string]: (job: MessageQueueJob) => Promise<unknown> | unknown;
|
||||
} = {};
|
||||
|
||||
constructor() {}
|
||||
|
||||
async add<T extends MessageQueueJobData>(
|
||||
async add<T extends MessageQueueJobData, TResult = void>(
|
||||
queueName: MessageQueue,
|
||||
jobName: string,
|
||||
data: T,
|
||||
): Promise<void> {
|
||||
await this.processJob(queueName, { id: '', name: jobName, data });
|
||||
): Promise<TResult | void> {
|
||||
return await this.processJob<T, TResult>(queueName, {
|
||||
id: '',
|
||||
name: jobName,
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
async addCron<T extends MessageQueueJobData | undefined>({
|
||||
@@ -48,26 +52,30 @@ export class SyncDriver implements MessageQueueDriver {
|
||||
this.logger.log(`Removing '${queueName}' cron job with SyncDriver`);
|
||||
}
|
||||
|
||||
work<T extends MessageQueueJobData>(
|
||||
work<T extends MessageQueueJobData, TResult = void>(
|
||||
queueName: MessageQueue,
|
||||
handler: (job: MessageQueueJob<T>) => Promise<void> | void,
|
||||
handler: (job: MessageQueueJob<T>) => Promise<TResult> | TResult,
|
||||
): void {
|
||||
this.logger.log(`Registering handler for queue: ${queueName}`);
|
||||
this.workersMap[queueName] = handler;
|
||||
this.workersMap[queueName] = handler as (
|
||||
job: MessageQueueJob,
|
||||
) => Promise<unknown> | unknown;
|
||||
}
|
||||
|
||||
async processJob<T extends MessageQueueJobData>(
|
||||
async processJob<T extends MessageQueueJobData, TResult = unknown>(
|
||||
queueName: string,
|
||||
job: MessageQueueJob<T>,
|
||||
) {
|
||||
): Promise<TResult | undefined> {
|
||||
const worker = this.workersMap[queueName];
|
||||
|
||||
if (worker) {
|
||||
await worker(job);
|
||||
} else {
|
||||
if (process.env.NODE_ENV !== 'test') {
|
||||
this.logger.error(`No handler found for job: ${queueName}`);
|
||||
}
|
||||
return (await worker(job)) as TResult;
|
||||
}
|
||||
|
||||
if (process.env.NODE_ENV !== 'test') {
|
||||
this.logger.error(`No handler found for job: ${queueName}`);
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
+20
-7
@@ -22,6 +22,7 @@ import { type MessageQueue } from 'src/engine/core-modules/message-queue/message
|
||||
import { QUEUE_WORKER_OPTIONS } from 'src/engine/core-modules/message-queue/message-queue-worker-options.constant';
|
||||
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
|
||||
import { shouldCaptureException } from 'src/engine/utils/global-exception-handler.util';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
interface ProcessorGroup {
|
||||
instance: object;
|
||||
@@ -138,16 +139,24 @@ export class MessageQueueExplorer implements OnModuleInit {
|
||||
options?: MessageQueueWorkerOptions,
|
||||
) {
|
||||
queue.work(async (job) => {
|
||||
let result: unknown;
|
||||
|
||||
for (const processorGroup of processorGroupCollection) {
|
||||
await this.handleProcessor(processorGroup, job);
|
||||
const groupResult = await this.handleProcessor(processorGroup, job);
|
||||
|
||||
if (isDefined(groupResult)) {
|
||||
result = groupResult;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}, options);
|
||||
}
|
||||
|
||||
private async handleProcessor(
|
||||
{ instance, host, processMethodNames, isRequestScoped }: ProcessorGroup,
|
||||
job: MessageQueueJob<MessageQueueJobData>,
|
||||
) {
|
||||
): Promise<unknown> {
|
||||
const filteredProcessMethodNames = processMethodNames.filter(
|
||||
(processMethodName) => {
|
||||
const metadata = this.metadataAccessor.getProcessMetadata(
|
||||
@@ -161,7 +170,7 @@ export class MessageQueueExplorer implements OnModuleInit {
|
||||
|
||||
// Return early if no matching methods found
|
||||
if (filteredProcessMethodNames.length === 0) {
|
||||
return;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (isRequestScoped) {
|
||||
@@ -186,13 +195,13 @@ export class MessageQueueExplorer implements OnModuleInit {
|
||||
contextId,
|
||||
);
|
||||
|
||||
await this.invokeProcessMethods(
|
||||
return await this.invokeProcessMethods(
|
||||
contextInstance,
|
||||
filteredProcessMethodNames,
|
||||
job,
|
||||
);
|
||||
} else {
|
||||
await this.invokeProcessMethods(
|
||||
return await this.invokeProcessMethods(
|
||||
instance,
|
||||
filteredProcessMethodNames,
|
||||
job,
|
||||
@@ -204,11 +213,13 @@ export class MessageQueueExplorer implements OnModuleInit {
|
||||
instance: object,
|
||||
processMethodNames: string[],
|
||||
job: MessageQueueJob<MessageQueueJobData>,
|
||||
) {
|
||||
): Promise<unknown> {
|
||||
let result: unknown;
|
||||
|
||||
for (const processMethodName of processMethodNames) {
|
||||
try {
|
||||
// @ts-expect-error legacy noImplicitAny
|
||||
await instance[processMethodName].call(instance, job.data);
|
||||
result = await instance[processMethodName].call(instance, job.data);
|
||||
} catch (err) {
|
||||
if (shouldCaptureException(err)) {
|
||||
this.exceptionHandlerService.captureExceptions([err]);
|
||||
@@ -216,5 +227,7 @@ export class MessageQueueExplorer implements OnModuleInit {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
+6
-6
@@ -27,12 +27,12 @@ export class MessageQueueService {
|
||||
}
|
||||
}
|
||||
|
||||
add<T extends MessageQueueJobData>(
|
||||
add<T extends MessageQueueJobData, TResult = void>(
|
||||
jobName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void> {
|
||||
return this.driver.add(this.queueName, jobName, data, options);
|
||||
): Promise<TResult | void> {
|
||||
return this.driver.add<T, TResult>(this.queueName, jobName, data, options);
|
||||
}
|
||||
|
||||
addCron<T extends MessageQueueJobData | undefined>({
|
||||
@@ -69,10 +69,10 @@ export class MessageQueueService {
|
||||
});
|
||||
}
|
||||
|
||||
work<T extends MessageQueueJobData>(
|
||||
handler: (job: MessageQueueJob<T>) => Promise<void> | void,
|
||||
work<T extends MessageQueueJobData, TResult = void>(
|
||||
handler: (job: MessageQueueJob<T>) => Promise<TResult> | TResult,
|
||||
options?: MessageQueueWorkerOptions,
|
||||
): void {
|
||||
this.driver.work(this.queueName, handler, options);
|
||||
this.driver.work<T, TResult>(this.queueName, handler, options);
|
||||
}
|
||||
}
|
||||
|
||||
+20
-8
@@ -22,6 +22,11 @@ export type MessagingMessageListFetchJobData = {
|
||||
workspaceId: string;
|
||||
};
|
||||
|
||||
export type MessagingMessageListFetchJobResult = {
|
||||
messagesToImport: number;
|
||||
messagesToDelete: number;
|
||||
};
|
||||
|
||||
@Processor({
|
||||
queueName: MessageQueue.messagingQueue,
|
||||
scope: Scope.REQUEST,
|
||||
@@ -37,7 +42,9 @@ export class MessagingMessageListFetchJob {
|
||||
) {}
|
||||
|
||||
@Process(MessagingMessageListFetchJob.name)
|
||||
async handle(data: MessagingMessageListFetchJobData): Promise<void> {
|
||||
async handle(
|
||||
data: MessagingMessageListFetchJobData,
|
||||
): Promise<MessagingMessageListFetchJobResult> {
|
||||
const { messageChannelId, workspaceId } = data;
|
||||
|
||||
await this.messagingMonitoringService.track({
|
||||
@@ -48,7 +55,7 @@ export class MessagingMessageListFetchJob {
|
||||
|
||||
const authContext = buildSystemAuthContext(workspaceId);
|
||||
|
||||
await this.globalWorkspaceOrmManager.executeInWorkspaceContext(
|
||||
return await this.globalWorkspaceOrmManager.executeInWorkspaceContext<MessagingMessageListFetchJobResult>(
|
||||
async () => {
|
||||
const messageChannel = await this.messageChannelRepository.findOne({
|
||||
where: {
|
||||
@@ -65,14 +72,14 @@ export class MessagingMessageListFetchJob {
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
return;
|
||||
return { messagesToImport: 0, messagesToDelete: 0 };
|
||||
}
|
||||
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
return { messagesToImport: 0, messagesToDelete: 0 };
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -83,10 +90,11 @@ export class MessagingMessageListFetchJob {
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
const fetchResult =
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.completed',
|
||||
@@ -94,6 +102,8 @@ export class MessagingMessageListFetchJob {
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
return fetchResult;
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
@@ -101,6 +111,8 @@ export class MessagingMessageListFetchJob {
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return { messagesToImport: 0, messagesToDelete: 0 };
|
||||
}
|
||||
},
|
||||
authContext,
|
||||
|
||||
+15
-4
@@ -21,6 +21,7 @@ import { type MessageChannelMessageAssociationWorkspaceEntity } from 'src/module
|
||||
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
|
||||
import { SyncMessageFoldersService } from 'src/modules/messaging/message-folder-manager/services/sync-message-folders.service';
|
||||
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
|
||||
import { type MessagingMessageListFetchJobResult } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import {
|
||||
MessageImportExceptionHandlerService,
|
||||
@@ -56,10 +57,10 @@ export class MessagingMessageListFetchService {
|
||||
public async processMessageListFetch(
|
||||
messageChannel: MessageChannelEntity,
|
||||
workspaceId: string,
|
||||
) {
|
||||
): Promise<MessagingMessageListFetchJobResult> {
|
||||
const authContext = buildSystemAuthContext(workspaceId);
|
||||
|
||||
await this.globalWorkspaceOrmManager.executeInWorkspaceContext(
|
||||
return await this.globalWorkspaceOrmManager.executeInWorkspaceContext<MessagingMessageListFetchJobResult>(
|
||||
async () => {
|
||||
try {
|
||||
const pendingGroupEmailActionsProcessed =
|
||||
@@ -96,7 +97,7 @@ export class MessagingMessageListFetchService {
|
||||
`WorkspaceId: ${workspaceId}, MessageChannelId: ${messageChannel.id} - Message channel not found`,
|
||||
);
|
||||
|
||||
return;
|
||||
return { messagesToImport: 0, messagesToDelete: 0 };
|
||||
}
|
||||
|
||||
const messageFolders =
|
||||
@@ -249,7 +250,10 @@ export class MessagingMessageListFetchService {
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
return {
|
||||
messagesToImport: totalMessagesToImportCount,
|
||||
messagesToDelete: allMessageExternalIdsToDelete.length,
|
||||
};
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
@@ -269,6 +273,11 @@ export class MessagingMessageListFetchService {
|
||||
freshMessageChannel.connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return {
|
||||
messagesToImport: totalMessagesToImportCount,
|
||||
messagesToDelete: allMessageExternalIdsToDelete.length,
|
||||
};
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
@@ -276,6 +285,8 @@ export class MessagingMessageListFetchService {
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return { messagesToImport: 0, messagesToDelete: 0 };
|
||||
}
|
||||
},
|
||||
authContext,
|
||||
|
||||
+95
@@ -0,0 +1,95 @@
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
MessageFolderImportPolicy,
|
||||
} from 'twenty-shared/types';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import {
|
||||
type MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJob,
|
||||
type MessagingMessageListFetchJobResult,
|
||||
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||
import { setupGmailMock } from 'test/integration/messaging/utils/gmail-mock.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
|
||||
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
|
||||
const runListFetch = (channelId: string) =>
|
||||
enqueueJobAndAwait<
|
||||
MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJobResult
|
||||
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
|
||||
messageChannelId: channelId,
|
||||
workspaceId: WORKSPACE_ID,
|
||||
});
|
||||
|
||||
const getSyncStateByFolderName = async (channelId: string) => {
|
||||
const folders = await queryMessageFolders(channelId);
|
||||
|
||||
return Object.fromEntries(
|
||||
folders.map((folder) => [folder.name, folder.isSynced]),
|
||||
);
|
||||
};
|
||||
|
||||
describe('Gmail folder discovery (integration)', () => {
|
||||
const gmail = setupGmailMock({
|
||||
inbox: [],
|
||||
labels: [
|
||||
{ id: 'INBOX', name: 'INBOX' },
|
||||
{ id: 'SENT', name: 'SENT' },
|
||||
{ id: 'Label_Work', name: 'Work' },
|
||||
],
|
||||
});
|
||||
|
||||
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
|
||||
|
||||
beforeAll(async () => {
|
||||
jest.useRealTimers();
|
||||
channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
messageFolderImportPolicy: MessageFolderImportPolicy.SELECTED_FOLDERS,
|
||||
});
|
||||
}, 60000);
|
||||
|
||||
afterAll(async () => {
|
||||
await channel.cleanup();
|
||||
});
|
||||
|
||||
it('rediscovers a folder appended after the first sync and leaves it unsynced under the selected-folders policy', async () => {
|
||||
await runListFetch(channel.channelId);
|
||||
|
||||
const afterFirstSync = await pollUntil(
|
||||
() => getSyncStateByFolderName(channel.channelId),
|
||||
(state) => Object.keys(state).length === 3,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
expect(afterFirstSync).toEqual({ INBOX: false, SENT: false, Work: false });
|
||||
|
||||
gmail.labels.add({ id: 'Label_Archive', name: 'Archive' });
|
||||
|
||||
await getCoreRepository<MessageChannelEntity>(MessageChannelEntity).update(
|
||||
{ id: channel.channelId },
|
||||
{ syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED },
|
||||
);
|
||||
|
||||
await runListFetch(channel.channelId);
|
||||
|
||||
const afterSecondSync = await pollUntil(
|
||||
() => getSyncStateByFolderName(channel.channelId),
|
||||
(state) => Object.keys(state).length === 4,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
expect(afterSecondSync).toEqual({
|
||||
INBOX: false,
|
||||
SENT: false,
|
||||
Work: false,
|
||||
Archive: false,
|
||||
});
|
||||
}, 60000);
|
||||
});
|
||||
+75
@@ -0,0 +1,75 @@
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { ConnectedAccountProvider } from 'twenty-shared/types';
|
||||
|
||||
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
|
||||
import { findManyOperationFactory } from 'test/integration/graphql/utils/find-many-operation-factory.util';
|
||||
import { makeGraphqlAPIRequest } from 'test/integration/graphql/utils/make-graphql-api-request.util';
|
||||
import {
|
||||
getGmailMessageSubject,
|
||||
gmailMessage,
|
||||
setupGmailMock,
|
||||
} from 'test/integration/messaging/utils/gmail-mock.util';
|
||||
import { connectMessagingAccount } from 'test/integration/messaging/utils/connect-messaging-account.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
|
||||
const findImportedSubjects = async (subjects: string[]): Promise<string[]> => {
|
||||
const response = await makeGraphqlAPIRequest(
|
||||
findManyOperationFactory({
|
||||
objectMetadataSingularName: 'message',
|
||||
objectMetadataPluralName: 'messages',
|
||||
gqlFields: `id
|
||||
subject`,
|
||||
filter: { subject: { in: subjects } },
|
||||
}),
|
||||
);
|
||||
|
||||
return response.body.data.messages.edges
|
||||
.map((edge: { node: { subject: string } }) => edge.node.subject)
|
||||
.sort();
|
||||
};
|
||||
|
||||
describe('Gmail message list fetch job (integration)', () => {
|
||||
const inbox = [gmailMessage(), gmailMessage()];
|
||||
|
||||
setupGmailMock({ inbox, handle: 'connected-account@apple.dev' });
|
||||
|
||||
let channel: Awaited<ReturnType<typeof connectMessagingAccount>>;
|
||||
|
||||
beforeAll(async () => {
|
||||
// The real BullMQ worker runs asynchronously, so polling uses real timers.
|
||||
jest.useRealTimers();
|
||||
channel = await connectMessagingAccount(ConnectedAccountProvider.GOOGLE);
|
||||
}, 60000);
|
||||
|
||||
afterAll(async () => {
|
||||
await channel.cleanup();
|
||||
});
|
||||
|
||||
it('runs the full sync pipeline on the real worker: folders synced, messages imported', async () => {
|
||||
// The cron schedules pending channels and enqueues their list-fetch onto the worker.
|
||||
await enqueueJobAndAwait(
|
||||
MessageQueue.cronQueue,
|
||||
MessagingMessageListFetchCronJob,
|
||||
{},
|
||||
);
|
||||
|
||||
const expectedSubjects = inbox.map(getGmailMessageSubject);
|
||||
|
||||
const importedSubjects = await pollUntil(
|
||||
() => findImportedSubjects(expectedSubjects),
|
||||
(subjects) => subjects.length === expectedSubjects.length,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
|
||||
expect(importedSubjects).toEqual([...expectedSubjects].sort());
|
||||
|
||||
const folders = await queryMessageFolders(channel.channelId);
|
||||
|
||||
expect(folders.map((folder) => folder.name).sort()).toEqual([
|
||||
'INBOX',
|
||||
'SENT',
|
||||
]);
|
||||
}, 60000);
|
||||
});
|
||||
+87
@@ -0,0 +1,87 @@
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
MessageChannelSyncStatus,
|
||||
} from 'twenty-shared/types';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import { MessagingRelaunchFailedMessageChannelJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job';
|
||||
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
|
||||
const relaunchFailedChannel = (channelId: string) =>
|
||||
enqueueJobAndAwait(
|
||||
MessageQueue.messagingQueue,
|
||||
MessagingRelaunchFailedMessageChannelJob,
|
||||
{ workspaceId: WORKSPACE_ID, messageChannelId: channelId },
|
||||
);
|
||||
|
||||
describe('Messaging failed-channel recovery (integration)', () => {
|
||||
beforeAll(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('recovers a FAILED_UNKNOWN channel and clears its throttle state', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncStage: MessageChannelSyncStage.FAILED,
|
||||
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||
throttleFailureCount: 3,
|
||||
throttleRetryAfter: new Date(Date.now() + 60_000),
|
||||
syncStageStartedAt: new Date(),
|
||||
});
|
||||
|
||||
try {
|
||||
await relaunchFailedChannel(channel.channelId);
|
||||
|
||||
const recovered = await pollUntil(
|
||||
() =>
|
||||
queryMessageChannel(channel.connectedAccountId, channel.channelId),
|
||||
(channelState) =>
|
||||
channelState?.syncStatus === MessageChannelSyncStatus.ACTIVE,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
|
||||
expect(recovered.syncStage).toBe(
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
|
||||
);
|
||||
expect(recovered.syncStatus).toBe(MessageChannelSyncStatus.ACTIVE);
|
||||
expect(recovered.throttleFailureCount).toBe(0);
|
||||
expect(recovered.throttleRetryAfter).toBeNull();
|
||||
expect(recovered.syncStageStartedAt).toBeNull();
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
|
||||
it('leaves a FAILED_INSUFFICIENT_PERMISSIONS channel untouched', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncStage: MessageChannelSyncStage.FAILED,
|
||||
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||
});
|
||||
|
||||
try {
|
||||
await relaunchFailedChannel(channel.channelId);
|
||||
|
||||
// No-op recovery: grace window for the worker to process, then assert no change.
|
||||
const channelAfter = await pollUntil(
|
||||
() =>
|
||||
queryMessageChannel(channel.connectedAccountId, channel.channelId),
|
||||
() => false,
|
||||
{ timeoutMs: 3_000 },
|
||||
);
|
||||
|
||||
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
|
||||
expect(channelAfter.syncStatus).toBe(
|
||||
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||
);
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
});
|
||||
+103
@@ -0,0 +1,103 @@
|
||||
import { http, HttpResponse } from 'msw';
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
MessageChannelSyncStatus,
|
||||
} from 'twenty-shared/types';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
|
||||
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||
import { setupGmailMock } from 'test/integration/messaging/utils/gmail-mock.util';
|
||||
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
const RETRY_AFTER_ISO = '2099-12-31T10:30:00.000Z';
|
||||
|
||||
const rateLimitedMessageList = () =>
|
||||
http.get('*/gmail/v1/users/me/messages', () =>
|
||||
HttpResponse.json(
|
||||
{
|
||||
error: {
|
||||
code: 429,
|
||||
message: 'Rate Limit Exceeded',
|
||||
errors: [
|
||||
{
|
||||
reason: 'rateLimitExceeded',
|
||||
message: `Rate Limit Exceeded. Retry after ${RETRY_AFTER_ISO}`,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
{ status: 429 },
|
||||
),
|
||||
);
|
||||
|
||||
const runListFetch = (channelId: string) =>
|
||||
enqueueJobAndAwait(
|
||||
MessageQueue.messagingQueue,
|
||||
MessagingMessageListFetchJob,
|
||||
{ messageChannelId: channelId, workspaceId: WORKSPACE_ID },
|
||||
);
|
||||
|
||||
describe('Messaging rate-limit throttling (integration)', () => {
|
||||
const gmail = setupGmailMock({ inbox: [], handle: 'tim@apple.dev' });
|
||||
|
||||
beforeAll(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('records the throttle backoff window on a 429 and keeps the channel active', async () => {
|
||||
const channel = await seedMessageChannel({ workspaceId: WORKSPACE_ID });
|
||||
|
||||
try {
|
||||
gmail.use(rateLimitedMessageList());
|
||||
|
||||
await runListFetch(channel.channelId);
|
||||
|
||||
const channelAfter = await pollUntil(
|
||||
() =>
|
||||
queryMessageChannel(channel.connectedAccountId, channel.channelId),
|
||||
(channelState) => channelState.throttleRetryAfter !== null,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
expect(channelAfter.throttleFailureCount).toBe(1);
|
||||
expect(channelAfter.throttleRetryAfter).toBe(RETRY_AFTER_ISO);
|
||||
expect(channelAfter.syncStatus).not.toBe(
|
||||
MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||
);
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
|
||||
it('fails the channel as unknown once the throttle attempts are exhausted', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
throttleFailureCount: MESSAGING_THROTTLE_MAX_ATTEMPTS,
|
||||
});
|
||||
|
||||
try {
|
||||
gmail.use(rateLimitedMessageList());
|
||||
|
||||
await runListFetch(channel.channelId);
|
||||
|
||||
const channelAfter = await pollUntil(
|
||||
() =>
|
||||
queryMessageChannel(channel.connectedAccountId, channel.channelId),
|
||||
(channelState) =>
|
||||
channelState.syncStatus === MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
expect(channelAfter.syncStatus).toBe(
|
||||
MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||
);
|
||||
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
});
|
||||
+83
@@ -0,0 +1,83 @@
|
||||
import { MessageChannelSyncStage } from 'twenty-shared/types';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
|
||||
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
|
||||
const STALE_STARTED_AT = new Date(Date.now() - 31 * 60 * 1000);
|
||||
const RECENT_STARTED_AT = new Date(Date.now() - 60 * 1000);
|
||||
|
||||
describe('Messaging stale-sync recovery (integration)', () => {
|
||||
beforeAll(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('resets stale ongoing channels to pending and leaves recent ones running', async () => {
|
||||
const staleListFetch = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
|
||||
syncStageStartedAt: STALE_STARTED_AT,
|
||||
});
|
||||
const staleImport = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
|
||||
syncStageStartedAt: STALE_STARTED_AT,
|
||||
});
|
||||
const recentListFetch = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
|
||||
syncStageStartedAt: RECENT_STARTED_AT,
|
||||
});
|
||||
|
||||
try {
|
||||
await enqueueJobAndAwait(
|
||||
MessageQueue.messagingQueue,
|
||||
MessagingOngoingStaleJob,
|
||||
{ workspaceId: WORKSPACE_ID },
|
||||
);
|
||||
|
||||
const staleListFetchAfter = await pollUntil(
|
||||
() =>
|
||||
queryMessageChannel(
|
||||
staleListFetch.connectedAccountId,
|
||||
staleListFetch.channelId,
|
||||
),
|
||||
(channelState) =>
|
||||
channelState.syncStage ===
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
expect(staleListFetchAfter.syncStage).toBe(
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
|
||||
);
|
||||
expect(staleListFetchAfter.syncStageStartedAt).toBeNull();
|
||||
|
||||
const staleImportAfter = await queryMessageChannel(
|
||||
staleImport.connectedAccountId,
|
||||
staleImport.channelId,
|
||||
);
|
||||
expect(staleImportAfter.syncStage).toBe(
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
|
||||
);
|
||||
expect(staleImportAfter.syncStageStartedAt).toBeNull();
|
||||
|
||||
const recentListFetchAfter = await queryMessageChannel(
|
||||
recentListFetch.connectedAccountId,
|
||||
recentListFetch.channelId,
|
||||
);
|
||||
expect(recentListFetchAfter.syncStage).toBe(
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
|
||||
);
|
||||
} finally {
|
||||
await staleListFetch.cleanup();
|
||||
await staleImport.cleanup();
|
||||
await recentListFetch.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
});
|
||||
+81
@@ -0,0 +1,81 @@
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import {
|
||||
type MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJob,
|
||||
type MessagingMessageListFetchJobResult,
|
||||
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||
import {
|
||||
gmailMessage,
|
||||
setupGmailMock,
|
||||
} from 'test/integration/messaging/utils/gmail-mock.util';
|
||||
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
const HISTORY_ID = '987654321';
|
||||
|
||||
const runListFetch = (channelId: string) =>
|
||||
enqueueJobAndAwait<
|
||||
MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJobResult
|
||||
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
|
||||
messageChannelId: channelId,
|
||||
workspaceId: WORKSPACE_ID,
|
||||
});
|
||||
|
||||
describe('Messaging sync cursor (integration)', () => {
|
||||
// No history.list handler is registered, so MSW's 'error' strategy fails the test if the
|
||||
// sync wrongly takes the incremental path instead of a full sync.
|
||||
setupGmailMock({
|
||||
inbox: [
|
||||
gmailMessage({ historyId: HISTORY_ID }),
|
||||
gmailMessage({ historyId: HISTORY_ID }),
|
||||
],
|
||||
handle: 'tim@apple.dev',
|
||||
});
|
||||
|
||||
beforeAll(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
const runAndAwaitCursor = async (channelId: string) => {
|
||||
await runListFetch(channelId);
|
||||
|
||||
return pollUntil(
|
||||
() => getMessageChannel(channelId),
|
||||
(channel) => channel.syncCursor === HISTORY_ID,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
};
|
||||
|
||||
it('runs a full sync and seeds the cursor when the cursor is null', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncCursor: null,
|
||||
});
|
||||
|
||||
try {
|
||||
const channelAfter = await runAndAwaitCursor(channel.channelId);
|
||||
expect(channelAfter.syncCursor).toBe(HISTORY_ID);
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
|
||||
it('runs a full sync when the cursor is an empty string', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
syncCursor: '',
|
||||
});
|
||||
|
||||
try {
|
||||
const channelAfter = await runAndAwaitCursor(channel.channelId);
|
||||
expect(channelAfter.syncCursor).toBe(HISTORY_ID);
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
});
|
||||
+109
@@ -0,0 +1,109 @@
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
MessageChannelSyncStatus,
|
||||
} from 'twenty-shared/types';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { ConnectedAccountEntity } from 'src/engine/metadata-modules/connected-account/entities/connected-account.entity';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import {
|
||||
type MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJob,
|
||||
type MessagingMessageListFetchJobResult,
|
||||
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||
import {
|
||||
gmailMessage,
|
||||
INVALID_REFRESH_TOKEN_PREFIX,
|
||||
setupGmailMock,
|
||||
} from 'test/integration/messaging/utils/gmail-mock.util';
|
||||
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
|
||||
// Older than the 55-minute access-token validity window, so the stored access token is treated
|
||||
// as expired and a refresh round-trip fires.
|
||||
const EXPIRED_CREDENTIALS_AT = new Date(Date.now() - 56 * 60 * 1000);
|
||||
|
||||
const runListFetch = (channelId: string) =>
|
||||
enqueueJobAndAwait<
|
||||
MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJobResult
|
||||
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
|
||||
messageChannelId: channelId,
|
||||
workspaceId: WORKSPACE_ID,
|
||||
});
|
||||
|
||||
const getConnectedAccount = (connectedAccountId: string) =>
|
||||
getCoreRepository(ConnectedAccountEntity).findOneByOrFail({
|
||||
id: connectedAccountId,
|
||||
});
|
||||
|
||||
describe('Messaging token refresh (integration)', () => {
|
||||
setupGmailMock({ inbox: [gmailMessage()], handle: 'tim@apple.dev' });
|
||||
|
||||
beforeAll(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('refreshes an expired access token and completes the sync', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
lastCredentialsRefreshedAt: EXPIRED_CREDENTIALS_AT,
|
||||
});
|
||||
|
||||
try {
|
||||
await runListFetch(channel.channelId);
|
||||
|
||||
const connectedAccount = await pollUntil(
|
||||
() => getConnectedAccount(channel.connectedAccountId),
|
||||
(account) =>
|
||||
(account.lastCredentialsRefreshedAt?.getTime() ?? 0) >
|
||||
EXPIRED_CREDENTIALS_AT.getTime(),
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
|
||||
expect(connectedAccount.authFailedAt).toBeNull();
|
||||
expect(
|
||||
connectedAccount.lastCredentialsRefreshedAt?.getTime(),
|
||||
).toBeGreaterThan(EXPIRED_CREDENTIALS_AT.getTime());
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
|
||||
it('marks the channel as insufficient-permissions when the refresh token is declined', async () => {
|
||||
const channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
lastCredentialsRefreshedAt: EXPIRED_CREDENTIALS_AT,
|
||||
refreshTokenPlaintext: `${INVALID_REFRESH_TOKEN_PREFIX}-tim`,
|
||||
});
|
||||
|
||||
try {
|
||||
await runListFetch(channel.channelId);
|
||||
|
||||
const channelAfter = await pollUntil(
|
||||
() =>
|
||||
queryMessageChannel(channel.connectedAccountId, channel.channelId),
|
||||
(channelState) =>
|
||||
channelState.syncStatus ===
|
||||
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
|
||||
expect(channelAfter.syncStatus).toBe(
|
||||
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||
);
|
||||
|
||||
const connectedAccount = await getConnectedAccount(
|
||||
channel.connectedAccountId,
|
||||
);
|
||||
expect(connectedAccount.authFailedAt).not.toBeNull();
|
||||
} finally {
|
||||
await channel.cleanup();
|
||||
}
|
||||
}, 60000);
|
||||
});
|
||||
+61
@@ -0,0 +1,61 @@
|
||||
import { ConnectedAccountProvider } from 'twenty-shared/types';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
|
||||
import {
|
||||
type MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJob,
|
||||
type MessagingMessageListFetchJobResult,
|
||||
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||
import { setupMicrosoftMock } from 'test/integration/messaging/utils/microsoft-mock.util';
|
||||
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
|
||||
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
|
||||
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
|
||||
import { pollUntil } from 'test/integration/utils/poll-until.util';
|
||||
|
||||
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
|
||||
|
||||
describe('Microsoft folder discovery (integration)', () => {
|
||||
setupMicrosoftMock({ inbox: [] });
|
||||
|
||||
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
|
||||
|
||||
const getFolderNames = async (channelId: string): Promise<string[]> => {
|
||||
const folders = await queryMessageFolders(channelId);
|
||||
|
||||
return folders
|
||||
.map((folder) => folder.name)
|
||||
.filter((name): name is string => name !== null)
|
||||
.sort();
|
||||
};
|
||||
|
||||
beforeAll(async () => {
|
||||
jest.useRealTimers();
|
||||
channel = await seedMessageChannel({
|
||||
workspaceId: WORKSPACE_ID,
|
||||
provider: ConnectedAccountProvider.MICROSOFT,
|
||||
});
|
||||
}, 60000);
|
||||
|
||||
afterAll(async () => {
|
||||
await channel.cleanup();
|
||||
});
|
||||
|
||||
it('discovers Microsoft mail folders through the Graph delta sync', async () => {
|
||||
await enqueueJobAndAwait<
|
||||
MessagingMessageListFetchJobData,
|
||||
MessagingMessageListFetchJobResult
|
||||
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
|
||||
messageChannelId: channel.channelId,
|
||||
workspaceId: WORKSPACE_ID,
|
||||
});
|
||||
|
||||
const folderNames = await pollUntil(
|
||||
() => getFolderNames(channel.channelId),
|
||||
(names) => names.length === 2,
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
|
||||
expect(folderNames).toEqual(['Inbox', 'Sent Items']);
|
||||
}, 60000);
|
||||
});
|
||||
+79
@@ -0,0 +1,79 @@
|
||||
import gql from 'graphql-tag';
|
||||
import request from 'supertest';
|
||||
import { ConnectedAccountProvider } from 'twenty-shared/types';
|
||||
|
||||
import { makeMetadataAPIRequest } from 'test/integration/metadata/suites/utils/make-metadata-api-request.util';
|
||||
import {
|
||||
deleteConnectedAccount,
|
||||
queryMessageChannels,
|
||||
} from 'test/integration/messaging/utils/query-messaging.util';
|
||||
|
||||
type ConnectMessagingAccountResult = {
|
||||
channelId: string;
|
||||
connectedAccountId: string;
|
||||
cleanup: () => Promise<void>;
|
||||
};
|
||||
|
||||
const OAUTH_CALLBACK_PATH: Partial<Record<ConnectedAccountProvider, string>> = {
|
||||
[ConnectedAccountProvider.GOOGLE]: '/auth/google-apis/get-access-token',
|
||||
[ConnectedAccountProvider.MICROSOFT]: '/auth/microsoft-apis/get-access-token',
|
||||
};
|
||||
|
||||
// Connects a messaging account through the real OAuth callback the providers redirect to:
|
||||
// mints a transient token for the authed user, drives the callback with a code (the HTTP
|
||||
// boundary is mocked), and lets the provider's auth service create a fresh connected account
|
||||
// and message channel. Returns the channel and a cleanup that removes the account through the
|
||||
// same mutation the product uses.
|
||||
export const connectMessagingAccount = async (
|
||||
provider: ConnectedAccountProvider,
|
||||
): Promise<ConnectMessagingAccountResult> => {
|
||||
const callbackPath = OAUTH_CALLBACK_PATH[provider];
|
||||
|
||||
if (!callbackPath) {
|
||||
throw new Error(`Unsupported OAuth provider: ${provider}`);
|
||||
}
|
||||
|
||||
const channelIdsBeforeConnect = new Set(
|
||||
(await queryMessageChannels()).map((channel) => channel.id),
|
||||
);
|
||||
|
||||
const transientTokenResponse = await makeMetadataAPIRequest({
|
||||
query: gql`
|
||||
mutation GenerateTransientToken {
|
||||
generateTransientToken {
|
||||
transientToken {
|
||||
token
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
});
|
||||
|
||||
const state = JSON.stringify({
|
||||
transientToken:
|
||||
transientTokenResponse.body.data.generateTransientToken.transientToken
|
||||
.token,
|
||||
messageVisibility: 'SHARE_EVERYTHING',
|
||||
skipMessageChannelConfiguration: true,
|
||||
});
|
||||
|
||||
const callbackResponse = await request(`http://localhost:${APP_PORT}`)
|
||||
.get(callbackPath)
|
||||
.query({ code: 'mock-authorization-code', state });
|
||||
|
||||
const connectedChannel = (await queryMessageChannels()).find(
|
||||
(channel) => !channelIdsBeforeConnect.has(channel.id),
|
||||
);
|
||||
|
||||
if (!connectedChannel) {
|
||||
throw new Error(
|
||||
`OAuth connect for ${provider} created no message channel (callback redirected to ${callbackResponse.headers.location})`,
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
channelId: connectedChannel.id,
|
||||
connectedAccountId: connectedChannel.connectedAccountId,
|
||||
cleanup: () => deleteConnectedAccount(connectedChannel.connectedAccountId),
|
||||
};
|
||||
};
|
||||
@@ -0,0 +1,5 @@
|
||||
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
|
||||
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
|
||||
|
||||
export const getMessageChannel = (channelId: string) =>
|
||||
getCoreRepository(MessageChannelEntity).findOneByOrFail({ id: channelId });
|
||||
@@ -0,0 +1,221 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
import { type gmail_v1 } from 'googleapis';
|
||||
import { http, HttpResponse, type RequestHandler } from 'msw';
|
||||
|
||||
import { setupHttpMock } from 'test/integration/utils/http-mock';
|
||||
|
||||
export const INVALID_REFRESH_TOKEN_PREFIX = 'invalid-refresh-token';
|
||||
|
||||
const GOOGLE_OAUTH_SCOPES = [
|
||||
'email',
|
||||
'profile',
|
||||
'https://www.googleapis.com/auth/gmail.readonly',
|
||||
'https://www.googleapis.com/auth/calendar.events',
|
||||
'https://www.googleapis.com/auth/profile.emails.read',
|
||||
'https://www.googleapis.com/auth/gmail.send',
|
||||
'https://www.googleapis.com/auth/gmail.compose',
|
||||
].join(' ');
|
||||
|
||||
export const gmailMessage = (
|
||||
overrides: Partial<gmail_v1.Schema$Message> = {},
|
||||
): gmail_v1.Schema$Message => {
|
||||
const id = overrides.id ?? `gmail-msg-${randomUUID()}`;
|
||||
|
||||
return {
|
||||
id,
|
||||
threadId: id,
|
||||
historyId: '987654321',
|
||||
internalDate: '1700000000000',
|
||||
labelIds: ['INBOX'],
|
||||
payload: {
|
||||
mimeType: 'text/plain',
|
||||
headers: [
|
||||
{ name: 'From', value: 'sender@example.com' },
|
||||
{ name: 'To', value: 'recipient@example.com' },
|
||||
{ name: 'Subject', value: `Subject ${id}` },
|
||||
{ name: 'Message-ID', value: `<${id}@example.com>` },
|
||||
{ name: 'Date', value: 'Wed, 15 Nov 2023 00:00:00 +0000' },
|
||||
],
|
||||
body: {
|
||||
data: Buffer.from(`body ${id}`).toString('base64'),
|
||||
size: 10,
|
||||
},
|
||||
},
|
||||
...overrides,
|
||||
};
|
||||
};
|
||||
|
||||
export const getGmailMessageSubject = (
|
||||
message: gmail_v1.Schema$Message,
|
||||
): string =>
|
||||
message.payload?.headers?.find((header) => header.name === 'Subject')
|
||||
?.value ?? '';
|
||||
|
||||
const DEFAULT_LABELS: gmail_v1.Schema$Label[] = [
|
||||
{ id: 'INBOX', name: 'INBOX', type: 'system' },
|
||||
{ id: 'SENT', name: 'SENT', type: 'system' },
|
||||
];
|
||||
|
||||
export type GmailLabelStore = {
|
||||
add: (label: gmail_v1.Schema$Label) => void;
|
||||
remove: (labelId: string) => void;
|
||||
reset: () => void;
|
||||
list: () => gmail_v1.Schema$Label[];
|
||||
};
|
||||
|
||||
const createGmailLabelStore = (
|
||||
initialLabels: gmail_v1.Schema$Label[],
|
||||
): GmailLabelStore => {
|
||||
let labels = [...initialLabels];
|
||||
|
||||
return {
|
||||
add: (label) => {
|
||||
labels = [...labels, label];
|
||||
},
|
||||
remove: (labelId) => {
|
||||
labels = labels.filter((label) => label.id !== labelId);
|
||||
},
|
||||
reset: () => {
|
||||
labels = [...initialLabels];
|
||||
},
|
||||
list: () => labels,
|
||||
};
|
||||
};
|
||||
|
||||
const buildBatchMultipartResponse = (
|
||||
inbox: gmail_v1.Schema$Message[],
|
||||
): { body: string; contentType: string } => {
|
||||
const boundary = 'batch_boundary';
|
||||
const subResponses = inbox
|
||||
.map((message) =>
|
||||
[
|
||||
`--${boundary}`,
|
||||
'Content-Type: application/http',
|
||||
'',
|
||||
'HTTP/1.1 200 OK',
|
||||
'Content-Type: application/json; charset=UTF-8',
|
||||
'',
|
||||
JSON.stringify(message),
|
||||
].join('\r\n'),
|
||||
)
|
||||
.join('\r\n');
|
||||
|
||||
return {
|
||||
body: `${subResponses}\r\n--${boundary}--`,
|
||||
contentType: `multipart/mixed; boundary=${boundary}`,
|
||||
};
|
||||
};
|
||||
|
||||
const gmailHandlers = ({
|
||||
inbox,
|
||||
labelStore,
|
||||
handle,
|
||||
}: {
|
||||
inbox: gmail_v1.Schema$Message[];
|
||||
labelStore: GmailLabelStore;
|
||||
handle: string;
|
||||
}): RequestHandler[] => [
|
||||
http.post('https://oauth2.googleapis.com/token', async ({ request }) => {
|
||||
const body = new URLSearchParams(await request.text());
|
||||
|
||||
if (
|
||||
(body.get('refresh_token') ?? '').startsWith(INVALID_REFRESH_TOKEN_PREFIX)
|
||||
) {
|
||||
return HttpResponse.json(
|
||||
{ error: 'invalid_grant', error_description: 'Token has been revoked' },
|
||||
{ status: 400 },
|
||||
);
|
||||
}
|
||||
|
||||
return HttpResponse.json({
|
||||
access_token: 'mock-access-token',
|
||||
expires_in: 3600,
|
||||
scope: 'https://www.googleapis.com/auth/gmail.readonly',
|
||||
token_type: 'Bearer',
|
||||
});
|
||||
}),
|
||||
// --- OAuth connect flow (passport code exchange + connect-path availability checks) ---
|
||||
http.post('https://www.googleapis.com/oauth2/v4/token', () =>
|
||||
HttpResponse.json({
|
||||
access_token: 'mock-access-token',
|
||||
refresh_token: 'mock-refresh-token',
|
||||
expires_in: 3600,
|
||||
scope: GOOGLE_OAUTH_SCOPES,
|
||||
token_type: 'Bearer',
|
||||
}),
|
||||
),
|
||||
http.get('https://www.googleapis.com/oauth2/v3/userinfo', () =>
|
||||
HttpResponse.json({
|
||||
sub: 'google-user-id',
|
||||
email: handle,
|
||||
email_verified: true,
|
||||
name: 'Jane Austen',
|
||||
given_name: 'Jane',
|
||||
family_name: 'Austen',
|
||||
}),
|
||||
),
|
||||
http.get('https://www.googleapis.com/oauth2/v3/tokeninfo', () =>
|
||||
HttpResponse.json({ scope: GOOGLE_OAUTH_SCOPES, email: handle }),
|
||||
),
|
||||
http.get('https://gmail.googleapis.com/gmail/v1/users/me/profile', () =>
|
||||
HttpResponse.json({ emailAddress: handle, messagesTotal: 0 }),
|
||||
),
|
||||
http.get(
|
||||
'https://www.googleapis.com/calendar/v3/calendars/primary/events',
|
||||
() => HttpResponse.json({ items: [] }),
|
||||
),
|
||||
http.get('*/gmail/v1/users/me/settings/sendAs', () => {
|
||||
const sendAs: gmail_v1.Schema$SendAs[] = [
|
||||
{ sendAsEmail: handle, isPrimary: true },
|
||||
];
|
||||
|
||||
return HttpResponse.json<gmail_v1.Schema$ListSendAsResponse>({ sendAs });
|
||||
}),
|
||||
http.get('*/gmail/v1/users/me/labels', () =>
|
||||
HttpResponse.json<gmail_v1.Schema$ListLabelsResponse>({
|
||||
labels: labelStore.list(),
|
||||
}),
|
||||
),
|
||||
http.get('*/gmail/v1/users/me/messages', () => {
|
||||
const messages: gmail_v1.Schema$Message[] = inbox.map((message) => ({
|
||||
id: message.id,
|
||||
threadId: message.threadId,
|
||||
}));
|
||||
|
||||
return HttpResponse.json<gmail_v1.Schema$ListMessagesResponse>({
|
||||
messages,
|
||||
resultSizeEstimate: inbox.length,
|
||||
});
|
||||
}),
|
||||
http.get('*/gmail/v1/users/me/messages/:messageId', ({ params }) =>
|
||||
HttpResponse.json<gmail_v1.Schema$Message>({
|
||||
id: String(params.messageId),
|
||||
threadId: String(params.messageId),
|
||||
historyId: inbox[0]?.historyId ?? '987654321',
|
||||
}),
|
||||
),
|
||||
http.post('*/batch', () => {
|
||||
const { body, contentType } = buildBatchMultipartResponse(inbox);
|
||||
|
||||
return new HttpResponse(body, { headers: { 'Content-Type': contentType } });
|
||||
}),
|
||||
];
|
||||
|
||||
export const setupGmailMock = ({
|
||||
inbox,
|
||||
labels = DEFAULT_LABELS,
|
||||
handle = 'me@example.com',
|
||||
}: {
|
||||
inbox: gmail_v1.Schema$Message[];
|
||||
labels?: gmail_v1.Schema$Label[];
|
||||
handle?: string;
|
||||
}): { labels: GmailLabelStore; use: (...handlers: unknown[]) => void } => {
|
||||
const labelStore = createGmailLabelStore(labels);
|
||||
|
||||
const httpMock = setupHttpMock(
|
||||
...gmailHandlers({ inbox, labelStore, handle }),
|
||||
);
|
||||
|
||||
return { labels: labelStore, use: httpMock.use };
|
||||
};
|
||||
@@ -0,0 +1,104 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
import {
|
||||
type MailFolder,
|
||||
type Message,
|
||||
} from '@microsoft/microsoft-graph-types';
|
||||
import { http, HttpResponse, type RequestHandler } from 'msw';
|
||||
|
||||
import { setupHttpMock } from 'test/integration/utils/http-mock';
|
||||
|
||||
export const microsoftMessage = (overrides: Partial<Message> = {}): Message => {
|
||||
const id = overrides.id ?? `ms-msg-${randomUUID()}`;
|
||||
|
||||
return {
|
||||
id,
|
||||
subject: `Subject ${id}`,
|
||||
internetMessageId: `<${id}@example.com>`,
|
||||
receivedDateTime: '2023-11-15T00:00:00Z',
|
||||
from: { emailAddress: { address: 'sender@example.com' } },
|
||||
toRecipients: [{ emailAddress: { address: 'recipient@example.com' } }],
|
||||
ccRecipients: [],
|
||||
bccRecipients: [],
|
||||
body: { contentType: 'text', content: `body ${id}` },
|
||||
...overrides,
|
||||
};
|
||||
};
|
||||
|
||||
const DEFAULT_FOLDERS: MailFolder[] = [
|
||||
{ id: 'inbox', displayName: 'Inbox' },
|
||||
{ id: 'sentitems', displayName: 'Sent Items' },
|
||||
];
|
||||
|
||||
export type MicrosoftFolderStore = {
|
||||
add: (folder: MailFolder) => void;
|
||||
remove: (folderId: string) => void;
|
||||
reset: () => void;
|
||||
list: () => MailFolder[];
|
||||
};
|
||||
|
||||
const createMicrosoftFolderStore = (
|
||||
initialFolders: MailFolder[],
|
||||
): MicrosoftFolderStore => {
|
||||
let folders = [...initialFolders];
|
||||
|
||||
return {
|
||||
add: (folder) => {
|
||||
folders = [...folders, folder];
|
||||
},
|
||||
remove: (folderId) => {
|
||||
folders = folders.filter((folder) => folder.id !== folderId);
|
||||
},
|
||||
reset: () => {
|
||||
folders = [...initialFolders];
|
||||
},
|
||||
list: () => folders,
|
||||
};
|
||||
};
|
||||
|
||||
const microsoftHandlers = ({
|
||||
inbox,
|
||||
folderStore,
|
||||
}: {
|
||||
inbox: Message[];
|
||||
folderStore: MicrosoftFolderStore;
|
||||
}): RequestHandler[] => [
|
||||
http.get('*/me/mailFolders', () =>
|
||||
HttpResponse.json<{ value: MailFolder[] }>({ value: folderStore.list() }),
|
||||
),
|
||||
http.get('*/messages/delta', () =>
|
||||
HttpResponse.json<{ value: Message[]; '@odata.deltaLink': string }>({
|
||||
value: inbox.map((message) => ({ id: message.id })),
|
||||
'@odata.deltaLink':
|
||||
'https://graph.microsoft.com/beta/me/mailfolders/inbox/messages/delta?$deltatoken=mock-delta-token',
|
||||
}),
|
||||
),
|
||||
http.post('*/$batch', () =>
|
||||
HttpResponse.json<{
|
||||
responses: { id: string; status: number; body: Message }[];
|
||||
}>({
|
||||
responses: inbox.map((message, index) => ({
|
||||
id: (index + 1).toString(),
|
||||
status: 200,
|
||||
body: message,
|
||||
})),
|
||||
}),
|
||||
),
|
||||
];
|
||||
|
||||
export const setupMicrosoftMock = ({
|
||||
inbox,
|
||||
folders = DEFAULT_FOLDERS,
|
||||
}: {
|
||||
inbox: Message[];
|
||||
folders?: MailFolder[];
|
||||
}): {
|
||||
folders: MicrosoftFolderStore;
|
||||
use: (...handlers: unknown[]) => void;
|
||||
} => {
|
||||
const folderStore = createMicrosoftFolderStore(folders);
|
||||
|
||||
const httpMock = setupHttpMock(...microsoftHandlers({ inbox, folderStore }));
|
||||
|
||||
return { folders: folderStore, use: httpMock.use };
|
||||
};
|
||||
@@ -0,0 +1,108 @@
|
||||
// TODO: cleanup see if we can reuse fragments from frontend
|
||||
import gql from 'graphql-tag';
|
||||
|
||||
import { makeMetadataAPIRequest } from 'test/integration/metadata/suites/utils/make-metadata-api-request.util';
|
||||
|
||||
export type MessageFolderDto = {
|
||||
id: string;
|
||||
name: string | null;
|
||||
isSynced: boolean;
|
||||
};
|
||||
|
||||
export type MessageChannelDto = {
|
||||
id: string;
|
||||
connectedAccountId: string;
|
||||
syncStatus: string;
|
||||
syncStage: string;
|
||||
syncStageStartedAt: string | null;
|
||||
throttleFailureCount: number;
|
||||
throttleRetryAfter: string | null;
|
||||
};
|
||||
|
||||
const MESSAGE_CHANNEL_FIELDS = gql`
|
||||
fragment TestMessageChannelFields on MessageChannel {
|
||||
id
|
||||
connectedAccountId
|
||||
syncStatus
|
||||
syncStage
|
||||
syncStageStartedAt
|
||||
throttleFailureCount
|
||||
throttleRetryAfter
|
||||
}
|
||||
`;
|
||||
|
||||
export const queryMessageChannels = async (): Promise<MessageChannelDto[]> => {
|
||||
const response = await makeMetadataAPIRequest({
|
||||
query: gql`
|
||||
query MessageChannelsForTest {
|
||||
myMessageChannels {
|
||||
...TestMessageChannelFields
|
||||
}
|
||||
}
|
||||
${MESSAGE_CHANNEL_FIELDS}
|
||||
`,
|
||||
});
|
||||
|
||||
return response.body.data.myMessageChannels;
|
||||
};
|
||||
|
||||
export const deleteConnectedAccount = async (
|
||||
connectedAccountId: string,
|
||||
): Promise<void> => {
|
||||
await makeMetadataAPIRequest({
|
||||
query: gql`
|
||||
mutation DeleteConnectedAccountForTest($id: UUID!) {
|
||||
deleteConnectedAccount(id: $id) {
|
||||
id
|
||||
}
|
||||
}
|
||||
`,
|
||||
variables: { id: connectedAccountId },
|
||||
});
|
||||
};
|
||||
|
||||
export const queryMessageFolders = async (
|
||||
messageChannelId: string,
|
||||
): Promise<MessageFolderDto[]> => {
|
||||
const response = await makeMetadataAPIRequest({
|
||||
query: gql`
|
||||
query MessageFoldersForTest($messageChannelId: UUID) {
|
||||
myMessageFolders(messageChannelId: $messageChannelId) {
|
||||
id
|
||||
name
|
||||
isSynced
|
||||
}
|
||||
}
|
||||
`,
|
||||
variables: { messageChannelId },
|
||||
});
|
||||
|
||||
return response.body.data.myMessageFolders;
|
||||
};
|
||||
|
||||
export const queryMessageChannel = async (
|
||||
connectedAccountId: string,
|
||||
messageChannelId: string,
|
||||
): Promise<MessageChannelDto> => {
|
||||
const response = await makeMetadataAPIRequest({
|
||||
query: gql`
|
||||
query MessageChannelForTest($connectedAccountId: UUID) {
|
||||
myMessageChannels(connectedAccountId: $connectedAccountId) {
|
||||
...TestMessageChannelFields
|
||||
}
|
||||
}
|
||||
${MESSAGE_CHANNEL_FIELDS}
|
||||
`,
|
||||
variables: { connectedAccountId },
|
||||
});
|
||||
|
||||
const channel = response.body.data.myMessageChannels.find(
|
||||
(candidate: MessageChannelDto) => candidate.id === messageChannelId,
|
||||
);
|
||||
|
||||
if (!channel) {
|
||||
throw new Error(`Message channel ${messageChannelId} not found`);
|
||||
}
|
||||
|
||||
return channel;
|
||||
};
|
||||
@@ -0,0 +1,107 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
import {
|
||||
ConnectedAccountProvider,
|
||||
MessageChannelPendingGroupEmailsAction,
|
||||
MessageChannelSyncStage,
|
||||
MessageChannelSyncStatus,
|
||||
MessageChannelType,
|
||||
MessageChannelVisibility,
|
||||
MessageFolderImportPolicy,
|
||||
} from 'twenty-shared/types';
|
||||
|
||||
import { ConnectedAccountEntity } from 'src/engine/metadata-modules/connected-account/entities/connected-account.entity';
|
||||
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
|
||||
import { CONNECTED_ACCOUNT_DATA_SEED_IDS } from 'src/engine/workspace-manager/dev-seeder/data/constants/connected-account-data-seeds.constant';
|
||||
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
|
||||
import { mintEncryptedToken } from 'test/integration/utils/mint-encrypted-token.util';
|
||||
|
||||
type SeedMessageChannelResult = {
|
||||
channelId: string;
|
||||
connectedAccountId: string;
|
||||
handle: string;
|
||||
cleanup: () => Promise<void>;
|
||||
};
|
||||
|
||||
// Reuses a dev-seeded connected account, sets fresh encrypted tokens + provider on it, and
|
||||
// creates a brand new association-free channel scheduled for list-fetch. Provider-agnostic:
|
||||
// the channel/account shape is identical for Gmail and Microsoft. A unique channelId keeps
|
||||
// each run isolated without a DB reset.
|
||||
export const seedMessageChannel = async ({
|
||||
workspaceId,
|
||||
provider = ConnectedAccountProvider.GOOGLE,
|
||||
handle = 'tim@apple.dev',
|
||||
connectedAccountId = CONNECTED_ACCOUNT_DATA_SEED_IDS.JANE,
|
||||
messageFolderImportPolicy = MessageFolderImportPolicy.ALL_FOLDERS,
|
||||
syncStatus = MessageChannelSyncStatus.NOT_SYNCED,
|
||||
syncStage = MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED,
|
||||
syncCursor = null,
|
||||
syncStageStartedAt = null,
|
||||
throttleRetryAfter = null,
|
||||
throttleFailureCount = 0,
|
||||
lastCredentialsRefreshedAt = new Date(),
|
||||
refreshTokenPlaintext = 'mock-refresh-token',
|
||||
}: {
|
||||
workspaceId: string;
|
||||
provider?: ConnectedAccountProvider;
|
||||
handle?: string;
|
||||
connectedAccountId?: string;
|
||||
messageFolderImportPolicy?: MessageFolderImportPolicy;
|
||||
syncStatus?: MessageChannelSyncStatus;
|
||||
syncStage?: MessageChannelSyncStage;
|
||||
syncCursor?: string | null;
|
||||
syncStageStartedAt?: Date | null;
|
||||
throttleRetryAfter?: Date | null;
|
||||
throttleFailureCount?: number;
|
||||
lastCredentialsRefreshedAt?: Date | null;
|
||||
refreshTokenPlaintext?: string;
|
||||
}): Promise<SeedMessageChannelResult> => {
|
||||
const connectedAccountRepository = getCoreRepository(ConnectedAccountEntity);
|
||||
const messageChannelRepository = getCoreRepository(MessageChannelEntity);
|
||||
|
||||
const channelId = randomUUID();
|
||||
|
||||
await connectedAccountRepository.update(
|
||||
{ id: connectedAccountId },
|
||||
{
|
||||
provider,
|
||||
accessToken: mintEncryptedToken('mock-access-token', workspaceId),
|
||||
refreshToken: mintEncryptedToken(refreshTokenPlaintext, workspaceId),
|
||||
lastCredentialsRefreshedAt,
|
||||
authFailedAt: null,
|
||||
},
|
||||
);
|
||||
|
||||
await messageChannelRepository.save({
|
||||
id: channelId,
|
||||
visibility: MessageChannelVisibility.SHARE_EVERYTHING,
|
||||
handle,
|
||||
type: MessageChannelType.EMAIL,
|
||||
messageFolderImportPolicy,
|
||||
pendingGroupEmailsAction: MessageChannelPendingGroupEmailsAction.NONE,
|
||||
isSyncEnabled: true,
|
||||
syncCursor,
|
||||
syncStatus,
|
||||
syncStage,
|
||||
syncStageStartedAt,
|
||||
throttleRetryAfter,
|
||||
throttleFailureCount,
|
||||
connectedAccountId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const cleanup = async () => {
|
||||
await messageChannelRepository.delete({ id: channelId });
|
||||
await connectedAccountRepository.update(
|
||||
{ id: connectedAccountId },
|
||||
{
|
||||
accessToken: null,
|
||||
refreshToken: null,
|
||||
lastCredentialsRefreshedAt: null,
|
||||
authFailedAt: null,
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
return { channelId, connectedAccountId, handle, cleanup };
|
||||
};
|
||||
@@ -18,6 +18,7 @@ import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handl
|
||||
import { ExceptionHandlerMockService } from 'src/engine/core-modules/exception-handler/mocks/exception-handler-mock.service';
|
||||
import { MockedUnhandledExceptionFilter } from 'src/engine/core-modules/exception-handler/mocks/mock-unhandled-exception.filter';
|
||||
import { SyncDriver } from 'src/engine/core-modules/message-queue/drivers/sync.driver';
|
||||
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces/message-queue-module-options.interface';
|
||||
import { JobsModule } from 'src/engine/core-modules/message-queue/jobs.module';
|
||||
import { QUEUE_DRIVER } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
|
||||
@@ -66,9 +67,15 @@ export const createApp = async (
|
||||
getCurrentDriver: () => ({
|
||||
validate: async () => ({ success: true }),
|
||||
}),
|
||||
})
|
||||
.overrideProvider(QUEUE_DRIVER)
|
||||
.useValue(syncDriver);
|
||||
});
|
||||
|
||||
// Default to the in-band SyncDriver; opt into the real Redis-backed BullMQ driver (and the
|
||||
// in-process workers the explorer spins up) with MESSAGE_QUEUE_TYPE=bull-mq.
|
||||
if (process.env.MESSAGE_QUEUE_TYPE !== MessageQueueDriverType.BullMQ) {
|
||||
moduleBuilder = moduleBuilder
|
||||
.overrideProvider(QUEUE_DRIVER)
|
||||
.useValue(syncDriver);
|
||||
}
|
||||
|
||||
if (config.moduleBuilderHook) {
|
||||
moduleBuilder = config.moduleBuilderHook(moduleBuilder);
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { type MessageQueueJobData } from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
|
||||
|
||||
export const enqueueJobAndAwait = async <
|
||||
TData extends MessageQueueJobData,
|
||||
TResult,
|
||||
>(
|
||||
queue: MessageQueue,
|
||||
job: { name: string },
|
||||
data: TData,
|
||||
): Promise<TResult> => {
|
||||
const messageQueueService = global.app.get<MessageQueueService>(
|
||||
getQueueToken(queue),
|
||||
{ strict: false },
|
||||
);
|
||||
|
||||
return (await messageQueueService.add<TData, TResult>(
|
||||
job.name,
|
||||
data,
|
||||
)) as TResult;
|
||||
};
|
||||
@@ -0,0 +1,11 @@
|
||||
import { type EntityClassOrSchema } from '@nestjs/typeorm/dist/interfaces/entity-class-or-schema.type';
|
||||
import { getRepositoryToken } from '@nestjs/typeorm';
|
||||
|
||||
import { type ObjectLiteral, type Repository } from 'typeorm';
|
||||
|
||||
export const getCoreRepository = <Entity extends ObjectLiteral>(
|
||||
target: EntityClassOrSchema,
|
||||
): Repository<Entity> =>
|
||||
global.app.get<Repository<Entity>>(getRepositoryToken(target), {
|
||||
strict: false,
|
||||
});
|
||||
@@ -0,0 +1,50 @@
|
||||
import { http, passthrough, type RequestHandler } from 'msw';
|
||||
import { setupServer } from 'msw/node';
|
||||
|
||||
// MSW patches the whole process, so it also sees supertest's inbound calls to our own app.
|
||||
// Let those through. Keep the matchers colon-free: `http://localhost:*` makes path-to-regexp
|
||||
// parse `:` as a param and crashes beforeAll (msw#2202).
|
||||
const localhostPassthroughHandlers = [
|
||||
http.all('http://127.0.0.1*', () => passthrough()),
|
||||
http.all('http://localhost*', () => passthrough()),
|
||||
];
|
||||
|
||||
// One setupServer per test process — multiple instances re-patch node's request modules
|
||||
// (msw#821). resetHandlers() restores exactly these initial handlers.
|
||||
const server = setupServer(...localhostPassthroughHandlers);
|
||||
|
||||
export type HttpMock = {
|
||||
use: (...handlers: unknown[]) => void;
|
||||
};
|
||||
|
||||
// Opt-in: a suite calls this at describe scope to wire MSW's lifecycle (listen/reset/close)
|
||||
// onto the shared server with `onUnhandledRequest: 'error'`, so any un-mocked outbound call
|
||||
// fails loudly instead of hitting the real network. `baseHandlers` are re-applied per test as
|
||||
// the suite's happy-path; per-test variation goes through the returned `use`.
|
||||
// Handlers are typed loosely because msw's RequestHandler isn't portably nameable in an
|
||||
// exported signature under our tsconfig; callers always pass RequestHandler values.
|
||||
export const setupHttpMock = (...baseHandlers: unknown[]): HttpMock => {
|
||||
const applyBaseHandlers = () => {
|
||||
if (baseHandlers.length > 0) {
|
||||
server.use(...(baseHandlers as RequestHandler[]));
|
||||
}
|
||||
};
|
||||
|
||||
// Apply the base handlers in beforeAll as well as beforeEach: suites that connect an account
|
||||
// in their own beforeAll need the handlers active before the first test starts.
|
||||
beforeAll(() => {
|
||||
server.listen({ onUnhandledRequest: 'error' });
|
||||
applyBaseHandlers();
|
||||
});
|
||||
|
||||
beforeEach(applyBaseHandlers);
|
||||
|
||||
afterEach(() => server.resetHandlers());
|
||||
|
||||
afterAll(() => server.close());
|
||||
|
||||
return {
|
||||
use: (...handlers: unknown[]) =>
|
||||
server.use(...(handlers as RequestHandler[])),
|
||||
};
|
||||
};
|
||||
@@ -0,0 +1,17 @@
|
||||
import { type EncryptedString } from 'src/engine/core-modules/secret-encryption/branded-strings/encrypted-string.type';
|
||||
import { type PlaintextString } from 'src/engine/core-modules/secret-encryption/branded-strings/plaintext-string.type';
|
||||
import { SecretEncryptionService } from 'src/engine/core-modules/secret-encryption/secret-encryption.service';
|
||||
import { type EnvironmentConfigDriver } from 'src/engine/core-modules/twenty-config/drivers/environment-config.driver';
|
||||
|
||||
// SecretEncryptionService is a class-token provider, unreachable from a spec via global.app
|
||||
// (module-realm split). It only needs `get(key)`, so build it by hand off process.env —
|
||||
// same APP_SECRET the app uses, so the pipeline decrypts the result for free.
|
||||
export const mintEncryptedToken = (
|
||||
plaintext: string,
|
||||
workspaceId: string,
|
||||
): EncryptedString =>
|
||||
new SecretEncryptionService({
|
||||
get: (key: string) => process.env[key],
|
||||
} as EnvironmentConfigDriver).encryptVersioned(plaintext as PlaintextString, {
|
||||
workspaceId,
|
||||
});
|
||||
@@ -0,0 +1,24 @@
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
// Polls `read` until `predicate` passes or the timeout elapses, then returns the last value
|
||||
// (so the caller's assertion produces a meaningful diff on timeout). Used to await the
|
||||
// eventual outcome of jobs running on the real BullMQ worker.
|
||||
export const pollUntil = async <TValue>(
|
||||
read: () => Promise<TValue>,
|
||||
predicate: (value: TValue) => boolean,
|
||||
{
|
||||
timeoutMs = 30_000,
|
||||
intervalMs = 250,
|
||||
}: { timeoutMs?: number; intervalMs?: number } = {},
|
||||
): Promise<TValue> => {
|
||||
const startedAt = Date.now();
|
||||
|
||||
let value = await read();
|
||||
|
||||
while (!predicate(value) && Date.now() - startedAt < timeoutMs) {
|
||||
await sleep(intervalMs);
|
||||
value = await read();
|
||||
}
|
||||
|
||||
return value;
|
||||
};
|
||||
@@ -1,3 +1,4 @@
|
||||
import nodeFetch from 'node-fetch';
|
||||
import { type JestConfigWithTsJest } from 'ts-jest';
|
||||
import 'tsconfig-paths/register';
|
||||
|
||||
@@ -6,6 +7,11 @@ import { rawDataSource } from 'src/database/typeorm/raw/raw.datasource';
|
||||
import { createApp } from './create-app';
|
||||
|
||||
export default async (_: unknown, projectConfig: JestConfigWithTsJest) => {
|
||||
// Route the app's global fetch through node-fetch (node:http) so MSW — which patches the
|
||||
// shared node:http builtin — intercepts clients that bottom out at global fetch (e.g. the
|
||||
// Microsoft Graph client). Native undici fetch is realm-local and escapes MSW.
|
||||
globalThis.fetch = nodeFetch as unknown as typeof globalThis.fetch;
|
||||
|
||||
const app = await createApp({});
|
||||
|
||||
if (!projectConfig.globals) {
|
||||
|
||||
Reference in New Issue
Block a user