Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a6d6d0091f | |||
| 502abe20c1 |
+4
-4
@@ -111,7 +111,7 @@ describe('GoogleAPIsService', () => {
|
||||
{
|
||||
provide: CalendarChannelSyncStatusService,
|
||||
useValue: {
|
||||
resetAndScheduleCalendarEventListFetch: jest.fn(),
|
||||
resetAndMarkAsCalendarEventListFetchPending: jest.fn(),
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -127,7 +127,7 @@ describe('GoogleAPIsService', () => {
|
||||
{
|
||||
provide: MessageChannelSyncStatusService,
|
||||
useValue: {
|
||||
resetAndScheduleMessageListFetch: jest.fn(),
|
||||
resetAndMarkAsMessagesListFetchPending: jest.fn(),
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -234,11 +234,11 @@ describe('GoogleAPIsService', () => {
|
||||
});
|
||||
|
||||
expect(
|
||||
calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch,
|
||||
calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending,
|
||||
).toHaveBeenCalledWith([existingConnectedAccount.id], 'workspace-id');
|
||||
|
||||
expect(
|
||||
messagingChannelSyncStatusService.resetAndScheduleMessageListFetch,
|
||||
messagingChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending,
|
||||
).toHaveBeenCalledWith([existingConnectedAccount.id], 'workspace-id');
|
||||
|
||||
expect(
|
||||
|
||||
@@ -190,12 +190,12 @@ export class GoogleAPIsService {
|
||||
newOrExistingConnectedAccountId,
|
||||
);
|
||||
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
await this.messagingChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending(
|
||||
[newOrExistingConnectedAccountId],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending(
|
||||
[newOrExistingConnectedAccountId],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+4
-4
@@ -110,13 +110,13 @@ describe('MicrosoftAPIsService', () => {
|
||||
{
|
||||
provide: CalendarChannelSyncStatusService,
|
||||
useValue: {
|
||||
resetAndScheduleCalendarEventListFetch: jest.fn(),
|
||||
resetAndMarkAsCalendarEventListFetchPending: jest.fn(),
|
||||
},
|
||||
},
|
||||
{
|
||||
provide: MessageChannelSyncStatusService,
|
||||
useValue: {
|
||||
resetAndScheduleMessageListFetch: jest.fn(),
|
||||
resetAndMarkAsMessagesListFetchPending: jest.fn(),
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -230,11 +230,11 @@ describe('MicrosoftAPIsService', () => {
|
||||
});
|
||||
|
||||
expect(
|
||||
calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch,
|
||||
calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending,
|
||||
).toHaveBeenCalledWith([existingConnectedAccount.id], 'workspace-id');
|
||||
|
||||
expect(
|
||||
messagingChannelSyncStatusService.resetAndScheduleMessageListFetch,
|
||||
messagingChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending,
|
||||
).toHaveBeenCalledWith([existingConnectedAccount.id], 'workspace-id');
|
||||
|
||||
expect(
|
||||
|
||||
+3
-3
@@ -173,17 +173,17 @@ export class MicrosoftAPIsService {
|
||||
newOrExistingConnectedAccountId,
|
||||
);
|
||||
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
await this.messagingChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending(
|
||||
[newOrExistingConnectedAccountId],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending(
|
||||
[newOrExistingConnectedAccountId],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending(
|
||||
[newOrExistingConnectedAccountId],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+1
-1
@@ -54,7 +54,7 @@ export class BlocklistReimportCalendarEventsJob {
|
||||
},
|
||||
});
|
||||
|
||||
await this.calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending(
|
||||
calendarChannels.map((calendarChannel) => calendarChannel.id),
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
|
||||
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 30; // 30 minutes
|
||||
|
||||
+1
-1
@@ -53,7 +53,7 @@ export class CalendarEventListFetchCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [calendarChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+1
-1
@@ -51,7 +51,7 @@ export class CalendarEventsImportCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [calendarChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+20
-12
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
|
||||
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
|
||||
import {
|
||||
CalendarChannelSyncStage,
|
||||
type CalendarChannelWorkspaceEntity,
|
||||
@@ -23,6 +24,7 @@ export type CalendarEventListFetchJobData = {
|
||||
export class CalendarEventListFetchJob {
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||
private readonly calendarFetchEventsService: CalendarFetchEventsService,
|
||||
) {}
|
||||
|
||||
@@ -47,26 +49,32 @@ export class CalendarEventListFetchJob {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
calendarChannel.syncStage !==
|
||||
CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
isThrottled(
|
||||
calendarChannel.syncStageStartedAt,
|
||||
calendarChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (calendarChannel.syncStage) {
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED:
|
||||
await this.calendarFetchEventsService.fetchCalendarEvents(
|
||||
calendarChannel,
|
||||
calendarChannel.connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
await this.calendarFetchEventsService.fetchCalendarEvents(
|
||||
calendarChannel,
|
||||
calendarChannel.connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+14
-6
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
|
||||
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
|
||||
import {
|
||||
CalendarChannelSyncStage,
|
||||
type CalendarChannelWorkspaceEntity,
|
||||
@@ -23,6 +24,7 @@ export type CalendarEventsImportJobData = {
|
||||
export class CalendarEventsImportJob {
|
||||
constructor(
|
||||
private readonly calendarEventsImportService: CalendarEventsImportService,
|
||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
@@ -47,18 +49,24 @@ export class CalendarEventsImportJob {
|
||||
}
|
||||
|
||||
if (
|
||||
isThrottled(
|
||||
calendarChannel.syncStageStartedAt,
|
||||
calendarChannel.throttleFailureCount,
|
||||
)
|
||||
calendarChannel.syncStage !==
|
||||
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
calendarChannel.syncStage !==
|
||||
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED
|
||||
isThrottled(
|
||||
calendarChannel.syncStageStartedAt,
|
||||
calendarChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
+19
-7
@@ -1,4 +1,4 @@
|
||||
import { Scope } from '@nestjs/common';
|
||||
import { Logger, Scope } from '@nestjs/common';
|
||||
|
||||
import { In } from 'typeorm';
|
||||
|
||||
@@ -22,13 +22,16 @@ export type CalendarOngoingStaleJobData = {
|
||||
scope: Scope.REQUEST,
|
||||
})
|
||||
export class CalendarOngoingStaleJob {
|
||||
private readonly logger = new Logger(CalendarOngoingStaleJob.name);
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||
) {}
|
||||
|
||||
@Process(CalendarOngoingStaleJob.name)
|
||||
async handle(): Promise<void> {
|
||||
async handle(data: CalendarOngoingStaleJobData): Promise<void> {
|
||||
const { workspaceId } = data;
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
'calendarChannel',
|
||||
@@ -50,21 +53,30 @@ export class CalendarOngoingStaleJob {
|
||||
calendarChannel.syncStageStartedAt &&
|
||||
isSyncStale(calendarChannel.syncStageStartedAt)
|
||||
) {
|
||||
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt([
|
||||
calendarChannel.id,
|
||||
]);
|
||||
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
switch (calendarChannel.syncStage) {
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING:
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED:
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
this.logger.log(
|
||||
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to CALENDAR_EVENT_LIST_FETCH_PENDING`,
|
||||
);
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING:
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED:
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
this.logger.log(
|
||||
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to CALENDAR_EVENTS_IMPORT_PENDING`,
|
||||
);
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
|
||||
+8
-4
@@ -88,7 +88,7 @@ export class CalendarEventImportErrorHandlerService {
|
||||
`CalendarChannelId: ${calendarChannel.id} - Sync cursor error, resetting and rescheduling`,
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
@@ -147,14 +147,18 @@ export class CalendarEventImportErrorHandlerService {
|
||||
|
||||
switch (syncStep) {
|
||||
case CalendarEventImportSyncStep.CALENDAR_EVENT_LIST_FETCH:
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
case CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT:
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
@@ -214,7 +218,7 @@ export class CalendarEventImportErrorHandlerService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.calendarChannelSyncStatusService.resetAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.resetAndMarkAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+7
-3
@@ -52,6 +52,7 @@ export class CalendarEventsImportService {
|
||||
): Promise<void> {
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportOngoing(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
let calendarEvents: FetchedCalendarEvent[] = [];
|
||||
@@ -66,8 +67,9 @@ export class CalendarEventsImportService {
|
||||
);
|
||||
|
||||
if (!eventIdsToFetch || eventIdsToFetch.length === 0) {
|
||||
await this.calendarChannelSyncStatusService.markAsCompletedAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.markAsCompletedAndMarkAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
@@ -87,8 +89,9 @@ export class CalendarEventsImportService {
|
||||
}
|
||||
|
||||
if (!calendarEvents || calendarEvents?.length === 0) {
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -149,8 +152,9 @@ export class CalendarEventsImportService {
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.markAsCompletedAndScheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.markAsCompletedAndMarkAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
} catch (error) {
|
||||
await this.calendarEventImportErrorHandlerService.handleDriverException(
|
||||
|
||||
+5
-2
@@ -42,6 +42,7 @@ export class CalendarFetchEventsService {
|
||||
): Promise<void> {
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
try {
|
||||
@@ -96,8 +97,9 @@ export class CalendarFetchEventsService {
|
||||
},
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -126,8 +128,9 @@ export class CalendarFetchEventsService {
|
||||
calendarEventIds,
|
||||
);
|
||||
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportPending(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
} else {
|
||||
throw new CalendarEventImportDriverException(
|
||||
|
||||
+70
-35
@@ -7,7 +7,7 @@ import { CacheStorageService } from 'src/engine/core-modules/cache-storage/servi
|
||||
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
|
||||
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import {
|
||||
CalendarChannelSyncStage,
|
||||
CalendarChannelSyncStatus,
|
||||
@@ -20,37 +20,45 @@ import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/acc
|
||||
@Injectable()
|
||||
export class CalendarChannelSyncStatusService {
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly accountsToReconnectService: AccountsToReconnectService,
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
public async scheduleCalendarEventListFetch(calendarChannelIds: string[]) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
await calendarChannelRepository.update(calendarChannelIds, {
|
||||
syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING,
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCalendarEventListFetchOngoing(
|
||||
public async markAsCalendarEventListFetchPending(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
await calendarChannelRepository.update(calendarChannelIds, {
|
||||
syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCalendarEventListFetchOngoing(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -61,7 +69,7 @@ export class CalendarChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async resetAndScheduleCalendarEventListFetch(
|
||||
public async resetAndMarkAsCalendarEventListFetchPending(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
@@ -76,7 +84,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -86,16 +95,23 @@ export class CalendarChannelSyncStatusService {
|
||||
throttleFailureCount: 0,
|
||||
});
|
||||
|
||||
await this.scheduleCalendarEventListFetch(calendarChannelIds);
|
||||
await this.markAsCalendarEventListFetchPending(
|
||||
calendarChannelIds,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
public async resetSyncStageStartedAt(calendarChannelIds: string[]) {
|
||||
public async resetSyncStageStartedAt(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -104,28 +120,38 @@ export class CalendarChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async scheduleCalendarEventsImport(calendarChannelIds: string[]) {
|
||||
public async markAsCalendarEventsImportPending(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
await calendarChannelRepository.update(calendarChannelIds, {
|
||||
syncStage: CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCalendarEventsImportOngoing(calendarChannelIds: string[]) {
|
||||
public async markAsCalendarEventsImportOngoing(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -135,15 +161,17 @@ export class CalendarChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCompletedAndScheduleCalendarEventListFetch(
|
||||
public async markAsCompletedAndMarkAsCalendarEventListFetchPending(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -155,7 +183,10 @@ export class CalendarChannelSyncStatusService {
|
||||
syncedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
await this.scheduleCalendarEventListFetch(calendarChannelIds);
|
||||
await this.markAsCalendarEventListFetchPending(
|
||||
calendarChannelIds,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.metricsService.batchIncrementCounter({
|
||||
key: MetricsKeys.CalendarEventSyncJobActive,
|
||||
@@ -172,7 +203,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -202,7 +234,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -217,7 +250,8 @@ export class CalendarChannelSyncStatusService {
|
||||
});
|
||||
|
||||
const connectedAccountRepository =
|
||||
await this.twentyORMManager.getRepository<ConnectedAccountWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<ConnectedAccountWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'connectedAccount',
|
||||
);
|
||||
|
||||
@@ -257,7 +291,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
|
||||
@@ -8,6 +8,10 @@ export const isThrottled = (
|
||||
return false;
|
||||
}
|
||||
|
||||
if (throttleFailureCount === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (
|
||||
computeThrottlePauseUntil(syncStageStartedAt, throttleFailureCount) >
|
||||
new Date()
|
||||
|
||||
+1
-1
@@ -52,7 +52,7 @@ export class BlocklistReimportMessagesJob {
|
||||
},
|
||||
});
|
||||
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
await this.messagingChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending(
|
||||
messageChannels.map((messageChannel) => messageChannel.id),
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+9
-5
@@ -32,9 +32,10 @@ export class MessageChannelSyncStatusService {
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
public async scheduleMessageListFetch(
|
||||
public async markAsMessagesListFetchPending(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
@@ -48,12 +49,14 @@ export class MessageChannelSyncStatusService {
|
||||
|
||||
await messageChannelRepository.update(messageChannelIds, {
|
||||
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async scheduleMessagesImport(
|
||||
public async markAsMessagesImportPending(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
@@ -67,10 +70,11 @@ export class MessageChannelSyncStatusService {
|
||||
|
||||
await messageChannelRepository.update(messageChannelIds, {
|
||||
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async resetAndScheduleMessageListFetch(
|
||||
public async resetAndMarkAsMessagesListFetchPending(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
@@ -111,7 +115,7 @@ export class MessageChannelSyncStatusService {
|
||||
},
|
||||
);
|
||||
|
||||
await this.scheduleMessageListFetch(messageChannelIds, workspaceId);
|
||||
await this.markAsMessagesListFetchPending(messageChannelIds, workspaceId);
|
||||
}
|
||||
|
||||
public async resetSyncStageStartedAt(
|
||||
@@ -174,7 +178,7 @@ export class MessageChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCompletedAndScheduleMessageListFetch(
|
||||
public async markAsCompletedAndMarkAsMessagesListFetchPending(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
|
||||
+1
-1
@@ -62,7 +62,7 @@ export class MessagingResetChannelCommand extends CommandRunner {
|
||||
);
|
||||
|
||||
for (const messageChannel of messageChannels) {
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
await this.messagingChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
export const MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
|
||||
export const MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 30; // 30 minutes
|
||||
|
||||
+1
-1
@@ -51,7 +51,7 @@ export class MessagingMessageListFetchCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [messageChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+1
-1
@@ -56,7 +56,7 @@ export class MessagingMessagesImportCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [messageChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+31
-24
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
type MessageChannelWorkspaceEntity,
|
||||
@@ -31,6 +32,7 @@ export class MessagingMessageListFetchJob {
|
||||
private readonly messagingMonitoringService: MessagingMonitoringService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
) {}
|
||||
|
||||
@Process(MessagingMessageListFetchJob.name)
|
||||
@@ -65,6 +67,13 @@ export class MessagingMessageListFetchJob {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (
|
||||
isThrottled(
|
||||
@@ -72,35 +81,33 @@ export class MessagingMessageListFetchJob {
|
||||
messageChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.messageChannelSyncStatusService.markAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (messageChannel.syncStage) {
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED:
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.completed',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.completed',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
|
||||
+14
-6
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
type MessageChannelWorkspaceEntity,
|
||||
@@ -24,6 +25,7 @@ export class MessagingMessagesImportJob {
|
||||
constructor(
|
||||
private readonly messagingMessagesImportService: MessagingMessagesImportService,
|
||||
private readonly messagingMonitoringService: MessagingMonitoringService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
@@ -64,18 +66,24 @@ export class MessagingMessagesImportJob {
|
||||
}
|
||||
|
||||
if (
|
||||
isThrottled(
|
||||
messageChannel.syncStageStartedAt,
|
||||
messageChannel.throttleFailureCount,
|
||||
)
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED
|
||||
isThrottled(
|
||||
messageChannel.syncStageStartedAt,
|
||||
messageChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.messageChannelSyncStatusService.markAsMessagesImportPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
+8
-6
@@ -53,10 +53,6 @@ export class MessagingOngoingStaleJob {
|
||||
messageChannel.syncStageStartedAt &&
|
||||
isSyncStale(messageChannel.syncStageStartedAt)
|
||||
) {
|
||||
this.logger.log(
|
||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
||||
);
|
||||
|
||||
await this.messageChannelSyncStatusService.resetSyncStageStartedAt(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
@@ -65,14 +61,20 @@ export class MessagingOngoingStaleJob {
|
||||
switch (messageChannel.syncStage) {
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED:
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
|
||||
this.logger.log(
|
||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGE_LIST_FETCH_PENDING`,
|
||||
);
|
||||
await this.messageChannelSyncStatusService.markAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING:
|
||||
case MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED:
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
this.logger.log(
|
||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
||||
);
|
||||
await this.messageChannelSyncStatusService.markAsMessagesImportPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+6
-3
@@ -189,7 +189,10 @@ describe('MessagingMessageListFetchService', () => {
|
||||
markAsMessagesListFetchOngoing: jest
|
||||
.fn()
|
||||
.mockResolvedValue(undefined),
|
||||
scheduleMessagesImport: jest.fn().mockResolvedValue(undefined),
|
||||
markAsMessagesImportPending: jest.fn().mockResolvedValue(undefined),
|
||||
markAsMessagesImportScheduled: jest
|
||||
.fn()
|
||||
.mockResolvedValue(undefined),
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -331,7 +334,7 @@ describe('MessagingMessageListFetchService', () => {
|
||||
);
|
||||
|
||||
expect(
|
||||
messageChannelSyncStatusService.scheduleMessagesImport,
|
||||
messageChannelSyncStatusService.markAsMessagesImportScheduled,
|
||||
).toHaveBeenCalledWith([mockMicrosoftMessageChannel.id], workspaceId);
|
||||
});
|
||||
|
||||
@@ -390,7 +393,7 @@ describe('MessagingMessageListFetchService', () => {
|
||||
);
|
||||
|
||||
expect(
|
||||
messageChannelSyncStatusService.scheduleMessagesImport,
|
||||
messageChannelSyncStatusService.markAsMessagesImportScheduled,
|
||||
).toHaveBeenCalledWith([mockGoogleMessageChannel.id], workspaceId);
|
||||
});
|
||||
});
|
||||
|
||||
+4
-4
@@ -66,10 +66,10 @@ describe('MessagingMessagesImportService', () => {
|
||||
provide: MessageChannelSyncStatusService,
|
||||
useValue: {
|
||||
markAsMessagesImportOngoing: jest.fn().mockResolvedValue(undefined),
|
||||
markAsCompletedAndScheduleMessageListFetch: jest
|
||||
markAsCompletedAndMarkAsMessagesListFetchPending: jest
|
||||
.fn()
|
||||
.mockResolvedValue(undefined),
|
||||
scheduleMessagesImport: jest.fn().mockResolvedValue(undefined),
|
||||
markAsMessagesImportPending: jest.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -236,7 +236,7 @@ describe('MessagingMessagesImportService', () => {
|
||||
saveMessagesService.saveMessagesAndEnqueueContactCreation,
|
||||
).toHaveBeenCalled();
|
||||
expect(
|
||||
messageChannelSyncStatusService.scheduleMessagesImport,
|
||||
messageChannelSyncStatusService.markAsMessagesImportPending,
|
||||
).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
@@ -293,7 +293,7 @@ describe('MessagingMessagesImportService', () => {
|
||||
);
|
||||
|
||||
expect(
|
||||
messageChannelSyncStatusService.scheduleMessagesImport,
|
||||
messageChannelSyncStatusService.markAsMessagesImportPending,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
+5
-3
@@ -163,17 +163,19 @@ export class MessageImportExceptionHandlerService {
|
||||
|
||||
switch (syncStep) {
|
||||
case MessageImportSyncStep.MESSAGE_LIST_FETCH:
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
case MessageImportSyncStep.MESSAGES_IMPORT_PENDING:
|
||||
case MessageImportSyncStep.MESSAGES_IMPORT_ONGOING:
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
await this.messageChannelSyncStatusService.markAsMessagesImportPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
@@ -248,7 +250,7 @@ export class MessageImportExceptionHandlerService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.resetAndMarkAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
+3
-3
@@ -269,7 +269,7 @@ export class MessagingMessageListFetchService {
|
||||
);
|
||||
|
||||
if (totalMessagesToImportCount === 0) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndMarkAsMessagesListFetchPending(
|
||||
[messageChannelWithFreshTokens.id],
|
||||
workspaceId,
|
||||
);
|
||||
@@ -281,7 +281,7 @@ export class MessagingMessageListFetchService {
|
||||
`messageChannelId: ${freshMessageChannel.id} Scheduling direct messages import`,
|
||||
);
|
||||
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
await this.messageChannelSyncStatusService.markAsMessagesImportScheduled(
|
||||
[messageChannelWithFreshTokens.id],
|
||||
workspaceId,
|
||||
);
|
||||
@@ -289,7 +289,7 @@ export class MessagingMessageListFetchService {
|
||||
await this.messagingMessagesImportService.processMessageBatchImport(
|
||||
{
|
||||
...messageChannelWithFreshTokens,
|
||||
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
|
||||
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED,
|
||||
},
|
||||
messageChannelWithFreshTokens.connectedAccount,
|
||||
workspaceId,
|
||||
|
||||
+3
-3
@@ -101,7 +101,7 @@ export class MessagingMessagesImportService {
|
||||
);
|
||||
|
||||
if (!messageIdsToFetch?.length) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndMarkAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
@@ -158,12 +158,12 @@ export class MessagingMessagesImportService {
|
||||
if (
|
||||
messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE
|
||||
) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndMarkAsMessagesListFetchPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
} else {
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
await this.messageChannelSyncStatusService.markAsMessagesImportPending(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user