Compare commits

...

4 Commits

Author SHA1 Message Date
prastoin 3930d89be7 Revert "Remove useless columns (#13312)"
This reverts commit 96daf5555d.
2025-07-23 13:36:32 +02:00
prastoin eb7020e696 Revert "13227 workflow wrong completed workflowrun state when multiple branches (#13344)"
This reverts commit 01805cc71c.
2025-07-23 13:36:25 +02:00
prastoin 213e83a025 Revert "13303 workflow clean workflowruns to keep max 1000 workflowruns per workflow (#13353)"
This reverts commit eeade6e94c.
2025-07-23 13:36:18 +02:00
prastoin dc95431928 Revert "FIx main typecheck (#13374)"
This reverts commit 5ac5e269e3.
2025-07-23 13:36:10 +02:00
34 changed files with 5144 additions and 672 deletions
@@ -1,6 +1,6 @@
import { CoreObjectNameSingular } from '@/object-metadata/types/CoreObjectNameSingular';
import { FieldContext } from '@/object-record/record-field/contexts/FieldContext';
import { orderWorkflowRunState } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunState';
import { orderWorkflowRunOutput } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunOutput';
import { FieldJsonValue } from '@/object-record/record-field/types/FieldMetadata';
import { useContext } from 'react';
import { isDefined } from 'twenty-shared/utils';
@@ -15,10 +15,10 @@ export const useFormattedJsonFieldValue = ({
if (
fieldDefinition.metadata.objectMetadataNameSingular ===
CoreObjectNameSingular.WorkflowRun &&
fieldDefinition.metadata.fieldName === 'state' &&
fieldDefinition.metadata.fieldName === 'output' &&
isDefined(fieldValue)
) {
return orderWorkflowRunState(fieldValue) as FieldJsonValue;
return orderWorkflowRunOutput(fieldValue) as FieldJsonValue;
}
return fieldValue;
@@ -1,6 +1,6 @@
import { CoreObjectNameSingular } from '@/object-metadata/types/CoreObjectNameSingular';
import { FieldContext } from '@/object-record/record-field/contexts/FieldContext';
import { orderWorkflowRunState } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunState';
import { orderWorkflowRunOutput } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunOutput';
import { useContext } from 'react';
import { isDefined, parseJson } from 'twenty-shared/utils';
import { JsonObject, JsonValue } from 'type-fest';
@@ -19,10 +19,10 @@ export const usePrecomputedJsonDraftValue = ({
if (
fieldDefinition.metadata.objectMetadataNameSingular ===
CoreObjectNameSingular.WorkflowRun &&
fieldDefinition.metadata.fieldName === 'state' &&
fieldDefinition.metadata.fieldName === 'output' &&
isDefined(draftValue)
) {
return orderWorkflowRunState(parsedJsonValue) as JsonObject;
return orderWorkflowRunOutput(parsedJsonValue) as JsonObject;
}
return parsedJsonValue;
@@ -10,6 +10,6 @@ export const isWorkflowRunJsonField = ({
}) => {
return (
objectMetadataNameSingular === CoreObjectNameSingular.WorkflowRun &&
fieldName === 'state'
(fieldName === 'output' || fieldName === 'context')
);
};
@@ -0,0 +1,27 @@
import { WorkflowRunOutput } from '@/workflow/types/Workflow';
import { workflowRunOutputSchema } from '@/workflow/validation-schemas/workflowSchema';
import { isDefined } from 'twenty-shared/utils';
import { JsonValue } from 'type-fest';
export const orderWorkflowRunOutput = (value: JsonValue) => {
const parsedValue = workflowRunOutputSchema.safeParse(value);
if (!parsedValue.success) {
return null;
}
const orderedWorkflowRunOutput: WorkflowRunOutput = {
...(isDefined(parsedValue.data.error)
? {
error: parsedValue.data.error,
}
: {}),
...(isDefined(parsedValue.data.stepsOutput)
? {
stepsOutput: parsedValue.data.stepsOutput,
}
: {}),
flow: parsedValue.data.flow,
};
return orderedWorkflowRunOutput;
};
@@ -1,23 +0,0 @@
import { WorkflowRunState } from '@/workflow/types/Workflow';
import { workflowRunStateSchema } from '@/workflow/validation-schemas/workflowSchema';
import { isDefined } from 'twenty-shared/utils';
import { JsonValue } from 'type-fest';
export const orderWorkflowRunState = (value: JsonValue) => {
const parsedValue = workflowRunStateSchema.safeParse(value);
if (!parsedValue.success) {
return null;
}
const orderedWorkflowRunState: WorkflowRunState = {
...(isDefined(parsedValue.data.workflowRunError)
? {
workflowRunError: parsedValue.data.workflowRunError,
}
: {}),
stepInfos: parsedValue.data.stepInfos,
flow: parsedValue.data.flow,
};
return orderedWorkflowRunState;
};
@@ -80,6 +80,8 @@ export const useRunWorkflowVersion = () => {
const recordInput: Partial<WorkflowRun> = {
name: '#0',
status: 'NOT_STARTED',
output: null,
context: null,
workflowVersionId,
workflowId,
createdAt: new Date().toISOString(),
@@ -1,26 +1,63 @@
import {
workflowAiAgentActionSchema,
workflowAiAgentActionSettingsSchema,
workflowCodeActionSchema,
workflowCodeActionSettingsSchema,
workflowCreateRecordActionSchema,
workflowCreateRecordActionSettingsSchema,
workflowCronTriggerSchema,
workflowDatabaseEventTriggerSchema,
workflowDeleteRecordActionSchema,
workflowDeleteRecordActionSettingsSchema,
workflowExecutorOutputSchema,
workflowFilterActionSchema,
workflowFilterActionSettingsSchema,
workflowFindRecordsActionSchema,
workflowFindRecordsActionSettingsSchema,
workflowFormActionSchema,
workflowFormActionSettingsSchema,
workflowHttpRequestActionSchema,
workflowManualTriggerSchema,
workflowRunOutputSchema,
workflowRunOutputStepsOutputSchema,
workflowRunSchema,
workflowRunStateSchema,
workflowRunStatusSchema,
workflowRunStepStatusSchema,
workflowSendEmailActionSchema,
workflowSendEmailActionSettingsSchema,
workflowTriggerSchema,
workflowUpdateRecordActionSchema,
workflowUpdateRecordActionSettingsSchema,
workflowWebhookTriggerSchema,
} from '@/workflow/validation-schemas/workflowSchema';
import { z } from 'zod';
export type WorkflowCodeActionSettings = z.infer<
typeof workflowCodeActionSettingsSchema
>;
export type WorkflowSendEmailActionSettings = z.infer<
typeof workflowSendEmailActionSettingsSchema
>;
export type WorkflowCreateRecordActionSettings = z.infer<
typeof workflowCreateRecordActionSettingsSchema
>;
export type WorkflowUpdateRecordActionSettings = z.infer<
typeof workflowUpdateRecordActionSettingsSchema
>;
export type WorkflowDeleteRecordActionSettings = z.infer<
typeof workflowDeleteRecordActionSettingsSchema
>;
export type WorkflowFindRecordsActionSettings = z.infer<
typeof workflowFindRecordsActionSettingsSchema
>;
export type WorkflowFilterActionSettings = z.infer<
typeof workflowFilterActionSettingsSchema
>;
export type WorkflowFormActionSettings = z.infer<
typeof workflowFormActionSettingsSchema
>;
export type WorkflowCodeAction = z.infer<typeof workflowCodeActionSchema>;
export type WorkflowSendEmailAction = z.infer<
typeof workflowSendEmailActionSchema
@@ -43,6 +80,10 @@ export type WorkflowHttpRequestAction = z.infer<
typeof workflowHttpRequestActionSchema
>;
export type WorkflowAiAgentActionSettings = z.infer<
typeof workflowAiAgentActionSettingsSchema
>;
export type WorkflowAiAgentAction = z.infer<typeof workflowAiAgentActionSchema>;
export type WorkflowAction =
@@ -102,6 +143,14 @@ export type ManualTriggerWorkflowVersion = WorkflowVersion & {
trigger: WorkflowManualTrigger | null;
};
export type WorkflowRunOutput = z.infer<typeof workflowRunOutputSchema>;
export type WorkflowExecutorOutput = z.infer<
typeof workflowExecutorOutputSchema
>;
export type WorkflowRunOutputStepsOutput = z.infer<
typeof workflowRunOutputStepsOutputSchema
>;
export type WorkflowRunStatus = z.infer<typeof workflowRunStatusSchema>;
export type WorkflowRun = z.infer<typeof workflowRunSchema>;
@@ -297,6 +297,27 @@ export const workflowTriggerSchema = z.discriminatedUnion('type', [
workflowWebhookTriggerSchema,
]);
// Step output schemas
export const workflowExecutorOutputSchema = z.object({
result: z.any().optional(),
error: z.any().optional(),
pendingEvent: z.boolean().optional(),
});
export const workflowRunOutputStepsOutputSchema = z.record(
workflowExecutorOutputSchema,
);
// Final workflow run output schema
export const workflowRunOutputSchema = z.object({
flow: z.object({
trigger: workflowTriggerSchema,
steps: z.array(workflowActionSchema),
}),
stepsOutput: workflowRunOutputStepsOutputSchema.optional(),
error: z.any().optional(),
});
export const workflowRunStepStatusSchema = z.nativeEnum(StepStatus);
export const workflowRunStateStepInfoSchema = z.object({
@@ -318,6 +339,8 @@ export const workflowRunStateSchema = z.object({
workflowRunError: z.any().optional(),
});
export const workflowRunContextSchema = z.record(z.any());
export const workflowRunStatusSchema = z.enum([
'NOT_STARTED',
'RUNNING',
@@ -332,6 +355,8 @@ export const workflowRunSchema = z
id: z.string(),
workflowVersionId: z.string(),
workflowId: z.string(),
output: workflowRunOutputSchema.nullable(),
context: workflowRunContextSchema.nullable(),
state: workflowRunStateSchema.nullable(),
status: workflowRunStatusSchema,
createdAt: z.string(),
@@ -6,7 +6,6 @@ export const getNodeVariantFromStepRunStatus = (
): WorkflowDiagramNodeVariant => {
switch (runStatus) {
case 'SUCCESS':
case 'STOPPED':
return 'success';
case 'FAILED':
return 'failure';
@@ -35,8 +35,6 @@ export const WorkflowRunStepOutputDetail = ({ stepId }: { stepId: string }) => {
const stepInfo = workflowRun.state.stepInfos[stepId];
const { status: _, ...stepInfoWithoutStatus } = stepInfo ?? {};
const stepDefinition = getStepDefinitionOrThrow({
stepId,
trigger: workflowRun.state.flow.trigger,
@@ -86,7 +84,7 @@ export const WorkflowRunStepOutputDetail = ({ stepId }: { stepId: string }) => {
<WorkflowRunStepJsonContainer>
<JsonTree
value={stepInfoWithoutStatus ?? t`No output available`}
value={stepInfo ?? t`No output available`}
shouldExpandNodeInitially={isTwoFirstDepths}
emptyArrayLabel={t`Empty Array`}
emptyObjectLabel={t`Empty Object`}
@@ -25,6 +25,8 @@ export const useSubmitFormStep = () => {
id: true,
name: true,
status: true,
output: true,
context: true,
startedAt: true,
endedAt: true,
},
File diff suppressed because it is too large Load Diff
@@ -95,12 +95,11 @@ export class MigrateWorkflowRunStatesCommand extends ActiveOrSuspendedWorkspaces
? { where: { startedAt: MoreThan(this.afterDate) } }
: {};
const workflowRuns = (await workflowRunRepository.find({
const workflowRuns = await workflowRunRepository.find({
...findOption,
skip: offset * this.chunkSize,
take: this.chunkSize,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
})) as any[]; // We type as any as workflowRun output has been removed since 1.1.0 release
});
for (const workflowRun of workflowRuns) {
const output = workflowRun.output;
@@ -1,39 +0,0 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Command } from 'nest-commander';
import { IsNull, Repository } from 'typeorm';
import {
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
RunOnWorkspaceArgs,
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
@Command({
name: 'migrate:1-2:remove-workflow-runs-without-state',
description: 'Remove workflow runs without state.',
})
export class RemoveWorkflowRunsWithoutState extends ActiveOrSuspendedWorkspacesMigrationCommandRunner {
constructor(
@InjectRepository(Workspace, 'core')
protected readonly workspaceRepository: Repository<Workspace>,
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {
super(workspaceRepository, twentyORMGlobalManager);
}
override async runOnWorkspace({
workspaceId,
}: RunOnWorkspaceArgs): Promise<void> {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
await workflowRunRepository.delete({ state: IsNull() });
}
}
@@ -1,12 +1,8 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RemoveWorkflowRunsWithoutState } from 'src/database/commands/upgrade-version-command/1-2/1-2-remove-workflow-runs-without-state.command';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
@Module({
imports: [TypeOrmModule.forFeature([Workspace], 'core')],
providers: [RemoveWorkflowRunsWithoutState],
exports: [RemoveWorkflowRunsWithoutState],
imports: [],
providers: [],
exports: [],
})
export class V1_2_UpgradeVersionCommandModule {}
@@ -31,7 +31,6 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command';
import { compareVersionMajorAndMinor } from 'src/utils/version/compare-version-minor-and-major';
import { RemoveWorkflowRunsWithoutState } from 'src/database/commands/upgrade-version-command/1-2/1-2-remove-workflow-runs-without-state.command';
const execPromise = promisify(exec);
@@ -150,7 +149,6 @@ export class UpgradeCommand extends UpgradeCommandRunner {
protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand,
// 1.2 Commands
protected readonly removeWorkflowRunsWithoutState: RemoveWorkflowRunsWithoutState,
// 1.3 Commands
) {
@@ -204,7 +202,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
};
const commands_120: VersionCommands = {
beforeSyncMetadata: [this.removeWorkflowRunsWithoutState],
beforeSyncMetadata: [],
afterSyncMetadata: [],
};
@@ -161,6 +161,27 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
})
createdBy: ActorMetadata;
@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.output,
type: FieldMetadataType.RAW_JSON,
label: msg`Output`,
description: msg`Json object to provide output of the workflow run`,
icon: 'IconText',
})
@WorkspaceIsNullable()
output: WorkflowRunOutput | null;
@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.context,
type: FieldMetadataType.RAW_JSON,
label: msg`Context`,
description: msg`Context`,
icon: 'IconHierarchy2',
})
@WorkspaceIsNullable()
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context: Record<string, any> | null;
@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.state,
type: FieldMetadataType.RAW_JSON,
@@ -168,7 +189,8 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
description: msg`State of the workflow run`,
icon: 'IconHierarchy2',
})
state: WorkflowRunState;
@WorkspaceIsNullable()
state: WorkflowRunState | null;
@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position,
@@ -21,6 +21,7 @@ import {
WorkflowVersionStepException,
WorkflowVersionStepExceptionCode,
} from 'src/modules/workflow/common/exceptions/workflow-version-step.exception';
import { StepOutput } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { assertWorkflowVersionIsDraft } from 'src/modules/workflow/common/utils/assert-workflow-version-is-draft.util';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
@@ -328,14 +329,18 @@ export class WorkflowVersionStepWorkspaceService {
response,
});
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
stepId,
stepInfo: {
status: StepStatus.SUCCESS,
const newStepOutput: StepOutput = {
id: stepId,
output: {
result: enrichedResponse,
},
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
workflowRunId,
stepOutput: newStepOutput,
stepStatus: StepStatus.SUCCESS,
});
await this.workflowRunnerWorkspaceService.resume({
@@ -2,7 +2,6 @@ export type WorkflowExecutorInput = {
stepIds: string[];
workflowRunId: string;
workspaceId: string;
shouldComputeWorkflowRunStatus?: boolean;
};
export type WorkflowBranchExecutorInput = {
@@ -5,7 +5,6 @@ import {
WorkflowActionType,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
describe('canExecuteStep', () => {
const steps = [
@@ -57,12 +56,7 @@ describe('canExecuteStep', () => {
},
};
const result = canExecuteStep({
stepInfos,
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
});
const result = canExecuteStep({ stepInfos, steps, stepId: 'step-3' });
expect(result).toBe(true);
});
@@ -83,7 +77,6 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@@ -102,7 +95,6 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@@ -121,7 +113,6 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
});
@@ -142,7 +133,6 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@@ -161,7 +151,6 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@@ -180,7 +169,6 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@@ -199,38 +187,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
});
it('should return false if workflowRun is not RUNNING', () => {
const stepInfos = {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.NOT_STARTED,
},
};
for (const workflowRunStatus of [
WorkflowRunStatus.FAILED,
WorkflowRunStatus.ENQUEUED,
WorkflowRunStatus.COMPLETED,
WorkflowRunStatus.NOT_STARTED,
]) {
const result = canExecuteStep({
stepInfos,
steps,
stepId: 'step-3',
workflowRunStatus,
});
expect(result).toBe(false);
}
});
});
@@ -1,30 +0,0 @@
import { StepStatus } from 'twenty-shared/workflow';
import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
describe('workflowShouldFail', () => {
it('should return true if a failed step exists', () => {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: StepStatus.FAILED } };
expect(workflowShouldFail({ steps, stepInfos })).toBeTruthy();
});
it('should return false if no failed step exists', () => {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } };
expect(workflowShouldFail({ steps, stepInfos })).toBeFalsy();
});
});
@@ -1,79 +0,0 @@
import { StepStatus } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util';
describe('workflowShouldKeepRunning', () => {
describe('should return true if', () => {
it('running or pending step exists', () => {
for (const testStatus of [StepStatus.PENDING, StepStatus.RUNNING]) {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: testStatus } };
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy();
}
});
it('success step with not started executable children exists', () => {
const steps = [
{
id: 'step-1',
nextStepIds: ['step-2'],
} as WorkflowAction,
{
id: 'step-2',
} as WorkflowAction,
];
const stepInfos = {
'step-1': { status: StepStatus.SUCCESS },
'step-2': { status: StepStatus.NOT_STARTED },
};
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy();
});
});
describe('should return false', () => {
it('workflow run only have success steps', () => {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } };
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy();
});
it('success step with not executable not started children exists', () => {
const steps = [
{
id: 'step-1',
nextStepIds: ['step-3'],
} as WorkflowAction,
{
id: 'step-2',
nextStepIds: ['step-3'],
} as WorkflowAction,
{
id: 'step-3',
} as WorkflowAction,
];
const stepInfos = {
'step-1': { status: StepStatus.SUCCESS },
'step-2': { status: StepStatus.FAILED },
'step-3': { status: StepStatus.NOT_STARTED },
};
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy();
});
});
});
@@ -2,23 +2,16 @@ import { isDefined } from 'twenty-shared/utils';
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
export const canExecuteStep = ({
stepId,
steps,
stepInfos,
workflowRunStatus,
}: {
steps: WorkflowAction[];
stepInfos: WorkflowRunStepInfos;
stepId: string;
workflowRunStatus: WorkflowRunStatus;
}) => {
if (workflowRunStatus !== WorkflowRunStatus.RUNNING) {
return false;
}
if (
isDefined(stepInfos[stepId]?.status) &&
stepInfos[stepId].status !== StepStatus.NOT_STARTED
@@ -1,17 +0,0 @@
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const workflowShouldFail = ({
stepInfos,
steps,
}: {
stepInfos: WorkflowRunStepInfos;
steps: WorkflowAction[];
}) => {
const failedSteps = steps.filter(
(step) => stepInfos[step.id]?.status === StepStatus.FAILED,
);
return failedSteps.length > 0;
};
@@ -1,38 +0,0 @@
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
export const workflowShouldKeepRunning = ({
stepInfos,
steps,
}: {
stepInfos: WorkflowRunStepInfos;
steps: WorkflowAction[];
}) => {
const runningOrPendingStepExists = steps.some((step) =>
[StepStatus.PENDING, StepStatus.RUNNING].includes(
stepInfos[step.id]?.status,
),
);
const successStepWithNotStartedExecutableChildren = steps.some(
(step) =>
stepInfos[step.id]?.status === StepStatus.SUCCESS &&
(step.nextStepIds ?? []).some(
(nextStepId) =>
stepInfos[nextStepId]?.status === StepStatus.NOT_STARTED &&
canExecuteStep({
stepId: nextStepId,
steps,
stepInfos,
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
),
);
return (
runningOrPendingStepExists || successStepWithNotStartedExecutableChildren
);
};
@@ -14,6 +14,7 @@ import {
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
jest.mock(
@@ -46,8 +47,9 @@ describe('WorkflowExecutorWorkspaceService', () => {
const mockWorkflowRunWorkspaceService = {
endWorkflowRun: jest.fn(),
updateWorkflowRunStepInfo: jest.fn(),
getWorkflowRunOrFail: jest.fn(),
updateWorkflowRunStepStatus: jest.fn(),
saveWorkflowRunState: jest.fn(),
getWorkflowRun: jest.fn(),
};
const mockBillingService = {
@@ -123,14 +125,11 @@ describe('WorkflowExecutorWorkspaceService', () => {
nextStepIds: [],
},
] as WorkflowAction[];
const mockStepInfos = {
trigger: { result: {}, status: StepStatus.SUCCESS },
'step-1': { status: StepStatus.NOT_STARTED },
'step-2': { status: StepStatus.NOT_STARTED },
};
mockWorkflowRunWorkspaceService.getWorkflowRunOrFail.mockReturnValue({
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
state: { flow: { steps: mockSteps }, stepInfos: mockStepInfos },
});
@@ -169,30 +168,32 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(4);
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(1, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepId: 'step-1',
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(2, {
stepId: 'step-1',
stepInfo: {
...mockStepResult,
status: StepStatus.SUCCESS,
},
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: mockStepResult,
},
workspaceId: 'workspace-id',
stepStatus: StepStatus.SUCCESS,
});
// execute second step
@@ -215,30 +216,34 @@ describe('WorkflowExecutorWorkspaceService', () => {
expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled();
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(2);
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(1, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepId: 'step-1',
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(2, {
stepId: 'step-1',
stepInfo: {
error: 'Step execution failed',
status: StepStatus.FAILED,
},
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: {
error: 'Step execution failed',
},
},
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
});
@@ -256,29 +261,32 @@ describe('WorkflowExecutorWorkspaceService', () => {
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(2);
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(1, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepId: 'step-1',
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(2, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.PENDING,
},
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: mockPendingEvent,
},
workspaceId: 'workspace-id',
stepStatus: StepStatus.PENDING,
});
// No recursive call to execute should happen
@@ -287,6 +295,128 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
});
it('should continue to next step if continueOnFailure is true', async () => {
const stepsWithContinueOnFailure = [
{
id: 'step-1',
type: WorkflowActionType.CODE,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: true },
retryOnFailure: { value: false },
},
},
nextStepIds: ['step-2'],
},
{
id: 'step-2',
type: WorkflowActionType.SEND_EMAIL,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: false },
},
},
},
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({
state: {
flow: { steps: stepsWithContinueOnFailure },
stepInfos: mockStepInfos,
},
});
mockWorkflowExecutor.execute.mockResolvedValueOnce({
error: 'Step execution failed but continue',
});
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepId: 'step-1',
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: {
error: 'Step execution failed but continue',
},
},
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
// execute second step
expect(workflowActionFactory.get).toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
it('should retry on failure if retryOnFailure is true', async () => {
const stepsWithRetryOnFailure = [
{
id: 'step-1',
type: WorkflowActionType.CODE,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: true },
},
},
},
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
state: {
flow: { steps: stepsWithRetryOnFailure },
stepInfos: mockStepInfos,
},
});
mockWorkflowExecutor.execute.mockResolvedValue({
error: 'Step execution failed, will retry',
});
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
for (let attempt = 1; attempt <= 3; attempt++) {
expect(workflowActionFactory.get).toHaveBeenNthCalledWith(
attempt,
WorkflowActionType.CODE,
);
}
expect(workflowActionFactory.get).not.toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
it('should stop when billing validation fails', async () => {
mockBillingService.isBillingEnabled.mockReturnValueOnce(true);
mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false);
@@ -300,7 +430,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
expect(workflowActionFactory.get).toHaveBeenCalledTimes(0);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes(
@@ -308,15 +438,24 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
stepId: 'step-1',
stepInfo: {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
status: StepStatus.FAILED,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepOutput: {
id: 'step-1',
output: {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
},
},
stepStatus: StepStatus.FAILED,
});
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
status: WorkflowRunStatus.FAILED,
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
});
});
@@ -2,15 +2,18 @@ import { Injectable } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { getWorkflowRunContext, StepStatus } from 'twenty-shared/workflow';
import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import {
StepOutput,
WorkflowRunStatus,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import {
@@ -19,9 +22,8 @@ import {
} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util';
import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
const MAX_RETRIES_ON_FAILURE = 3;
@Injectable()
export class WorkflowExecutorWorkspaceService {
@@ -36,7 +38,6 @@ export class WorkflowExecutorWorkspaceService {
stepIds,
workflowRunId,
workspaceId,
shouldComputeWorkflowRunStatus = true,
}: WorkflowExecutorInput) {
await Promise.all(
stepIds.map(async (stepIdToExecute) => {
@@ -47,27 +48,187 @@ export class WorkflowExecutorWorkspaceService {
});
}),
);
if (shouldComputeWorkflowRunStatus) {
await this.computeWorkflowRunStatus({
workflowRunId,
workspaceId,
});
}
}
private async executeFromStep({
stepId,
attemptCount = 1,
workflowRunId,
workspaceId,
}: WorkflowBranchExecutorInput) {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({
stepId,
workflowRunId,
workspaceId,
});
if (!isDefined(workflowRunInfo)) {
return;
}
const { stepToExecute, steps, stepInfos } = workflowRunInfo;
if (!canExecuteStep({ stepId, steps, stepInfos })) {
return;
}
const checkCanBillWorkflowNodeExecution =
await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
stepIdToExecute: stepToExecute.id,
workflowRunId,
workspaceId,
});
const stepInfos = workflowRun.state.stepInfos;
if (!checkCanBillWorkflowNodeExecution) {
return;
}
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
let actionOutput: WorkflowActionOutput;
await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({
workflowRunId,
stepId: stepToExecute.id,
workspaceId,
stepStatus: StepStatus.RUNNING,
});
try {
actionOutput = await workflowAction.execute({
currentStepId: stepId,
steps,
context: getWorkflowRunContext(stepInfos),
});
} catch (error) {
actionOutput = {
error: error.message ?? 'Execution result error, no data or error',
};
}
if (!actionOutput.error) {
this.sendWorkflowNodeRunEvent(workspaceId);
}
const stepOutput: StepOutput = {
id: stepToExecute.id,
output: actionOutput,
};
if (actionOutput.pendingEvent) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus: StepStatus.PENDING,
});
return;
}
const actionOutputSuccess = isDefined(actionOutput.result);
const isValidActionOutput =
actionOutputSuccess ||
stepToExecute.settings.errorHandlingOptions.continueOnFailure.value;
if (isValidActionOutput) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus: isDefined(actionOutput.result)
? StepStatus.SUCCESS
: StepStatus.FAILED,
});
if (
!isDefined(stepToExecute.nextStepIds) ||
stepToExecute.nextStepIds.length === 0 ||
actionOutput.shouldEndWorkflowRun === true
) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.COMPLETED,
});
return;
}
await this.executeFromSteps({
stepIds: stepToExecute.nextStepIds,
workflowRunId,
workspaceId,
});
return;
}
if (
stepToExecute.settings.errorHandlingOptions.retryOnFailure.value &&
attemptCount < MAX_RETRIES_ON_FAILURE
) {
await this.executeFromStep({
stepId,
attemptCount: attemptCount + 1,
workflowRunId,
workspaceId,
});
return;
}
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus: StepStatus.FAILED,
});
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: stepOutput.output.error,
});
}
private async getWorkflowRunInfoOrEndWorkflowRun({
stepId,
workflowRunId,
workspaceId,
}: {
stepId: string;
workflowRunId: string;
workspaceId: string;
}) {
const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({
workflowRunId,
workspaceId,
});
if (!isDefined(workflowRun)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: `WorkflowRun ${workflowRunId} not found`,
});
return;
}
if (!isDefined(workflowRun?.state)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: `WorkflowRun ${workflowRunId} doesn't have any state`,
});
return;
}
const steps = workflowRun.state.flow.steps;
@@ -84,142 +245,11 @@ export class WorkflowExecutorWorkspaceService {
return;
}
if (
!canExecuteStep({
stepId,
steps,
stepInfos,
workflowRunStatus: workflowRun.status,
})
) {
return;
}
let actionOutput: WorkflowActionOutput;
if (await this.canBillWorkflowNodeExecution(workspaceId)) {
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
stepId,
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunId,
workspaceId,
});
try {
actionOutput = await workflowAction.execute({
currentStepId: stepId,
steps,
context: getWorkflowRunContext(stepInfos),
});
} catch (error) {
actionOutput = {
error: error.message ?? 'Execution result error, no data or error',
};
}
} else {
actionOutput = {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
};
}
const isPendingEvent = actionOutput.pendingEvent;
const isSuccess = isDefined(actionOutput.result);
const isError = isDefined(actionOutput.error);
const isStopped = actionOutput.shouldEndWorkflowRun;
if (!isError) {
this.sendWorkflowNodeRunEvent(workspaceId);
}
let stepInfo: WorkflowRunStepInfo;
if (isPendingEvent) {
stepInfo = {
status: StepStatus.PENDING,
};
} else if (isStopped) {
stepInfo = {
status: StepStatus.STOPPED,
result: actionOutput?.result,
};
} else if (isSuccess) {
stepInfo = {
status: StepStatus.SUCCESS,
result: actionOutput?.result,
};
} else {
stepInfo = {
status: StepStatus.FAILED,
error: actionOutput?.error,
};
}
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
stepId,
stepInfo,
workflowRunId,
workspaceId,
});
if (
isSuccess &&
!isStopped &&
isDefined(stepToExecute.nextStepIds) &&
stepToExecute.nextStepIds.length > 0
) {
await this.executeFromSteps({
stepIds: stepToExecute.nextStepIds,
workflowRunId,
workspaceId,
shouldComputeWorkflowRunStatus: false,
});
}
}
private async computeWorkflowRunStatus({
workflowRunId,
workspaceId,
}: {
workflowRunId: string;
workspaceId: string;
}) {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
const stepInfos = workflowRun.state.stepInfos;
const steps = workflowRun.state.flow.steps;
if (workflowShouldKeepRunning({ stepInfos, steps })) {
return;
}
if (workflowShouldFail({ stepInfos, steps })) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'WorkflowRun failed',
});
return;
}
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.COMPLETED,
});
return {
stepToExecute,
steps,
stepInfos: workflowRun.state.stepInfos,
};
}
private sendWorkflowNodeRunEvent(workspaceId: string) {
@@ -235,13 +265,45 @@ export class WorkflowExecutorWorkspaceService {
);
}
private async canBillWorkflowNodeExecution(workspaceId: string) {
return (
private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
stepIdToExecute,
workflowRunId,
workspaceId,
}: {
stepIdToExecute: string;
workflowRunId: string;
workspaceId: string;
}) {
const canBillWorkflowNodeExecution =
!this.billingService.isBillingEnabled() ||
(await this.billingService.canBillMeteredProduct(
workspaceId,
BillingProductKey.WORKFLOW_NODE_EXECUTION,
))
);
));
if (!canBillWorkflowNodeExecution) {
const billingOutput = {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
workflowRunId,
stepOutput: {
id: stepIdToExecute,
output: billingOutput,
},
stepStatus: StepStatus.FAILED,
});
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
});
}
return canBillWorkflowNodeExecution;
}
}
@@ -103,9 +103,12 @@ export class RunWorkflowJob {
triggerType: workflowVersion.trigger.type,
});
const triggerPayload = workflowRun.context?.trigger ?? {};
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
workspaceId,
payload: triggerPayload,
});
await this.throttleExecution(workflowVersion.workflowId);
@@ -1,34 +0,0 @@
import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
CLEAN_WORKFLOW_RUN_CRON_PATTERN,
CleanWorkflowRunsJob,
} from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job';
@Command({
name: 'cron:workflow:clean-workflow-runs',
description: 'Clean workflow runs',
})
export class CronCleanWorkflowRunsCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.addCron({
jobName: CleanWorkflowRunsJob.name,
data: undefined,
options: {
repeat: {
pattern: CLEAN_WORKFLOW_RUN_CRON_PATTERN,
},
},
});
}
}
@@ -1,81 +0,0 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { getWorkspaceSchemaName } from 'src/engine/workspace-datasource/utils/get-workspace-schema-name.util';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
export const CLEAN_WORKFLOW_RUN_CRON_PATTERN = '0 0 * * *';
const NUMBER_OF_WORKFLOW_RUNS_TO_KEEP = 1000;
@Processor(MessageQueue.cronQueue)
export class CleanWorkflowRunsJob {
private readonly logger = new Logger(CleanWorkflowRunsJob.name);
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(CleanWorkflowRunsJob.name)
@SentryCronMonitor(CleanWorkflowRunsJob.name, CLEAN_WORKFLOW_RUN_CRON_PATTERN)
async handle() {
const activeWorkspaces = await this.workspaceRepository.find({
where: {
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) {
const schemaName = getWorkspaceSchemaName(activeWorkspace.id);
const workflowRunsToDelete = await mainDataSource.query(
`
WITH ranked_runs AS (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY "workflowId"
ORDER BY "createdAt" DESC
) AS rn
FROM ${schemaName}."workflowRun"
WHERE status IN ('${WorkflowRunStatus.COMPLETED}', '${WorkflowRunStatus.FAILED}')
)
SELECT id, rn FROM ranked_runs WHERE rn > ${NUMBER_OF_WORKFLOW_RUNS_TO_KEEP};
`,
);
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
activeWorkspace.id,
WorkflowRunWorkspaceEntity,
{ shouldBypassPermissionChecks: true },
);
for (const workflowRunToDelete of workflowRunsToDelete) {
await workflowRunRepository.delete(workflowRunToDelete.id);
}
this.logger.log(
`Deleted ${workflowRunsToDelete.length} workflow runs for workspace ${activeWorkspace.id} (schema ${schemaName})`,
);
}
}
}
@@ -9,8 +9,6 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { CronWorkflowRunEnqueueCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command';
import { WorkflowRunEnqueueJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job';
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
import { CleanWorkflowRunsJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job';
import { CronCleanWorkflowRunsCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command';
@Module({
imports: [
@@ -23,9 +21,7 @@ import { CronCleanWorkflowRunsCommand } from 'src/modules/workflow/workflow-runn
providers: [
WorkflowRunQueueWorkspaceService,
WorkflowRunEnqueueJob,
CleanWorkflowRunsJob,
CronWorkflowRunEnqueueCommand,
CronCleanWorkflowRunsCommand,
],
exports: [WorkflowRunQueueWorkspaceService],
})
@@ -4,7 +4,6 @@ import { isDefined } from 'twenty-shared/utils';
import { StepStatus } from 'twenty-shared/workflow';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { v4 } from 'uuid';
import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos';
import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
@@ -14,6 +13,7 @@ import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/compos
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
StepOutput,
WorkflowRunState,
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
@@ -40,14 +40,15 @@ export class WorkflowRunWorkspaceService {
workflowVersionId,
createdBy,
workflowRunId,
context,
status,
triggerPayload,
}: {
workflowVersionId: string;
createdBy: ActorMetadata;
workflowRunId?: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context: Record<string, any>;
status: WorkflowRunStatus.NOT_STARTED | WorkflowRunStatus.ENQUEUED;
triggerPayload: object;
}) {
const workspaceId =
this.scopedWorkspaceContextFactory.create()?.workspaceId;
@@ -92,6 +93,12 @@ export class WorkflowRunWorkspaceService {
);
}
const workflowRunCount = await workflowRunRepository.count({
where: {
workflowId: workflow.id,
},
});
const position = await this.recordPositionService.buildRecordPosition({
value: 'first',
objectMetadata: {
@@ -101,20 +108,7 @@ export class WorkflowRunWorkspaceService {
workspaceId,
});
const initState = this.getInitState(workflowVersion, triggerPayload);
const lastWorkflowRun = await workflowRunRepository.findOne({
where: {
workflowId: workflow.id,
},
order: { createdAt: 'desc' },
});
const workflowRunCountMatch = lastWorkflowRun?.name.match(/#(\d+)/);
const workflowRunCount = workflowRunCountMatch
? parseInt(workflowRunCountMatch[1], 10)
: 0;
const initState = this.getInitState(workflowVersion);
const workflowRun = workflowRunRepository.create({
id: workflowRunId ?? v4(),
@@ -125,6 +119,11 @@ export class WorkflowRunWorkspaceService {
status,
position,
state: initState,
output: {
...initState,
stepsOutput: {},
},
context,
});
await workflowRunRepository.insert(workflowRun);
@@ -136,9 +135,11 @@ export class WorkflowRunWorkspaceService {
async startWorkflowRun({
workflowRunId,
workspaceId,
payload,
}: {
workflowRunId: string;
workspaceId: string;
payload: object;
}) {
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
@@ -158,17 +159,29 @@ export class WorkflowRunWorkspaceService {
const partialUpdate = {
status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(),
output: {
...workflowRunToUpdate.output,
stepsOutput: {
trigger: {
result: payload,
},
},
},
state: {
...workflowRunToUpdate.state,
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
trigger: {
result: {},
...workflowRunToUpdate.state?.stepInfos.trigger,
status: StepStatus.SUCCESS,
result: payload,
},
},
},
context: payload
? {
trigger: payload,
}
: (workflowRunToUpdate.context ?? {}),
};
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
@@ -194,6 +207,10 @@ export class WorkflowRunWorkspaceService {
const partialUpdate = {
status,
endedAt: new Date().toISOString(),
output: {
...workflowRunToUpdate.output,
error,
},
state: {
...workflowRunToUpdate.state,
workflowRunError: error,
@@ -212,16 +229,16 @@ export class WorkflowRunWorkspaceService {
}
@WithLock('workflowRunId')
async updateWorkflowRunStepInfo({
stepId,
stepInfo,
async updateWorkflowRunStepStatus({
workflowRunId,
workspaceId,
stepId,
stepStatus,
}: {
stepId: string;
stepInfo: WorkflowRunStepInfo;
workflowRunId: string;
stepId: string;
workspaceId: string;
stepStatus: StepStatus;
}) {
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
@@ -234,10 +251,8 @@ export class WorkflowRunWorkspaceService {
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
[stepId]: {
...(workflowRunToUpdate.state?.stepInfos[stepId] || {}),
result: stepInfo?.result,
error: stepInfo?.error,
status: stepInfo.status,
...(workflowRunToUpdate.state?.stepInfos?.[stepId] || {}),
status: stepStatus,
},
},
},
@@ -246,6 +261,59 @@ export class WorkflowRunWorkspaceService {
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
@WithLock('workflowRunId')
async saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus,
}: {
workflowRunId: string;
stepOutput: StepOutput;
workspaceId: string;
stepStatus: StepStatus;
}) {
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
const partialUpdate = {
output: {
flow: workflowRunToUpdate.output?.flow ?? {
trigger: undefined,
steps: [],
},
stepsOutput: {
...(workflowRunToUpdate.output?.stepsOutput ?? {}),
[stepOutput.id]: stepOutput.output,
},
},
state: {
...workflowRunToUpdate.state,
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
[stepOutput.id]: {
...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}),
result: stepOutput.output?.result,
error: stepOutput.output?.error,
status: stepStatus,
},
},
},
...(stepStatus === StepStatus.SUCCESS
? {
context: {
...workflowRunToUpdate.context,
[stepOutput.id]: stepOutput.output.result,
},
}
: {}),
};
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
@WithLock('workflowRunId')
async updateWorkflowRunStep({
workflowRunId,
@@ -271,11 +339,18 @@ export class WorkflowRunWorkspaceService {
);
}
const updatedSteps = workflowRunToUpdate.state?.flow?.steps?.map(
const updatedSteps = workflowRunToUpdate.output?.flow?.steps?.map(
(existingStep) => (step.id === existingStep.id ? step : existingStep),
);
const partialUpdate = {
output: {
...(workflowRunToUpdate.output ?? {}),
flow: {
...(workflowRunToUpdate.output?.flow ?? {}),
steps: updatedSteps,
},
},
state: {
...workflowRunToUpdate.state,
flow: {
@@ -331,7 +406,6 @@ export class WorkflowRunWorkspaceService {
private getInitState(
workflowVersion: WorkflowVersionWorkspaceEntity,
triggerPayload: object,
): WorkflowRunState | undefined {
if (
!isDefined(workflowVersion.trigger) ||
@@ -346,7 +420,7 @@ export class WorkflowRunWorkspaceService {
steps: workflowVersion.steps,
},
stepInfos: {
trigger: { status: StepStatus.NOT_STARTED, result: triggerPayload },
trigger: { status: StepStatus.NOT_STARTED },
...Object.fromEntries(
workflowVersion.steps.map((step) => [
step.id,
@@ -75,7 +75,9 @@ export class WorkflowRunnerWorkspaceService {
status: shouldEnqueueWorkflowRun
? WorkflowRunStatus.ENQUEUED
: WorkflowRunStatus.NOT_STARTED,
triggerPayload: payload,
context: {
trigger: payload,
},
});
if (shouldEnqueueWorkflowRun) {
@@ -2,7 +2,6 @@ export enum StepStatus {
NOT_STARTED = 'NOT_STARTED',
RUNNING = 'RUNNING',
SUCCESS = 'SUCCESS',
STOPPED = 'STOPPED',
FAILED = 'FAILED',
PENDING = 'PENDING',
}