Compare commits

...

1 Commits

Author SHA1 Message Date
Charles Bochet 502abe20c1 Improvement on messaging (#16351)
In this PR:
- change messaging / calendar stale duration check to 30minutes (cron is
running every 1h, duration check was 1h, so evaluation was flaky)
- when temporary error (throttling), preserve syncStageStartedAt as this
is necessary to assess exponential throttling
2025-12-05 00:04:55 +01:00
19 changed files with 191 additions and 96 deletions
@@ -1 +1 @@
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 30; // 30 minutes
@@ -53,7 +53,7 @@ export class CalendarEventListFetchCronJob {
const now = new Date().toISOString();
const [calendarChannels] = await this.coreDataSource.query(
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = '${now}'
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING}' RETURNING *`,
);
@@ -51,7 +51,7 @@ export class CalendarEventsImportCronJob {
const now = new Date().toISOString();
const [calendarChannels] = await this.coreDataSource.query(
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED}', "syncStageStartedAt" = '${now}'
`UPDATE ${schemaName}."calendarChannel" SET "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING}' RETURNING *`,
);
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import {
CalendarChannelSyncStage,
type CalendarChannelWorkspaceEntity,
@@ -23,6 +24,7 @@ export type CalendarEventListFetchJobData = {
export class CalendarEventListFetchJob {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
private readonly calendarFetchEventsService: CalendarFetchEventsService,
) {}
@@ -47,26 +49,32 @@ export class CalendarEventListFetchJob {
return;
}
if (
calendarChannel.syncStage !==
CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED
) {
return;
}
if (
isThrottled(
calendarChannel.syncStageStartedAt,
calendarChannel.throttleFailureCount,
)
) {
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
true,
);
return;
}
switch (calendarChannel.syncStage) {
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED:
await this.calendarFetchEventsService.fetchCalendarEvents(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,
);
break;
default:
break;
}
await this.calendarFetchEventsService.fetchCalendarEvents(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,
);
}
}
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import {
CalendarChannelSyncStage,
type CalendarChannelWorkspaceEntity,
@@ -23,6 +24,7 @@ export type CalendarEventsImportJobData = {
export class CalendarEventsImportJob {
constructor(
private readonly calendarEventsImportService: CalendarEventsImportService,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@@ -47,18 +49,24 @@ export class CalendarEventsImportJob {
}
if (
isThrottled(
calendarChannel.syncStageStartedAt,
calendarChannel.throttleFailureCount,
)
calendarChannel.syncStage !==
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED
) {
return;
}
if (
calendarChannel.syncStage !==
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED
isThrottled(
calendarChannel.syncStageStartedAt,
calendarChannel.throttleFailureCount,
)
) {
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
[calendarChannel.id],
workspaceId,
true,
);
return;
}
@@ -1,4 +1,4 @@
import { Scope } from '@nestjs/common';
import { Logger, Scope } from '@nestjs/common';
import { In } from 'typeorm';
@@ -22,13 +22,16 @@ export type CalendarOngoingStaleJobData = {
scope: Scope.REQUEST,
})
export class CalendarOngoingStaleJob {
private readonly logger = new Logger(CalendarOngoingStaleJob.name);
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
) {}
@Process(CalendarOngoingStaleJob.name)
async handle(): Promise<void> {
async handle(data: CalendarOngoingStaleJobData): Promise<void> {
const { workspaceId } = data;
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
@@ -50,21 +53,30 @@ export class CalendarOngoingStaleJob {
calendarChannel.syncStageStartedAt &&
isSyncStale(calendarChannel.syncStageStartedAt)
) {
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt([
calendarChannel.id,
]);
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt(
[calendarChannel.id],
workspaceId,
);
switch (calendarChannel.syncStage) {
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING:
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_SCHEDULED:
this.logger.log(
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to CALENDAR_EVENT_LIST_FETCH_PENDING`,
);
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
);
break;
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING:
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_SCHEDULED:
this.logger.log(
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to CALENDAR_EVENTS_IMPORT_PENDING`,
);
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
[calendarChannel.id],
workspaceId,
);
break;
default:
@@ -149,12 +149,16 @@ export class CalendarEventImportErrorHandlerService {
case CalendarEventImportSyncStep.CALENDAR_EVENT_LIST_FETCH:
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
true,
);
break;
case CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT:
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
[calendarChannel.id],
workspaceId,
true,
);
break;
@@ -52,6 +52,7 @@ export class CalendarEventsImportService {
): Promise<void> {
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportOngoing(
[calendarChannel.id],
workspaceId,
);
let calendarEvents: FetchedCalendarEvent[] = [];
@@ -68,6 +69,7 @@ export class CalendarEventsImportService {
if (!eventIdsToFetch || eventIdsToFetch.length === 0) {
await this.calendarChannelSyncStatusService.markAsCompletedAndScheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
);
return;
@@ -89,6 +91,7 @@ export class CalendarEventsImportService {
if (!calendarEvents || calendarEvents?.length === 0) {
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
);
}
@@ -151,6 +154,7 @@ export class CalendarEventsImportService {
await this.calendarChannelSyncStatusService.markAsCompletedAndScheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
);
} catch (error) {
await this.calendarEventImportErrorHandlerService.handleDriverException(
@@ -42,6 +42,7 @@ export class CalendarFetchEventsService {
): Promise<void> {
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
[calendarChannel.id],
workspaceId,
);
try {
@@ -98,6 +99,7 @@ export class CalendarFetchEventsService {
await this.calendarChannelSyncStatusService.scheduleCalendarEventListFetch(
[calendarChannel.id],
workspaceId,
);
}
@@ -128,6 +130,7 @@ export class CalendarFetchEventsService {
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
[calendarChannel.id],
workspaceId,
);
} else {
throw new CalendarEventImportDriverException(
@@ -7,7 +7,7 @@ import { CacheStorageService } from 'src/engine/core-modules/cache-storage/servi
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
CalendarChannelSyncStage,
CalendarChannelSyncStatus,
@@ -20,37 +20,45 @@ import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/acc
@Injectable()
export class CalendarChannelSyncStatusService {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
private readonly cacheStorage: CacheStorageService,
private readonly accountsToReconnectService: AccountsToReconnectService,
private readonly metricsService: MetricsService,
) {}
public async scheduleCalendarEventListFetch(calendarChannelIds: string[]) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
await calendarChannelRepository.update(calendarChannelIds, {
syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING,
});
}
public async markAsCalendarEventListFetchOngoing(
public async scheduleCalendarEventListFetch(
calendarChannelIds: string[],
workspaceId: string,
preserveSyncStageStartedAt: boolean = false,
) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
await calendarChannelRepository.update(calendarChannelIds, {
syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING,
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
});
}
public async markAsCalendarEventListFetchOngoing(
calendarChannelIds: string[],
workspaceId: string,
) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -76,7 +84,8 @@ export class CalendarChannelSyncStatusService {
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -86,16 +95,20 @@ export class CalendarChannelSyncStatusService {
throttleFailureCount: 0,
});
await this.scheduleCalendarEventListFetch(calendarChannelIds);
await this.scheduleCalendarEventListFetch(calendarChannelIds, workspaceId);
}
public async resetSyncStageStartedAt(calendarChannelIds: string[]) {
public async resetSyncStageStartedAt(
calendarChannelIds: string[],
workspaceId: string,
) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -104,28 +117,38 @@ export class CalendarChannelSyncStatusService {
});
}
public async scheduleCalendarEventsImport(calendarChannelIds: string[]) {
public async scheduleCalendarEventsImport(
calendarChannelIds: string[],
workspaceId: string,
preserveSyncStageStartedAt: boolean = false,
) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
await calendarChannelRepository.update(calendarChannelIds, {
syncStage: CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING,
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
});
}
public async markAsCalendarEventsImportOngoing(calendarChannelIds: string[]) {
public async markAsCalendarEventsImportOngoing(
calendarChannelIds: string[],
workspaceId: string,
) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -137,13 +160,15 @@ export class CalendarChannelSyncStatusService {
public async markAsCompletedAndScheduleCalendarEventListFetch(
calendarChannelIds: string[],
workspaceId: string,
) {
if (!calendarChannelIds.length) {
return;
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -155,7 +180,7 @@ export class CalendarChannelSyncStatusService {
syncedAt: new Date().toISOString(),
});
await this.scheduleCalendarEventListFetch(calendarChannelIds);
await this.scheduleCalendarEventListFetch(calendarChannelIds, workspaceId);
await this.metricsService.batchIncrementCounter({
key: MetricsKeys.CalendarEventSyncJobActive,
@@ -172,7 +197,8 @@ export class CalendarChannelSyncStatusService {
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -202,7 +228,8 @@ export class CalendarChannelSyncStatusService {
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -217,7 +244,8 @@ export class CalendarChannelSyncStatusService {
});
const connectedAccountRepository =
await this.twentyORMManager.getRepository<ConnectedAccountWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<ConnectedAccountWorkspaceEntity>(
workspaceId,
'connectedAccount',
);
@@ -257,7 +285,8 @@ export class CalendarChannelSyncStatusService {
}
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<CalendarChannelWorkspaceEntity>(
workspaceId,
'calendarChannel',
);
@@ -8,6 +8,10 @@ export const isThrottled = (
return false;
}
if (throttleFailureCount === 0) {
return false;
}
return (
computeThrottlePauseUntil(syncStageStartedAt, throttleFailureCount) >
new Date()
@@ -35,6 +35,7 @@ export class MessageChannelSyncStatusService {
public async scheduleMessageListFetch(
messageChannelIds: string[],
workspaceId: string,
preserveSyncStageStartedAt: boolean = false,
) {
if (!messageChannelIds.length) {
return;
@@ -48,12 +49,14 @@ export class MessageChannelSyncStatusService {
await messageChannelRepository.update(messageChannelIds, {
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING,
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
});
}
public async scheduleMessagesImport(
messageChannelIds: string[],
workspaceId: string,
preserveSyncStageStartedAt: boolean = false,
) {
if (!messageChannelIds.length) {
return;
@@ -67,6 +70,7 @@ export class MessageChannelSyncStatusService {
await messageChannelRepository.update(messageChannelIds, {
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
...(!preserveSyncStageStartedAt ? { syncStageStartedAt: null } : {}),
});
}
@@ -1 +1 @@
export const MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
export const MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 30; // 30 minutes
@@ -51,7 +51,7 @@ export class MessagingMessageListFetchCronJob {
const now = new Date().toISOString();
const [messageChannels] = await this.coreDataSource.query(
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = '${now}'
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING}' RETURNING *`,
);
@@ -56,7 +56,7 @@ export class MessagingMessagesImportCronJob {
const now = new Date().toISOString();
const [messageChannels] = await this.coreDataSource.query(
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED}', "syncStageStartedAt" = '${now}'
`UPDATE ${schemaName}."messageChannel" SET "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED}', "syncStageStartedAt" = COALESCE("syncStageStartedAt", '${now}')
WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_PENDING}' RETURNING *`,
);
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import {
MessageChannelSyncStage,
type MessageChannelWorkspaceEntity,
@@ -31,6 +32,7 @@ export class MessagingMessageListFetchJob {
private readonly messagingMonitoringService: MessagingMonitoringService,
private readonly twentyORMManager: TwentyORMManager,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
) {}
@Process(MessagingMessageListFetchJob.name)
@@ -65,6 +67,13 @@ export class MessagingMessageListFetchJob {
return;
}
if (
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED
) {
return;
}
try {
if (
isThrottled(
@@ -72,35 +81,33 @@ export class MessagingMessageListFetchJob {
messageChannel.throttleFailureCount,
)
) {
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
[messageChannel.id],
workspaceId,
true,
);
return;
}
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED:
await this.messagingMonitoringService.track({
eventName: 'message_list_fetch.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
await this.messagingMonitoringService.track({
eventName: 'message_list_fetch.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
await this.messagingMessageListFetchService.processMessageListFetch(
messageChannel,
workspaceId,
);
await this.messagingMessageListFetchService.processMessageListFetch(
messageChannel,
workspaceId,
);
await this.messagingMonitoringService.track({
eventName: 'message_list_fetch.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
break;
default:
break;
}
await this.messagingMonitoringService.track({
eventName: 'message_list_fetch.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
} catch (error) {
await this.messageImportErrorHandlerService.handleDriverException(
error,
@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import {
MessageChannelSyncStage,
type MessageChannelWorkspaceEntity,
@@ -24,6 +25,7 @@ export class MessagingMessagesImportJob {
constructor(
private readonly messagingMessagesImportService: MessagingMessagesImportService,
private readonly messagingMonitoringService: MessagingMonitoringService,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@@ -64,18 +66,24 @@ export class MessagingMessagesImportJob {
}
if (
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED
) {
return;
}
if (
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
await this.messageChannelSyncStatusService.scheduleMessagesImport(
[messageChannel.id],
workspaceId,
true,
);
return;
}
@@ -53,10 +53,6 @@ export class MessagingOngoingStaleJob {
messageChannel.syncStageStartedAt &&
isSyncStale(messageChannel.syncStageStartedAt)
) {
this.logger.log(
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
);
await this.messageChannelSyncStatusService.resetSyncStageStartedAt(
[messageChannel.id],
workspaceId,
@@ -65,6 +61,9 @@ export class MessagingOngoingStaleJob {
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_SCHEDULED:
this.logger.log(
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGE_LIST_FETCH_PENDING`,
);
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
[messageChannel.id],
workspaceId,
@@ -72,6 +71,9 @@ export class MessagingOngoingStaleJob {
break;
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING:
case MessageChannelSyncStage.MESSAGES_IMPORT_SCHEDULED:
this.logger.log(
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
);
await this.messageChannelSyncStatusService.scheduleMessagesImport(
[messageChannel.id],
workspaceId,
@@ -166,6 +166,7 @@ export class MessageImportExceptionHandlerService {
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
[messageChannel.id],
workspaceId,
true,
);
break;
@@ -174,6 +175,7 @@ export class MessageImportExceptionHandlerService {
await this.messageChannelSyncStatusService.scheduleMessagesImport(
[messageChannel.id],
workspaceId,
true,
);
break;