Compare commits

...

1 Commits

Author SHA1 Message Date
sonarly-bot f337c5dc33 fix: make DATABASE_EVENT updated trigger matching resilient
https://sonarly.com/issue/42419?type=bug

`opportunity.updated` automations can silently stop producing workflow runs while workflows remain ACTIVE, requiring manual deactivate/activate to recover.

Fix: Implemented a resilient field-matching path for DATABASE_EVENT UPDATED/UPSERTED triggers in `WorkflowDatabaseEventTriggerListener`.

What changed:
1) Replaced strict `settings.fields` vs `updatedFields` matching with a dedicated helper:
- `shouldTriggerJobForFieldFilteredEvent(...)`
- still honors explicit field filters
- first checks normalized intersection (case-insensitive + trimmed)
- if no intersection, falls back to checking actual value changes in `before` vs `after` for configured fields

This preserves existing filtered-trigger behavior while preventing silent full drop of runs when `updatedFields` drifts from persisted trigger field naming.

2) Added tests in `workflow-database-event-trigger.listener.spec.ts`:
- triggers when configured field matches updated field only by case
- triggers when `updatedFields` is stale/mismatched but the configured field value actually changed

These tests directly cover the failure mode identified in the RCA.

Authored by Sonarly by autonomous analysis (run 48440).
2026-06-04 06:00:34 +00:00
2 changed files with 150 additions and 29 deletions
@@ -189,6 +189,50 @@ describe('WorkflowDatabaseEventTriggerListener', () => {
expect(messageQueueService.add).not.toHaveBeenCalled();
});
it('should trigger workflow when configured fields only match updated fields by case', async () => {
mockRepository.find.mockResolvedValue([
{
...mockEventListeners[0],
settings: {
eventName: databaseEventName,
fields: ['Field1'],
},
},
]);
await listener.handleObjectRecordUpdateEvent(mockPayload);
expect(messageQueueService.add).toHaveBeenCalledTimes(1);
});
it('should trigger workflow when updated fields are stale but configured field value changed', async () => {
mockRepository.find.mockResolvedValue([
{
...mockEventListeners[0],
settings: {
eventName: databaseEventName,
fields: ['field1'],
},
},
]);
await listener.handleObjectRecordUpdateEvent({
...mockPayload,
events: [
{
...mockPayload.events[0],
properties: {
updatedFields: ['differentFieldName'],
before: { field1: 'old-value' },
after: { field1: 'new-value' },
},
},
],
});
expect(messageQueueService.add).toHaveBeenCalledTimes(1);
});
it('should handle create events correctly', async () => {
const createPayload: WorkspaceEventBatch<any> = {
...mockPayload,
@@ -9,7 +9,7 @@ import {
type ObjectRecordUpsertEvent,
} from 'twenty-shared/database-events';
import { type ObjectRecord } from 'twenty-shared/types';
import { isDefined } from 'twenty-shared/utils';
import { isDefined, isNonEmptyArray } from 'twenty-shared/utils';
import { In, Raw } from 'typeorm';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
@@ -314,10 +314,8 @@ export class WorkflowDatabaseEventTriggerListener {
const databaseEventName = payload.name;
if (!workspaceId || !databaseEventName) {
this.logger.error(
`Missing workspaceId or eventName in payload ${JSON.stringify(
payload,
)}`,
this.logger.warn(
`Ignoring database event batch with missing metadata (workspaceId=${workspaceId}, eventName=${databaseEventName}, eventsCount=${payload.events.length})`,
);
return true;
@@ -358,27 +356,49 @@ export class WorkflowDatabaseEventTriggerListener {
},
});
let matchedEventCount = 0;
let filteredEventCount = 0;
let enqueuedJobCount = 0;
for (const eventListener of eventListeners) {
for (const eventPayload of payload.events) {
matchedEventCount += 1;
const shouldTriggerJob = this.shouldTriggerJob({
eventPayload,
eventListener,
action,
});
if (shouldTriggerJob) {
await this.messageQueueService.add<WorkflowTriggerJobData>(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId: eventListener.workflowId,
payload: eventPayload,
},
{ retryLimit: 3 },
);
if (!shouldTriggerJob) {
filteredEventCount += 1;
continue;
}
enqueuedJobCount += 1;
await this.messageQueueService.add<WorkflowTriggerJobData>(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId: eventListener.workflowId,
payload: eventPayload,
},
{ retryLimit: 3 },
);
}
}
if (
payload.events.length > 0 &&
eventListeners.length > 0 &&
enqueuedJobCount === 0
) {
this.logger.warn(
`Database event batch produced no workflow jobs (workspaceId=${workspaceId}, eventName=${databaseEventName}, action=${action}, listeners=${eventListeners.length}, events=${payload.events.length}, matched=${matchedEventCount}, filtered=${filteredEventCount})`,
);
}
}, authContext);
}
@@ -395,28 +415,85 @@ export class WorkflowDatabaseEventTriggerListener {
const settings = eventListener.settings as UpdateEventTriggerSettings;
const updateEventPayload = eventPayload as ObjectRecordUpdateEvent;
return (
!settings.fields ||
settings.fields.length === 0 ||
settings.fields.some((field) =>
updateEventPayload?.properties?.updatedFields?.includes(field),
)
);
return this.shouldTriggerJobForFieldFilteredEvent({
fields: settings.fields,
eventPayload: updateEventPayload,
});
}
if (action === DatabaseEventAction.UPSERTED) {
const settings = eventListener.settings as UpsertEventTriggerSettings;
const upsertEventPayload = eventPayload as ObjectRecordUpsertEvent;
return (
!settings.fields ||
settings.fields.length === 0 ||
settings.fields.some((field) =>
upsertEventPayload?.properties?.updatedFields?.includes(field),
)
);
return this.shouldTriggerJobForFieldFilteredEvent({
fields: settings.fields,
eventPayload: upsertEventPayload,
});
}
return true;
}
private shouldTriggerJobForFieldFilteredEvent({
fields,
eventPayload,
}: {
fields?: string[];
eventPayload: ObjectRecordUpdateEvent | ObjectRecordUpsertEvent;
}) {
if (!isNonEmptyArray(fields)) {
return true;
}
const updatedFields = eventPayload.properties.updatedFields ?? [];
if (
this.hasIntersection({
left: fields,
right: updatedFields,
})
) {
return true;
}
return fields.some((field) =>
this.didFieldValueChange({
field,
before: eventPayload.properties.before as Record<string, unknown>,
after: eventPayload.properties.after as Record<string, unknown>,
}),
);
}
private hasIntersection({
left,
right,
}: {
left: string[];
right: string[];
}) {
const normalizedRight = new Set(
right.map((field) => field.trim().toLowerCase()),
);
return left.some((field) =>
normalizedRight.has(field.trim().toLowerCase()),
);
}
private didFieldValueChange({
field,
before,
after,
}: {
field: string;
before?: Record<string, unknown>;
after?: Record<string, unknown>;
}) {
if (!before || !after) {
return false;
}
return !Object.is(before[field], after[field]);
}
}