This commit is contained in:
neo773
2026-06-05 21:39:54 +05:30
parent 0a4dd19472
commit a73020eb45
16 changed files with 459 additions and 110 deletions
+8 -5
View File
@@ -14,13 +14,13 @@ IS_WORKSPACE_CREATION_LIMITED_TO_SERVER_ADMINS=false
ENTERPRISE_VALIDITY_TOKEN=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkZXYtc3Vic2NyaXB0aW9uLWlkIiwic3RhdHVzIjoidmFsaWQiLCJpYXQiOjE3NzMzMDg4MzMsImV4cCI6NDg5NzUxMTIzM30.qhfrW_SV2Y86fWtWXsALlAVhxmMxylUUIefN0fki10Q2NTGGqFVXZrNn2WacJY37yq3m5y4WgwZw34ua6E0ff_YUXsrlY5OHJWHT9DMqKCRn-JujHJnnYp3VHLncy5CvxH5r9mfPFp-5AWe1pYeR1T63sTiejH3sfDrNE357SB7KVti8LCcnsJxEtXB2tRnvyvdun7A-GKoKYEIam-16ZRKKFs6GaWo8ObHdfm8yBt6uK4DZSGPWb644QyWh9FtDxbzJ0ti54DuHSlErLgIp1NNEsMA0MK7zFY7StRaOdt72rxE1ZHwN7e6HhweTU4ORVUPfYkjDFLB2fF7Pa7Kvdg
AUTH_GOOGLE_ENABLED=false
MESSAGING_PROVIDER_GMAIL_ENABLED=false
AUTH_GOOGLE_ENABLED=true
MESSAGING_PROVIDER_GMAIL_ENABLED=true
IS_IMAP_SMTP_CALDAV_ENABLED=true
IS_IMAP_SMTP_CALDAV_CONNECTION_TEST_ENABLED=false
CALENDAR_PROVIDER_GOOGLE_ENABLED=false
MESSAGING_PROVIDER_MICROSOFT_ENABLED=false
CALENDAR_PROVIDER_MICROSOFT_ENABLED=false
CALENDAR_PROVIDER_GOOGLE_ENABLED=true
MESSAGING_PROVIDER_MICROSOFT_ENABLED=true
CALENDAR_PROVIDER_MICROSOFT_ENABLED=true
AUTH_GOOGLE_CALLBACK_URL=http://localhost:3000/auth/google/redirect
AUTH_GOOGLE_APIS_CALLBACK_URL=http://localhost:3000/auth/google-apis/get-access-token
@@ -34,3 +34,6 @@ IS_WORKSPACE_CREATION_V2_ENABLED=true
SHOULD_SEED_STANDARD_RECORD_PAGE_LAYOUTS=true
LOGIC_FUNCTION_TYPE=LOCAL
IS_CONFIG_VARIABLES_IN_DB_ENABLED=false
AUTH_GOOGLE_CLIENT_ID=mock-google-client-id
AUTH_GOOGLE_CLIENT_SECRET=mock-google-client-secret
@@ -5,7 +5,6 @@ import {
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageChannelEntity } from 'src/engine/metadata-modules/message-channel/entities/message-channel.entity';
import { MessageFolderEntity } from 'src/engine/metadata-modules/message-folder/entities/message-folder.entity';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
@@ -16,6 +15,8 @@ import { setupGmailMock } from 'test/integration/messaging/utils/gmail-mock.util
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
@@ -29,9 +30,7 @@ const runListFetch = (channelId: string) =>
});
const getSyncStateByFolderName = async (channelId: string) => {
const folders = await getCoreRepository(MessageFolderEntity).find({
where: { messageChannelId: channelId },
});
const folders = await queryMessageFolders(channelId);
return Object.fromEntries(
folders.map((folder) => [folder.name, folder.isSynced]),
@@ -51,6 +50,7 @@ describe('Gmail folder discovery (integration)', () => {
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
beforeAll(async () => {
jest.useRealTimers();
channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
messageFolderImportPolicy: MessageFolderImportPolicy.SELECTED_FOLDERS,
@@ -64,11 +64,12 @@ describe('Gmail folder discovery (integration)', () => {
it('rediscovers a folder appended after the first sync and leaves it unsynced under the selected-folders policy', async () => {
await runListFetch(channel.channelId);
expect(await getSyncStateByFolderName(channel.channelId)).toEqual({
INBOX: false,
SENT: false,
Work: false,
});
const afterFirstSync = await pollUntil(
() => getSyncStateByFolderName(channel.channelId),
(state) => Object.keys(state).length === 3,
{ timeoutMs: 30_000 },
);
expect(afterFirstSync).toEqual({ INBOX: false, SENT: false, Work: false });
gmail.labels.add({ id: 'Label_Archive', name: 'Archive' });
@@ -79,7 +80,12 @@ describe('Gmail folder discovery (integration)', () => {
await runListFetch(channel.channelId);
expect(await getSyncStateByFolderName(channel.channelId)).toEqual({
const afterSecondSync = await pollUntil(
() => getSyncStateByFolderName(channel.channelId),
(state) => Object.keys(state).length === 4,
{ timeoutMs: 30_000 },
);
expect(afterSecondSync).toEqual({
INBOX: false,
SENT: false,
Work: false,
@@ -1,11 +1,7 @@
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageFolderEntity } from 'src/engine/metadata-modules/message-folder/entities/message-folder.entity';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
type MessagingMessageListFetchJobResult,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
import { findManyOperationFactory } from 'test/integration/graphql/utils/find-many-operation-factory.util';
import { makeGraphqlAPIRequest } from 'test/integration/graphql/utils/make-graphql-api-request.util';
import {
@@ -13,67 +9,67 @@ import {
gmailMessage,
setupGmailMock,
} from 'test/integration/messaging/utils/gmail-mock.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { connectMessagingAccount } from 'test/integration/messaging/utils/connect-messaging-account.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const findImportedSubjects = async (subjects: string[]): Promise<string[]> => {
const response = await makeGraphqlAPIRequest(
findManyOperationFactory({
objectMetadataSingularName: 'message',
objectMetadataPluralName: 'messages',
gqlFields: `id
subject`,
filter: { subject: { in: subjects } },
}),
);
return response.body.data.messages.edges
.map((edge: { node: { subject: string } }) => edge.node.subject)
.sort();
};
describe('Gmail message list fetch job (integration)', () => {
const inbox = [gmailMessage(), gmailMessage()];
setupGmailMock({ inbox, handle: 'tim@apple.dev' });
setupGmailMock({ inbox, handle: 'connected-account@apple.dev' });
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
let channel: Awaited<ReturnType<typeof connectMessagingAccount>>;
beforeAll(async () => {
channel = await seedMessageChannel({ workspaceId: WORKSPACE_ID });
// The real BullMQ worker runs asynchronously, so polling uses real timers.
jest.useRealTimers();
channel = await connectMessagingAccount(ConnectedAccountProvider.GOOGLE);
}, 60000);
afterAll(async () => {
await channel.cleanup();
});
it('runs the full sync pipeline: folders synced, list fetched, messages imported', async () => {
const result = await enqueueJobAndAwait<
MessagingMessageListFetchJobData,
MessagingMessageListFetchJobResult
>(MessageQueue.messagingQueue, MessagingMessageListFetchJob, {
messageChannelId: channel.channelId,
workspaceId: WORKSPACE_ID,
});
const folders = await getCoreRepository<MessageFolderEntity>(
MessageFolderEntity,
).find({
where: { messageChannelId: channel.channelId },
});
const folderNames = folders.map((folder) => folder.name).sort();
expect(folderNames).toEqual(['INBOX', 'SENT']);
expect(result).toEqual({
messagesToImport: inbox.length,
messagesToDelete: 0,
});
it('runs the full sync pipeline on the real worker: folders synced, messages imported', async () => {
// The cron schedules pending channels and enqueues their list-fetch onto the worker.
await enqueueJobAndAwait(
MessageQueue.cronQueue,
MessagingMessageListFetchCronJob,
{},
);
const expectedSubjects = inbox.map(getGmailMessageSubject);
const messagesResponse = await makeGraphqlAPIRequest(
findManyOperationFactory({
objectMetadataSingularName: 'message',
objectMetadataPluralName: 'messages',
gqlFields: `id
subject`,
filter: { subject: { in: expectedSubjects } },
}),
const importedSubjects = await pollUntil(
() => findImportedSubjects(expectedSubjects),
(subjects) => subjects.length === expectedSubjects.length,
{ timeoutMs: 30_000 },
);
const importedSubjects = messagesResponse.body.data.messages.edges
.map((edge: { node: { subject: string } }) => edge.node.subject)
.sort();
expect(importedSubjects).toEqual([...expectedSubjects].sort());
const folders = await queryMessageFolders(channel.channelId);
expect(folders.map((folder) => folder.name).sort()).toEqual([
'INBOX',
'SENT',
]);
}, 60000);
});
@@ -6,9 +6,10 @@ import {
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import { MessagingRelaunchFailedMessageChannelJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job';
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
@@ -20,6 +21,10 @@ const relaunchFailedChannel = (channelId: string) =>
);
describe('Messaging failed-channel recovery (integration)', () => {
beforeAll(() => {
jest.useRealTimers();
});
it('recovers a FAILED_UNKNOWN channel and clears its throttle state', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
@@ -33,7 +38,13 @@ describe('Messaging failed-channel recovery (integration)', () => {
try {
await relaunchFailedChannel(channel.channelId);
const recovered = await getMessageChannel(channel.channelId);
const recovered = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) =>
channelState?.syncStatus === MessageChannelSyncStatus.ACTIVE,
{ timeoutMs: 30_000 },
);
expect(recovered.syncStage).toBe(
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
@@ -57,7 +68,13 @@ describe('Messaging failed-channel recovery (integration)', () => {
try {
await relaunchFailedChannel(channel.channelId);
const channelAfter = await getMessageChannel(channel.channelId);
// No-op recovery: grace window for the worker to process, then assert no change.
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
() => false,
{ timeoutMs: 3_000 },
);
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
expect(channelAfter.syncStatus).toBe(
@@ -9,8 +9,9 @@ import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { setupGmailMock } from 'test/integration/messaging/utils/gmail-mock.util';
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
@@ -45,6 +46,10 @@ const runListFetch = (channelId: string) =>
describe('Messaging rate-limit throttling (integration)', () => {
const gmail = setupGmailMock({ inbox: [], handle: 'tim@apple.dev' });
beforeAll(() => {
jest.useRealTimers();
});
it('records the throttle backoff window on a 429 and keeps the channel active', async () => {
const channel = await seedMessageChannel({ workspaceId: WORKSPACE_ID });
@@ -53,11 +58,14 @@ describe('Messaging rate-limit throttling (integration)', () => {
await runListFetch(channel.channelId);
const channelAfter = await getMessageChannel(channel.channelId);
expect(channelAfter.throttleFailureCount).toBe(1);
expect(channelAfter.throttleRetryAfter?.toISOString()).toBe(
RETRY_AFTER_ISO,
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) => channelState.throttleRetryAfter !== null,
{ timeoutMs: 30_000 },
);
expect(channelAfter.throttleFailureCount).toBe(1);
expect(channelAfter.throttleRetryAfter).toBe(RETRY_AFTER_ISO);
expect(channelAfter.syncStatus).not.toBe(
MessageChannelSyncStatus.FAILED_UNKNOWN,
);
@@ -77,7 +85,13 @@ describe('Messaging rate-limit throttling (integration)', () => {
await runListFetch(channel.channelId);
const channelAfter = await getMessageChannel(channel.channelId);
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) =>
channelState.syncStatus === MessageChannelSyncStatus.FAILED_UNKNOWN,
{ timeoutMs: 30_000 },
);
expect(channelAfter.syncStatus).toBe(
MessageChannelSyncStatus.FAILED_UNKNOWN,
);
@@ -3,9 +3,10 @@ import { MessageChannelSyncStage } from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
@@ -13,6 +14,10 @@ const STALE_STARTED_AT = new Date(Date.now() - 31 * 60 * 1000);
const RECENT_STARTED_AT = new Date(Date.now() - 60 * 1000);
describe('Messaging stale-sync recovery (integration)', () => {
beforeAll(() => {
jest.useRealTimers();
});
it('resets stale ongoing channels to pending and leaves recent ones running', async () => {
const staleListFetch = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
@@ -37,21 +42,33 @@ describe('Messaging stale-sync recovery (integration)', () => {
{ workspaceId: WORKSPACE_ID },
);
const staleListFetchAfter = await getMessageChannel(
staleListFetch.channelId,
const staleListFetchAfter = await pollUntil(
() =>
queryMessageChannel(
staleListFetch.connectedAccountId,
staleListFetch.channelId,
),
(channelState) =>
channelState.syncStage ===
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
{ timeoutMs: 30_000 },
);
expect(staleListFetchAfter.syncStage).toBe(
MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
);
expect(staleListFetchAfter.syncStageStartedAt).toBeNull();
const staleImportAfter = await getMessageChannel(staleImport.channelId);
const staleImportAfter = await queryMessageChannel(
staleImport.connectedAccountId,
staleImport.channelId,
);
expect(staleImportAfter.syncStage).toBe(
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
);
expect(staleImportAfter.syncStageStartedAt).toBeNull();
const recentListFetchAfter = await getMessageChannel(
const recentListFetchAfter = await queryMessageChannel(
recentListFetch.connectedAccountId,
recentListFetch.channelId,
);
expect(recentListFetchAfter.syncStage).toBe(
@@ -12,6 +12,7 @@ import {
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
const HISTORY_ID = '987654321';
@@ -36,6 +37,20 @@ describe('Messaging sync cursor (integration)', () => {
handle: 'tim@apple.dev',
});
beforeAll(() => {
jest.useRealTimers();
});
const runAndAwaitCursor = async (channelId: string) => {
await runListFetch(channelId);
return pollUntil(
() => getMessageChannel(channelId),
(channel) => channel.syncCursor === HISTORY_ID,
{ timeoutMs: 30_000 },
);
};
it('runs a full sync and seeds the cursor when the cursor is null', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
@@ -43,11 +58,7 @@ describe('Messaging sync cursor (integration)', () => {
});
try {
const result = await runListFetch(channel.channelId);
expect(result.messagesToImport).toBe(2);
const channelAfter = await getMessageChannel(channel.channelId);
const channelAfter = await runAndAwaitCursor(channel.channelId);
expect(channelAfter.syncCursor).toBe(HISTORY_ID);
} finally {
await channel.cleanup();
@@ -61,11 +72,7 @@ describe('Messaging sync cursor (integration)', () => {
});
try {
const result = await runListFetch(channel.channelId);
expect(result.messagesToImport).toBe(2);
const channelAfter = await getMessageChannel(channel.channelId);
const channelAfter = await runAndAwaitCursor(channel.channelId);
expect(channelAfter.syncCursor).toBe(HISTORY_ID);
} finally {
await channel.cleanup();
@@ -16,9 +16,10 @@ import {
INVALID_REFRESH_TOKEN_PREFIX,
setupGmailMock,
} from 'test/integration/messaging/utils/gmail-mock.util';
import { getMessageChannel } from 'test/integration/messaging/utils/get-message-channel.util';
import { queryMessageChannel } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
@@ -44,6 +45,10 @@ const getConnectedAccount = (connectedAccountId: string) =>
describe('Messaging token refresh (integration)', () => {
setupGmailMock({ inbox: [gmailMessage()], handle: 'tim@apple.dev' });
beforeAll(() => {
jest.useRealTimers();
});
it('refreshes an expired access token and completes the sync', async () => {
const channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
@@ -51,13 +56,16 @@ describe('Messaging token refresh (integration)', () => {
});
try {
const result = await runListFetch(channel.channelId);
await runListFetch(channel.channelId);
expect(result).toEqual({ messagesToImport: 1, messagesToDelete: 0 });
const connectedAccount = await getConnectedAccount(
channel.connectedAccountId,
const connectedAccount = await pollUntil(
() => getConnectedAccount(channel.connectedAccountId),
(account) =>
(account.lastCredentialsRefreshedAt?.getTime() ?? 0) >
EXPIRED_CREDENTIALS_AT.getTime(),
{ timeoutMs: 30_000 },
);
expect(connectedAccount.authFailedAt).toBeNull();
expect(
connectedAccount.lastCredentialsRefreshedAt?.getTime(),
@@ -77,7 +85,14 @@ describe('Messaging token refresh (integration)', () => {
try {
await runListFetch(channel.channelId);
const channelAfter = await getMessageChannel(channel.channelId);
const channelAfter = await pollUntil(
() =>
queryMessageChannel(channel.connectedAccountId, channel.channelId),
(channelState) =>
channelState.syncStatus ===
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
{ timeoutMs: 30_000 },
);
expect(channelAfter.syncStage).toBe(MessageChannelSyncStage.FAILED);
expect(channelAfter.syncStatus).toBe(
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
@@ -1,7 +1,6 @@
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageFolderEntity } from 'src/engine/metadata-modules/message-folder/entities/message-folder.entity';
import { SEED_APPLE_WORKSPACE_ID } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant';
import {
type MessagingMessageListFetchJobData,
@@ -9,9 +8,10 @@ import {
type MessagingMessageListFetchJobResult,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { setupMicrosoftMock } from 'test/integration/messaging/utils/microsoft-mock.util';
import { queryMessageFolders } from 'test/integration/messaging/utils/query-messaging.util';
import { seedMessageChannel } from 'test/integration/messaging/utils/seed-message-channel.util';
import { enqueueJobAndAwait } from 'test/integration/utils/enqueue-job-and-await.util';
import { getCoreRepository } from 'test/integration/utils/get-core-repository.util';
import { pollUntil } from 'test/integration/utils/poll-until.util';
const WORKSPACE_ID = SEED_APPLE_WORKSPACE_ID;
@@ -20,7 +20,17 @@ describe('Microsoft folder discovery (integration)', () => {
let channel: Awaited<ReturnType<typeof seedMessageChannel>>;
const getFolderNames = async (channelId: string): Promise<string[]> => {
const folders = await queryMessageFolders(channelId);
return folders
.map((folder) => folder.name)
.filter((name): name is string => name !== null)
.sort();
};
beforeAll(async () => {
jest.useRealTimers();
channel = await seedMessageChannel({
workspaceId: WORKSPACE_ID,
provider: ConnectedAccountProvider.MICROSOFT,
@@ -40,13 +50,12 @@ describe('Microsoft folder discovery (integration)', () => {
workspaceId: WORKSPACE_ID,
});
const folders = await getCoreRepository(MessageFolderEntity).find({
where: { messageChannelId: channel.channelId },
});
const folderNames = await pollUntil(
() => getFolderNames(channel.channelId),
(names) => names.length === 2,
{ timeoutMs: 30_000 },
);
expect(folders.map((folder) => folder.name).sort()).toEqual([
'Inbox',
'Sent Items',
]);
expect(folderNames).toEqual(['Inbox', 'Sent Items']);
}, 60000);
});
@@ -0,0 +1,79 @@
import gql from 'graphql-tag';
import request from 'supertest';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { makeMetadataAPIRequest } from 'test/integration/metadata/suites/utils/make-metadata-api-request.util';
import {
deleteConnectedAccount,
queryMessageChannels,
} from 'test/integration/messaging/utils/query-messaging.util';
type ConnectMessagingAccountResult = {
channelId: string;
connectedAccountId: string;
cleanup: () => Promise<void>;
};
const OAUTH_CALLBACK_PATH: Partial<Record<ConnectedAccountProvider, string>> = {
[ConnectedAccountProvider.GOOGLE]: '/auth/google-apis/get-access-token',
[ConnectedAccountProvider.MICROSOFT]: '/auth/microsoft-apis/get-access-token',
};
// Connects a messaging account through the real OAuth callback the providers redirect to:
// mints a transient token for the authed user, drives the callback with a code (the HTTP
// boundary is mocked), and lets the provider's auth service create a fresh connected account
// and message channel. Returns the channel and a cleanup that removes the account through the
// same mutation the product uses.
export const connectMessagingAccount = async (
provider: ConnectedAccountProvider,
): Promise<ConnectMessagingAccountResult> => {
const callbackPath = OAUTH_CALLBACK_PATH[provider];
if (!callbackPath) {
throw new Error(`Unsupported OAuth provider: ${provider}`);
}
const channelIdsBeforeConnect = new Set(
(await queryMessageChannels()).map((channel) => channel.id),
);
const transientTokenResponse = await makeMetadataAPIRequest({
query: gql`
mutation GenerateTransientToken {
generateTransientToken {
transientToken {
token
}
}
}
`,
});
const state = JSON.stringify({
transientToken:
transientTokenResponse.body.data.generateTransientToken.transientToken
.token,
messageVisibility: 'SHARE_EVERYTHING',
skipMessageChannelConfiguration: true,
});
const callbackResponse = await request(`http://localhost:${APP_PORT}`)
.get(callbackPath)
.query({ code: 'mock-authorization-code', state });
const connectedChannel = (await queryMessageChannels()).find(
(channel) => !channelIdsBeforeConnect.has(channel.id),
);
if (!connectedChannel) {
throw new Error(
`OAuth connect for ${provider} created no message channel (callback redirected to ${callbackResponse.headers.location})`,
);
}
return {
channelId: connectedChannel.id,
connectedAccountId: connectedChannel.connectedAccountId,
cleanup: () => deleteConnectedAccount(connectedChannel.connectedAccountId),
};
};
@@ -7,6 +7,16 @@ import { setupHttpMock } from 'test/integration/utils/http-mock';
export const INVALID_REFRESH_TOKEN_PREFIX = 'invalid-refresh-token';
const GOOGLE_OAUTH_SCOPES = [
'email',
'profile',
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/calendar.events',
'https://www.googleapis.com/auth/profile.emails.read',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.compose',
].join(' ');
export const gmailMessage = (
overrides: Partial<gmail_v1.Schema$Message> = {},
): gmail_v1.Schema$Message => {
@@ -125,6 +135,36 @@ const gmailHandlers = ({
token_type: 'Bearer',
});
}),
// --- OAuth connect flow (passport code exchange + connect-path availability checks) ---
http.post('https://www.googleapis.com/oauth2/v4/token', () =>
HttpResponse.json({
access_token: 'mock-access-token',
refresh_token: 'mock-refresh-token',
expires_in: 3600,
scope: GOOGLE_OAUTH_SCOPES,
token_type: 'Bearer',
}),
),
http.get('https://www.googleapis.com/oauth2/v3/userinfo', () =>
HttpResponse.json({
sub: 'google-user-id',
email: handle,
email_verified: true,
name: 'Jane Austen',
given_name: 'Jane',
family_name: 'Austen',
}),
),
http.get('https://www.googleapis.com/oauth2/v3/tokeninfo', () =>
HttpResponse.json({ scope: GOOGLE_OAUTH_SCOPES, email: handle }),
),
http.get('https://gmail.googleapis.com/gmail/v1/users/me/profile', () =>
HttpResponse.json({ emailAddress: handle, messagesTotal: 0 }),
),
http.get(
'https://www.googleapis.com/calendar/v3/calendars/primary/events',
() => HttpResponse.json({ items: [] }),
),
http.get('*/gmail/v1/users/me/settings/sendAs', () => {
const sendAs: gmail_v1.Schema$SendAs[] = [
{ sendAsEmail: handle, isPrimary: true },
@@ -0,0 +1,108 @@
// TODO: cleanup see if we can reuse fragments from frontend
import gql from 'graphql-tag';
import { makeMetadataAPIRequest } from 'test/integration/metadata/suites/utils/make-metadata-api-request.util';
export type MessageFolderDto = {
id: string;
name: string | null;
isSynced: boolean;
};
export type MessageChannelDto = {
id: string;
connectedAccountId: string;
syncStatus: string;
syncStage: string;
syncStageStartedAt: string | null;
throttleFailureCount: number;
throttleRetryAfter: string | null;
};
const MESSAGE_CHANNEL_FIELDS = gql`
fragment TestMessageChannelFields on MessageChannel {
id
connectedAccountId
syncStatus
syncStage
syncStageStartedAt
throttleFailureCount
throttleRetryAfter
}
`;
export const queryMessageChannels = async (): Promise<MessageChannelDto[]> => {
const response = await makeMetadataAPIRequest({
query: gql`
query MessageChannelsForTest {
myMessageChannels {
...TestMessageChannelFields
}
}
${MESSAGE_CHANNEL_FIELDS}
`,
});
return response.body.data.myMessageChannels;
};
export const deleteConnectedAccount = async (
connectedAccountId: string,
): Promise<void> => {
await makeMetadataAPIRequest({
query: gql`
mutation DeleteConnectedAccountForTest($id: UUID!) {
deleteConnectedAccount(id: $id) {
id
}
}
`,
variables: { id: connectedAccountId },
});
};
export const queryMessageFolders = async (
messageChannelId: string,
): Promise<MessageFolderDto[]> => {
const response = await makeMetadataAPIRequest({
query: gql`
query MessageFoldersForTest($messageChannelId: UUID) {
myMessageFolders(messageChannelId: $messageChannelId) {
id
name
isSynced
}
}
`,
variables: { messageChannelId },
});
return response.body.data.myMessageFolders;
};
export const queryMessageChannel = async (
connectedAccountId: string,
messageChannelId: string,
): Promise<MessageChannelDto> => {
const response = await makeMetadataAPIRequest({
query: gql`
query MessageChannelForTest($connectedAccountId: UUID) {
myMessageChannels(connectedAccountId: $connectedAccountId) {
...TestMessageChannelFields
}
}
${MESSAGE_CHANNEL_FIELDS}
`,
variables: { connectedAccountId },
});
const channel = response.body.data.myMessageChannels.find(
(candidate: MessageChannelDto) => candidate.id === messageChannelId,
);
if (!channel) {
throw new Error(`Message channel ${messageChannelId} not found`);
}
return channel;
};
@@ -31,7 +31,7 @@ export const seedMessageChannel = async ({
workspaceId,
provider = ConnectedAccountProvider.GOOGLE,
handle = 'tim@apple.dev',
connectedAccountId = CONNECTED_ACCOUNT_DATA_SEED_IDS.TIM,
connectedAccountId = CONNECTED_ACCOUNT_DATA_SEED_IDS.JANE,
messageFolderImportPolicy = MessageFolderImportPolicy.ALL_FOLDERS,
syncStatus = MessageChannelSyncStatus.NOT_SYNCED,
syncStage = MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED,
@@ -18,6 +18,7 @@ import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handl
import { ExceptionHandlerMockService } from 'src/engine/core-modules/exception-handler/mocks/exception-handler-mock.service';
import { MockedUnhandledExceptionFilter } from 'src/engine/core-modules/exception-handler/mocks/mock-unhandled-exception.filter';
import { SyncDriver } from 'src/engine/core-modules/message-queue/drivers/sync.driver';
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces/message-queue-module-options.interface';
import { JobsModule } from 'src/engine/core-modules/message-queue/jobs.module';
import { QUEUE_DRIVER } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
@@ -66,9 +67,15 @@ export const createApp = async (
getCurrentDriver: () => ({
validate: async () => ({ success: true }),
}),
})
.overrideProvider(QUEUE_DRIVER)
.useValue(syncDriver);
});
// Default to the in-band SyncDriver; opt into the real Redis-backed BullMQ driver (and the
// in-process workers the explorer spins up) with MESSAGE_QUEUE_TYPE=bull-mq.
if (process.env.MESSAGE_QUEUE_TYPE !== MessageQueueDriverType.BullMQ) {
moduleBuilder = moduleBuilder
.overrideProvider(QUEUE_DRIVER)
.useValue(syncDriver);
}
if (config.moduleBuilderHook) {
moduleBuilder = config.moduleBuilderHook(moduleBuilder);
@@ -24,14 +24,21 @@ export type HttpMock = {
// Handlers are typed loosely because msw's RequestHandler isn't portably nameable in an
// exported signature under our tsconfig; callers always pass RequestHandler values.
export const setupHttpMock = (...baseHandlers: unknown[]): HttpMock => {
beforeAll(() => server.listen({ onUnhandledRequest: 'error' }));
beforeEach(() => {
const applyBaseHandlers = () => {
if (baseHandlers.length > 0) {
server.use(...(baseHandlers as RequestHandler[]));
}
};
// Apply the base handlers in beforeAll as well as beforeEach: suites that connect an account
// in their own beforeAll need the handlers active before the first test starts.
beforeAll(() => {
server.listen({ onUnhandledRequest: 'error' });
applyBaseHandlers();
});
beforeEach(applyBaseHandlers);
afterEach(() => server.resetHandlers());
afterAll(() => server.close());
@@ -0,0 +1,24 @@
import { setTimeout as sleep } from 'node:timers/promises';
// Polls `read` until `predicate` passes or the timeout elapses, then returns the last value
// (so the caller's assertion produces a meaningful diff on timeout). Used to await the
// eventual outcome of jobs running on the real BullMQ worker.
export const pollUntil = async <TValue>(
read: () => Promise<TValue>,
predicate: (value: TValue) => boolean,
{
timeoutMs = 30_000,
intervalMs = 250,
}: { timeoutMs?: number; intervalMs?: number } = {},
): Promise<TValue> => {
const startedAt = Date.now();
let value = await read();
while (!predicate(value) && Date.now() - startedAt < timeoutMs) {
await sleep(intervalMs);
value = await read();
}
return value;
};