Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 502abe20c1 |
+1
-1
@@ -1 +1 @@
|
||||
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
|
||||
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 30; // 30 minutes
|
||||
|
||||
+1
-1
@@ -53,7 +53,7 @@ export class CalendarEventListFetchCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [calendarChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+1
-1
@@ -51,7 +51,7 @@ export class CalendarEventsImportCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [calendarChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+20
-12
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
|
||||
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
|
||||
import {
|
||||
CalendarChannelSyncStage,
|
||||
type CalendarChannelWorkspaceEntity,
|
||||
@@ -23,6 +24,7 @@ export type CalendarEventListFetchJobData = {
|
||||
export class CalendarEventListFetchJob {
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||
private readonly calendarFetchEventsService: CalendarFetchEventsService,
|
||||
) {}
|
||||
|
||||
@@ -47,26 +49,32 @@ export class CalendarEventListFetchJob {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
calendarChannel.syncStage !==
|
||||
CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
isThrottled(
|
||||
calendarChannel.syncStageStartedAt,
|
||||
calendarChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (calendarChannel.syncStage) {
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED:
|
||||
await this.calendarFetchEventsService.fetchCalendarEvents(
|
||||
calendarChannel,
|
||||
calendarChannel.connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
await this.calendarFetchEventsService.fetchCalendarEvents(
|
||||
calendarChannel,
|
||||
calendarChannel.connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+14
-6
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
|
||||
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
|
||||
import {
|
||||
CalendarChannelSyncStage,
|
||||
type CalendarChannelWorkspaceEntity,
|
||||
@@ -23,6 +24,7 @@ export type CalendarEventsImportJobData = {
|
||||
export class CalendarEventsImportJob {
|
||||
constructor(
|
||||
private readonly calendarEventsImportService: CalendarEventsImportService,
|
||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
@@ -47,18 +49,24 @@ export class CalendarEventsImportJob {
|
||||
}
|
||||
|
||||
if (
|
||||
isThrottled(
|
||||
calendarChannel.syncStageStartedAt,
|
||||
calendarChannel.throttleFailureCount,
|
||||
)
|
||||
calendarChannel.syncStage !==
|
||||
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
calendarChannel.syncStage !==
|
||||
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED
|
||||
isThrottled(
|
||||
calendarChannel.syncStageStartedAt,
|
||||
calendarChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
+17
-5
@@ -1,4 +1,4 @@
|
||||
import { Scope } from '@nestjs/common';
|
||||
import { Logger, Scope } from '@nestjs/common';
|
||||
|
||||
import { In } from 'typeorm';
|
||||
|
||||
@@ -22,13 +22,16 @@ export type CalendarOngoingStaleJobData = {
|
||||
scope: Scope.REQUEST,
|
||||
})
|
||||
export class CalendarOngoingStaleJob {
|
||||
private readonly logger = new Logger(CalendarOngoingStaleJob.name);
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||
) {}
|
||||
|
||||
@Process(CalendarOngoingStaleJob.name)
|
||||
async handle(): Promise<void> {
|
||||
async handle(data: CalendarOngoingStaleJobData): Promise<void> {
|
||||
const { workspaceId } = data;
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
'calendarChannel',
|
||||
@@ -50,21 +53,30 @@ export class CalendarOngoingStaleJob {
|
||||
calendarChannel.syncStageStartedAt &&
|
||||
isSyncStale(calendarChannel.syncStageStartedAt)
|
||||
) {
|
||||
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt([
|
||||
calendarChannel.id,
|
||||
]);
|
||||
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
switch (calendarChannel.syncStage) {
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING:
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED:
|
||||
this.logger.log(
|
||||
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to CALENDAR_EVENT_LIST_FETCH_PENDING`,
|
||||
);
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING:
|
||||
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED:
|
||||
this.logger.log(
|
||||
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to CALENDAR_EVENTS_IMPORT_PENDING`,
|
||||
);
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
|
||||
+4
@@ -149,12 +149,16 @@ export class CalendarEventImportErrorHandlerService {
|
||||
case CalendarEventImportSyncStep.CALENDAR_EVENT_LIST_FETCH:
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
case CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT:
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
|
||||
+4
@@ -52,6 +52,7 @@ export class CalendarEventsImportService {
|
||||
): Promise<void> {
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportOngoing(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
let calendarEvents: FetchedCalendarEvent[] = [];
|
||||
@@ -68,6 +69,7 @@ export class CalendarEventsImportService {
|
||||
if (!eventIdsToFetch || eventIdsToFetch.length === 0) {
|
||||
await this.calendarChannelSyncStatusService.markAsCompletedAndScheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
@@ -89,6 +91,7 @@ export class CalendarEventsImportService {
|
||||
if (!calendarEvents || calendarEvents?.length === 0) {
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -151,6 +154,7 @@ export class CalendarEventsImportService {
|
||||
|
||||
await this.calendarChannelSyncStatusService.markAsCompletedAndScheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
} catch (error) {
|
||||
await this.calendarEventImportErrorHandlerService.handleDriverException(
|
||||
|
||||
+3
@@ -42,6 +42,7 @@ export class CalendarFetchEventsService {
|
||||
): Promise<void> {
|
||||
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
try {
|
||||
@@ -98,6 +99,7 @@ export class CalendarFetchEventsService {
|
||||
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -128,6 +130,7 @@ export class CalendarFetchEventsService {
|
||||
|
||||
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||
[calendarChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
} else {
|
||||
throw new CalendarEventImportDriverException(
|
||||
|
||||
+62
-33
@@ -7,7 +7,7 @@ import { CacheStorageService } from 'src/engine/core-modules/cache-storage/servi
|
||||
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
|
||||
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import {
|
||||
CalendarChannelSyncStage,
|
||||
CalendarChannelSyncStatus,
|
||||
@@ -20,37 +20,45 @@ import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/acc
|
||||
@Injectable()
|
||||
export class CalendarChannelSyncStatusService {
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly accountsToReconnectService: AccountsToReconnectService,
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
public async scheduleCalendarEventListFetch(calendarChannelIds: string[]) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
await calendarChannelRepository.update(calendarChannelIds, {
|
||||
syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING,
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCalendarEventListFetchOngoing(
|
||||
public async scheduleCalendarEventListFetch(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
await calendarChannelRepository.update(calendarChannelIds, {
|
||||
syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCalendarEventListFetchOngoing(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -76,7 +84,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -86,16 +95,20 @@ export class CalendarChannelSyncStatusService {
|
||||
throttleFailureCount: 0,
|
||||
});
|
||||
|
||||
await this.scheduleCalendarEventListFetch(calendarChannelIds);
|
||||
await this.scheduleCalendarEventListFetch(calendarChannelIds, workspaceId);
|
||||
}
|
||||
|
||||
public async resetSyncStageStartedAt(calendarChannelIds: string[]) {
|
||||
public async resetSyncStageStartedAt(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -104,28 +117,38 @@ export class CalendarChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async scheduleCalendarEventsImport(calendarChannelIds: string[]) {
|
||||
public async scheduleCalendarEventsImport(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
await calendarChannelRepository.update(calendarChannelIds, {
|
||||
syncStage: CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCalendarEventsImportOngoing(calendarChannelIds: string[]) {
|
||||
public async markAsCalendarEventsImportOngoing(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -137,13 +160,15 @@ export class CalendarChannelSyncStatusService {
|
||||
|
||||
public async markAsCompletedAndScheduleCalendarEventListFetch(
|
||||
calendarChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (!calendarChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -155,7 +180,7 @@ export class CalendarChannelSyncStatusService {
|
||||
syncedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
await this.scheduleCalendarEventListFetch(calendarChannelIds);
|
||||
await this.scheduleCalendarEventListFetch(calendarChannelIds, workspaceId);
|
||||
|
||||
await this.metricsService.batchIncrementCounter({
|
||||
key: MetricsKeys.CalendarEventSyncJobActive,
|
||||
@@ -172,7 +197,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -202,7 +228,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
@@ -217,7 +244,8 @@ export class CalendarChannelSyncStatusService {
|
||||
});
|
||||
|
||||
const connectedAccountRepository =
|
||||
await this.twentyORMManager.getRepository<ConnectedAccountWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<ConnectedAccountWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'connectedAccount',
|
||||
);
|
||||
|
||||
@@ -257,7 +285,8 @@ export class CalendarChannelSyncStatusService {
|
||||
}
|
||||
|
||||
const calendarChannelRepository =
|
||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'calendarChannel',
|
||||
);
|
||||
|
||||
|
||||
@@ -8,6 +8,10 @@ export const isThrottled = (
|
||||
return false;
|
||||
}
|
||||
|
||||
if (throttleFailureCount === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (
|
||||
computeThrottlePauseUntil(syncStageStartedAt, throttleFailureCount) >
|
||||
new Date()
|
||||
|
||||
+4
@@ -35,6 +35,7 @@ export class MessageChannelSyncStatusService {
|
||||
public async scheduleMessageListFetch(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
@@ -48,12 +49,14 @@ export class MessageChannelSyncStatusService {
|
||||
|
||||
await messageChannelRepository.update(messageChannelIds, {
|
||||
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
public async scheduleMessagesImport(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
preserveSyncStageStartedAt: boolean = false,
|
||||
) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
@@ -67,6 +70,7 @@ export class MessageChannelSyncStatusService {
|
||||
|
||||
await messageChannelRepository.update(messageChannelIds, {
|
||||
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
|
||||
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
export const MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
|
||||
export const MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 30; // 30 minutes
|
||||
|
||||
+1
-1
@@ -51,7 +51,7 @@ export class MessagingMessageListFetchCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [messageChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+1
-1
@@ -56,7 +56,7 @@ export class MessagingMessagesImportCronJob {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const [messageChannels] = await this.coreDataSource.query(
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED}', "syncStageStartedAt" = '${now}'
|
||||
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
|
||||
WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_PENDING}' RETURNING *`,
|
||||
);
|
||||
|
||||
|
||||
+31
-24
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
type MessageChannelWorkspaceEntity,
|
||||
@@ -31,6 +32,7 @@ export class MessagingMessageListFetchJob {
|
||||
private readonly messagingMonitoringService: MessagingMonitoringService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
) {}
|
||||
|
||||
@Process(MessagingMessageListFetchJob.name)
|
||||
@@ -65,6 +67,13 @@ export class MessagingMessageListFetchJob {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (
|
||||
isThrottled(
|
||||
@@ -72,35 +81,33 @@ export class MessagingMessageListFetchJob {
|
||||
messageChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (messageChannel.syncStage) {
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED:
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.completed',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'message_list_fetch.completed',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
|
||||
+14
-6
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
type MessageChannelWorkspaceEntity,
|
||||
@@ -24,6 +25,7 @@ export class MessagingMessagesImportJob {
|
||||
constructor(
|
||||
private readonly messagingMessagesImportService: MessagingMessagesImportService,
|
||||
private readonly messagingMonitoringService: MessagingMonitoringService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
@@ -64,18 +66,24 @@ export class MessagingMessagesImportJob {
|
||||
}
|
||||
|
||||
if (
|
||||
isThrottled(
|
||||
messageChannel.syncStageStartedAt,
|
||||
messageChannel.throttleFailureCount,
|
||||
)
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED
|
||||
isThrottled(
|
||||
messageChannel.syncStageStartedAt,
|
||||
messageChannel.throttleFailureCount,
|
||||
)
|
||||
) {
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
+6
-4
@@ -53,10 +53,6 @@ export class MessagingOngoingStaleJob {
|
||||
messageChannel.syncStageStartedAt &&
|
||||
isSyncStale(messageChannel.syncStageStartedAt)
|
||||
) {
|
||||
this.logger.log(
|
||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
||||
);
|
||||
|
||||
await this.messageChannelSyncStatusService.resetSyncStageStartedAt(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
@@ -65,6 +61,9 @@ export class MessagingOngoingStaleJob {
|
||||
switch (messageChannel.syncStage) {
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED:
|
||||
this.logger.log(
|
||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGE_LIST_FETCH_PENDING`,
|
||||
);
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
@@ -72,6 +71,9 @@ export class MessagingOngoingStaleJob {
|
||||
break;
|
||||
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING:
|
||||
case MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED:
|
||||
this.logger.log(
|
||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
||||
);
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
|
||||
+2
@@ -166,6 +166,7 @@ export class MessageImportExceptionHandlerService {
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
@@ -174,6 +175,7 @@ export class MessageImportExceptionHandlerService {
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
true,
|
||||
);
|
||||
break;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user