Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3930d89be7 | |||
| eb7020e696 | |||
| 213e83a025 | |||
| dc95431928 |
+3
-3
@@ -1,6 +1,6 @@
|
||||
import { CoreObjectNameSingular } from '@/object-metadata/types/CoreObjectNameSingular';
|
||||
import { FieldContext } from '@/object-record/record-field/contexts/FieldContext';
|
||||
import { orderWorkflowRunState } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunState';
|
||||
import { orderWorkflowRunOutput } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunOutput';
|
||||
import { FieldJsonValue } from '@/object-record/record-field/types/FieldMetadata';
|
||||
import { useContext } from 'react';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
@@ -15,10 +15,10 @@ export const useFormattedJsonFieldValue = ({
|
||||
if (
|
||||
fieldDefinition.metadata.objectMetadataNameSingular ===
|
||||
CoreObjectNameSingular.WorkflowRun &&
|
||||
fieldDefinition.metadata.fieldName === 'state' &&
|
||||
fieldDefinition.metadata.fieldName === 'output' &&
|
||||
isDefined(fieldValue)
|
||||
) {
|
||||
return orderWorkflowRunState(fieldValue) as FieldJsonValue;
|
||||
return orderWorkflowRunOutput(fieldValue) as FieldJsonValue;
|
||||
}
|
||||
|
||||
return fieldValue;
|
||||
|
||||
+3
-3
@@ -1,6 +1,6 @@
|
||||
import { CoreObjectNameSingular } from '@/object-metadata/types/CoreObjectNameSingular';
|
||||
import { FieldContext } from '@/object-record/record-field/contexts/FieldContext';
|
||||
import { orderWorkflowRunState } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunState';
|
||||
import { orderWorkflowRunOutput } from '@/object-record/record-field/meta-types/utils/orderWorkflowRunOutput';
|
||||
import { useContext } from 'react';
|
||||
import { isDefined, parseJson } from 'twenty-shared/utils';
|
||||
import { JsonObject, JsonValue } from 'type-fest';
|
||||
@@ -19,10 +19,10 @@ export const usePrecomputedJsonDraftValue = ({
|
||||
if (
|
||||
fieldDefinition.metadata.objectMetadataNameSingular ===
|
||||
CoreObjectNameSingular.WorkflowRun &&
|
||||
fieldDefinition.metadata.fieldName === 'state' &&
|
||||
fieldDefinition.metadata.fieldName === 'output' &&
|
||||
isDefined(draftValue)
|
||||
) {
|
||||
return orderWorkflowRunState(parsedJsonValue) as JsonObject;
|
||||
return orderWorkflowRunOutput(parsedJsonValue) as JsonObject;
|
||||
}
|
||||
|
||||
return parsedJsonValue;
|
||||
|
||||
+1
-1
@@ -10,6 +10,6 @@ export const isWorkflowRunJsonField = ({
|
||||
}) => {
|
||||
return (
|
||||
objectMetadataNameSingular === CoreObjectNameSingular.WorkflowRun &&
|
||||
fieldName === 'state'
|
||||
(fieldName === 'output' || fieldName === 'context')
|
||||
);
|
||||
};
|
||||
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
import { WorkflowRunOutput } from '@/workflow/types/Workflow';
|
||||
import { workflowRunOutputSchema } from '@/workflow/validation-schemas/workflowSchema';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
import { JsonValue } from 'type-fest';
|
||||
|
||||
export const orderWorkflowRunOutput = (value: JsonValue) => {
|
||||
const parsedValue = workflowRunOutputSchema.safeParse(value);
|
||||
if (!parsedValue.success) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const orderedWorkflowRunOutput: WorkflowRunOutput = {
|
||||
...(isDefined(parsedValue.data.error)
|
||||
? {
|
||||
error: parsedValue.data.error,
|
||||
}
|
||||
: {}),
|
||||
...(isDefined(parsedValue.data.stepsOutput)
|
||||
? {
|
||||
stepsOutput: parsedValue.data.stepsOutput,
|
||||
}
|
||||
: {}),
|
||||
flow: parsedValue.data.flow,
|
||||
};
|
||||
|
||||
return orderedWorkflowRunOutput;
|
||||
};
|
||||
-23
@@ -1,23 +0,0 @@
|
||||
import { WorkflowRunState } from '@/workflow/types/Workflow';
|
||||
import { workflowRunStateSchema } from '@/workflow/validation-schemas/workflowSchema';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
import { JsonValue } from 'type-fest';
|
||||
|
||||
export const orderWorkflowRunState = (value: JsonValue) => {
|
||||
const parsedValue = workflowRunStateSchema.safeParse(value);
|
||||
if (!parsedValue.success) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const orderedWorkflowRunState: WorkflowRunState = {
|
||||
...(isDefined(parsedValue.data.workflowRunError)
|
||||
? {
|
||||
workflowRunError: parsedValue.data.workflowRunError,
|
||||
}
|
||||
: {}),
|
||||
stepInfos: parsedValue.data.stepInfos,
|
||||
flow: parsedValue.data.flow,
|
||||
};
|
||||
|
||||
return orderedWorkflowRunState;
|
||||
};
|
||||
@@ -80,6 +80,8 @@ export const useRunWorkflowVersion = () => {
|
||||
const recordInput: Partial<WorkflowRun> = {
|
||||
name: '#0',
|
||||
status: 'NOT_STARTED',
|
||||
output: null,
|
||||
context: null,
|
||||
workflowVersionId,
|
||||
workflowId,
|
||||
createdAt: new Date().toISOString(),
|
||||
|
||||
@@ -1,26 +1,63 @@
|
||||
import {
|
||||
workflowAiAgentActionSchema,
|
||||
workflowAiAgentActionSettingsSchema,
|
||||
workflowCodeActionSchema,
|
||||
workflowCodeActionSettingsSchema,
|
||||
workflowCreateRecordActionSchema,
|
||||
workflowCreateRecordActionSettingsSchema,
|
||||
workflowCronTriggerSchema,
|
||||
workflowDatabaseEventTriggerSchema,
|
||||
workflowDeleteRecordActionSchema,
|
||||
workflowDeleteRecordActionSettingsSchema,
|
||||
workflowExecutorOutputSchema,
|
||||
workflowFilterActionSchema,
|
||||
workflowFilterActionSettingsSchema,
|
||||
workflowFindRecordsActionSchema,
|
||||
workflowFindRecordsActionSettingsSchema,
|
||||
workflowFormActionSchema,
|
||||
workflowFormActionSettingsSchema,
|
||||
workflowHttpRequestActionSchema,
|
||||
workflowManualTriggerSchema,
|
||||
workflowRunOutputSchema,
|
||||
workflowRunOutputStepsOutputSchema,
|
||||
workflowRunSchema,
|
||||
workflowRunStateSchema,
|
||||
workflowRunStatusSchema,
|
||||
workflowRunStepStatusSchema,
|
||||
workflowSendEmailActionSchema,
|
||||
workflowSendEmailActionSettingsSchema,
|
||||
workflowTriggerSchema,
|
||||
workflowUpdateRecordActionSchema,
|
||||
workflowUpdateRecordActionSettingsSchema,
|
||||
workflowWebhookTriggerSchema,
|
||||
} from '@/workflow/validation-schemas/workflowSchema';
|
||||
import { z } from 'zod';
|
||||
|
||||
export type WorkflowCodeActionSettings = z.infer<
|
||||
typeof workflowCodeActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowSendEmailActionSettings = z.infer<
|
||||
typeof workflowSendEmailActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowCreateRecordActionSettings = z.infer<
|
||||
typeof workflowCreateRecordActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowUpdateRecordActionSettings = z.infer<
|
||||
typeof workflowUpdateRecordActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowDeleteRecordActionSettings = z.infer<
|
||||
typeof workflowDeleteRecordActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowFindRecordsActionSettings = z.infer<
|
||||
typeof workflowFindRecordsActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowFilterActionSettings = z.infer<
|
||||
typeof workflowFilterActionSettingsSchema
|
||||
>;
|
||||
export type WorkflowFormActionSettings = z.infer<
|
||||
typeof workflowFormActionSettingsSchema
|
||||
>;
|
||||
|
||||
export type WorkflowCodeAction = z.infer<typeof workflowCodeActionSchema>;
|
||||
export type WorkflowSendEmailAction = z.infer<
|
||||
typeof workflowSendEmailActionSchema
|
||||
@@ -43,6 +80,10 @@ export type WorkflowHttpRequestAction = z.infer<
|
||||
typeof workflowHttpRequestActionSchema
|
||||
>;
|
||||
|
||||
export type WorkflowAiAgentActionSettings = z.infer<
|
||||
typeof workflowAiAgentActionSettingsSchema
|
||||
>;
|
||||
|
||||
export type WorkflowAiAgentAction = z.infer<typeof workflowAiAgentActionSchema>;
|
||||
|
||||
export type WorkflowAction =
|
||||
@@ -102,6 +143,14 @@ export type ManualTriggerWorkflowVersion = WorkflowVersion & {
|
||||
trigger: WorkflowManualTrigger | null;
|
||||
};
|
||||
|
||||
export type WorkflowRunOutput = z.infer<typeof workflowRunOutputSchema>;
|
||||
export type WorkflowExecutorOutput = z.infer<
|
||||
typeof workflowExecutorOutputSchema
|
||||
>;
|
||||
export type WorkflowRunOutputStepsOutput = z.infer<
|
||||
typeof workflowRunOutputStepsOutputSchema
|
||||
>;
|
||||
|
||||
export type WorkflowRunStatus = z.infer<typeof workflowRunStatusSchema>;
|
||||
|
||||
export type WorkflowRun = z.infer<typeof workflowRunSchema>;
|
||||
|
||||
@@ -297,6 +297,27 @@ export const workflowTriggerSchema = z.discriminatedUnion('type', [
|
||||
workflowWebhookTriggerSchema,
|
||||
]);
|
||||
|
||||
// Step output schemas
|
||||
export const workflowExecutorOutputSchema = z.object({
|
||||
result: z.any().optional(),
|
||||
error: z.any().optional(),
|
||||
pendingEvent: z.boolean().optional(),
|
||||
});
|
||||
|
||||
export const workflowRunOutputStepsOutputSchema = z.record(
|
||||
workflowExecutorOutputSchema,
|
||||
);
|
||||
|
||||
// Final workflow run output schema
|
||||
export const workflowRunOutputSchema = z.object({
|
||||
flow: z.object({
|
||||
trigger: workflowTriggerSchema,
|
||||
steps: z.array(workflowActionSchema),
|
||||
}),
|
||||
stepsOutput: workflowRunOutputStepsOutputSchema.optional(),
|
||||
error: z.any().optional(),
|
||||
});
|
||||
|
||||
export const workflowRunStepStatusSchema = z.nativeEnum(StepStatus);
|
||||
|
||||
export const workflowRunStateStepInfoSchema = z.object({
|
||||
@@ -318,6 +339,8 @@ export const workflowRunStateSchema = z.object({
|
||||
workflowRunError: z.any().optional(),
|
||||
});
|
||||
|
||||
export const workflowRunContextSchema = z.record(z.any());
|
||||
|
||||
export const workflowRunStatusSchema = z.enum([
|
||||
'NOT_STARTED',
|
||||
'RUNNING',
|
||||
@@ -332,6 +355,8 @@ export const workflowRunSchema = z
|
||||
id: z.string(),
|
||||
workflowVersionId: z.string(),
|
||||
workflowId: z.string(),
|
||||
output: workflowRunOutputSchema.nullable(),
|
||||
context: workflowRunContextSchema.nullable(),
|
||||
state: workflowRunStateSchema.nullable(),
|
||||
status: workflowRunStatusSchema,
|
||||
createdAt: z.string(),
|
||||
|
||||
-1
@@ -6,7 +6,6 @@ export const getNodeVariantFromStepRunStatus = (
|
||||
): WorkflowDiagramNodeVariant => {
|
||||
switch (runStatus) {
|
||||
case 'SUCCESS':
|
||||
case 'STOPPED':
|
||||
return 'success';
|
||||
case 'FAILED':
|
||||
return 'failure';
|
||||
|
||||
+1
-3
@@ -35,8 +35,6 @@ export const WorkflowRunStepOutputDetail = ({ stepId }: { stepId: string }) => {
|
||||
|
||||
const stepInfo = workflowRun.state.stepInfos[stepId];
|
||||
|
||||
const { status: _, ...stepInfoWithoutStatus } = stepInfo ?? {};
|
||||
|
||||
const stepDefinition = getStepDefinitionOrThrow({
|
||||
stepId,
|
||||
trigger: workflowRun.state.flow.trigger,
|
||||
@@ -86,7 +84,7 @@ export const WorkflowRunStepOutputDetail = ({ stepId }: { stepId: string }) => {
|
||||
|
||||
<WorkflowRunStepJsonContainer>
|
||||
<JsonTree
|
||||
value={stepInfoWithoutStatus ?? t`No output available`}
|
||||
value={stepInfo ?? t`No output available`}
|
||||
shouldExpandNodeInitially={isTwoFirstDepths}
|
||||
emptyArrayLabel={t`Empty Array`}
|
||||
emptyObjectLabel={t`Empty Object`}
|
||||
|
||||
+2
@@ -25,6 +25,8 @@ export const useSubmitFormStep = () => {
|
||||
id: true,
|
||||
name: true,
|
||||
status: true,
|
||||
output: true,
|
||||
context: true,
|
||||
startedAt: true,
|
||||
endedAt: true,
|
||||
},
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
+2
-3
@@ -95,12 +95,11 @@ export class MigrateWorkflowRunStatesCommand extends ActiveOrSuspendedWorkspaces
|
||||
? { where: { startedAt: MoreThan(this.afterDate) } }
|
||||
: {};
|
||||
|
||||
const workflowRuns = (await workflowRunRepository.find({
|
||||
const workflowRuns = await workflowRunRepository.find({
|
||||
...findOption,
|
||||
skip: offset * this.chunkSize,
|
||||
take: this.chunkSize,
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
})) as any[]; // We type as any as workflowRun output has been removed since 1.1.0 release
|
||||
});
|
||||
|
||||
for (const workflowRun of workflowRuns) {
|
||||
const output = workflowRun.output;
|
||||
|
||||
-39
@@ -1,39 +0,0 @@
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Command } from 'nest-commander';
|
||||
import { IsNull, Repository } from 'typeorm';
|
||||
|
||||
import {
|
||||
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
|
||||
RunOnWorkspaceArgs,
|
||||
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
@Command({
|
||||
name: 'migrate:1-2:remove-workflow-runs-without-state',
|
||||
description: 'Remove workflow runs without state.',
|
||||
})
|
||||
export class RemoveWorkflowRunsWithoutState extends ActiveOrSuspendedWorkspacesMigrationCommandRunner {
|
||||
constructor(
|
||||
@InjectRepository(Workspace, 'core')
|
||||
protected readonly workspaceRepository: Repository<Workspace>,
|
||||
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
) {
|
||||
super(workspaceRepository, twentyORMGlobalManager);
|
||||
}
|
||||
|
||||
override async runOnWorkspace({
|
||||
workspaceId,
|
||||
}: RunOnWorkspaceArgs): Promise<void> {
|
||||
const workflowRunRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'workflowRun',
|
||||
{ shouldBypassPermissionChecks: true },
|
||||
);
|
||||
|
||||
await workflowRunRepository.delete({ state: IsNull() });
|
||||
}
|
||||
}
|
||||
+3
-7
@@ -1,12 +1,8 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
|
||||
import { RemoveWorkflowRunsWithoutState } from 'src/database/commands/upgrade-version-command/1-2/1-2-remove-workflow-runs-without-state.command';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
|
||||
@Module({
|
||||
imports: [TypeOrmModule.forFeature([Workspace], 'core')],
|
||||
providers: [RemoveWorkflowRunsWithoutState],
|
||||
exports: [RemoveWorkflowRunsWithoutState],
|
||||
imports: [],
|
||||
providers: [],
|
||||
exports: [],
|
||||
})
|
||||
export class V1_2_UpgradeVersionCommandModule {}
|
||||
|
||||
+1
-3
@@ -31,7 +31,6 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command';
|
||||
import { compareVersionMajorAndMinor } from 'src/utils/version/compare-version-minor-and-major';
|
||||
import { RemoveWorkflowRunsWithoutState } from 'src/database/commands/upgrade-version-command/1-2/1-2-remove-workflow-runs-without-state.command';
|
||||
|
||||
const execPromise = promisify(exec);
|
||||
|
||||
@@ -150,7 +149,6 @@ export class UpgradeCommand extends UpgradeCommandRunner {
|
||||
protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand,
|
||||
|
||||
// 1.2 Commands
|
||||
protected readonly removeWorkflowRunsWithoutState: RemoveWorkflowRunsWithoutState,
|
||||
|
||||
// 1.3 Commands
|
||||
) {
|
||||
@@ -204,7 +202,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
|
||||
};
|
||||
|
||||
const commands_120: VersionCommands = {
|
||||
beforeSyncMetadata: [this.removeWorkflowRunsWithoutState],
|
||||
beforeSyncMetadata: [],
|
||||
afterSyncMetadata: [],
|
||||
};
|
||||
|
||||
|
||||
+23
-1
@@ -161,6 +161,27 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
|
||||
})
|
||||
createdBy: ActorMetadata;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.output,
|
||||
type: FieldMetadataType.RAW_JSON,
|
||||
label: msg`Output`,
|
||||
description: msg`Json object to provide output of the workflow run`,
|
||||
icon: 'IconText',
|
||||
})
|
||||
@WorkspaceIsNullable()
|
||||
output: WorkflowRunOutput | null;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.context,
|
||||
type: FieldMetadataType.RAW_JSON,
|
||||
label: msg`Context`,
|
||||
description: msg`Context`,
|
||||
icon: 'IconHierarchy2',
|
||||
})
|
||||
@WorkspaceIsNullable()
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
context: Record<string, any> | null;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.state,
|
||||
type: FieldMetadataType.RAW_JSON,
|
||||
@@ -168,7 +189,8 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
|
||||
description: msg`State of the workflow run`,
|
||||
icon: 'IconHierarchy2',
|
||||
})
|
||||
state: WorkflowRunState;
|
||||
@WorkspaceIsNullable()
|
||||
state: WorkflowRunState | null;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position,
|
||||
|
||||
+9
-4
@@ -21,6 +21,7 @@ import {
|
||||
WorkflowVersionStepException,
|
||||
WorkflowVersionStepExceptionCode,
|
||||
} from 'src/modules/workflow/common/exceptions/workflow-version-step.exception';
|
||||
import { StepOutput } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import { assertWorkflowVersionIsDraft } from 'src/modules/workflow/common/utils/assert-workflow-version-is-draft.util';
|
||||
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
|
||||
@@ -328,14 +329,18 @@ export class WorkflowVersionStepWorkspaceService {
|
||||
response,
|
||||
});
|
||||
|
||||
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
|
||||
stepId,
|
||||
stepInfo: {
|
||||
status: StepStatus.SUCCESS,
|
||||
const newStepOutput: StepOutput = {
|
||||
id: stepId,
|
||||
output: {
|
||||
result: enrichedResponse,
|
||||
},
|
||||
};
|
||||
|
||||
await this.workflowRunWorkspaceService.saveWorkflowRunState({
|
||||
workspaceId,
|
||||
workflowRunId,
|
||||
stepOutput: newStepOutput,
|
||||
stepStatus: StepStatus.SUCCESS,
|
||||
});
|
||||
|
||||
await this.workflowRunnerWorkspaceService.resume({
|
||||
|
||||
-1
@@ -2,7 +2,6 @@ export type WorkflowExecutorInput = {
|
||||
stepIds: string[];
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
shouldComputeWorkflowRunStatus?: boolean;
|
||||
};
|
||||
|
||||
export type WorkflowBranchExecutorInput = {
|
||||
|
||||
+1
-44
@@ -5,7 +5,6 @@ import {
|
||||
WorkflowActionType,
|
||||
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
|
||||
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
describe('canExecuteStep', () => {
|
||||
const steps = [
|
||||
@@ -57,12 +56,7 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
};
|
||||
|
||||
const result = canExecuteStep({
|
||||
stepInfos,
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
});
|
||||
const result = canExecuteStep({ stepInfos, steps, stepId: 'step-3' });
|
||||
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
@@ -83,7 +77,6 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
|
||||
@@ -102,7 +95,6 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
|
||||
@@ -121,7 +113,6 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
@@ -142,7 +133,6 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
|
||||
@@ -161,7 +151,6 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
|
||||
@@ -180,7 +169,6 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
|
||||
@@ -199,38 +187,7 @@ describe('canExecuteStep', () => {
|
||||
},
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('should return false if workflowRun is not RUNNING', () => {
|
||||
const stepInfos = {
|
||||
'step-1': {
|
||||
status: StepStatus.SUCCESS,
|
||||
},
|
||||
'step-2': {
|
||||
status: StepStatus.SUCCESS,
|
||||
},
|
||||
'step-3': {
|
||||
status: StepStatus.NOT_STARTED,
|
||||
},
|
||||
};
|
||||
|
||||
for (const workflowRunStatus of [
|
||||
WorkflowRunStatus.FAILED,
|
||||
WorkflowRunStatus.ENQUEUED,
|
||||
WorkflowRunStatus.COMPLETED,
|
||||
WorkflowRunStatus.NOT_STARTED,
|
||||
]) {
|
||||
const result = canExecuteStep({
|
||||
stepInfos,
|
||||
steps,
|
||||
stepId: 'step-3',
|
||||
workflowRunStatus,
|
||||
});
|
||||
|
||||
expect(result).toBe(false);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
-30
@@ -1,30 +0,0 @@
|
||||
import { StepStatus } from 'twenty-shared/workflow';
|
||||
|
||||
import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util';
|
||||
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
|
||||
describe('workflowShouldFail', () => {
|
||||
it('should return true if a failed step exists', () => {
|
||||
const steps = [
|
||||
{
|
||||
id: 'step-1',
|
||||
} as WorkflowAction,
|
||||
];
|
||||
|
||||
const stepInfos = { 'step-1': { status: StepStatus.FAILED } };
|
||||
|
||||
expect(workflowShouldFail({ steps, stepInfos })).toBeTruthy();
|
||||
});
|
||||
|
||||
it('should return false if no failed step exists', () => {
|
||||
const steps = [
|
||||
{
|
||||
id: 'step-1',
|
||||
} as WorkflowAction,
|
||||
];
|
||||
|
||||
const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } };
|
||||
|
||||
expect(workflowShouldFail({ steps, stepInfos })).toBeFalsy();
|
||||
});
|
||||
});
|
||||
-79
@@ -1,79 +0,0 @@
|
||||
import { StepStatus } from 'twenty-shared/workflow';
|
||||
|
||||
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util';
|
||||
|
||||
describe('workflowShouldKeepRunning', () => {
|
||||
describe('should return true if', () => {
|
||||
it('running or pending step exists', () => {
|
||||
for (const testStatus of [StepStatus.PENDING, StepStatus.RUNNING]) {
|
||||
const steps = [
|
||||
{
|
||||
id: 'step-1',
|
||||
} as WorkflowAction,
|
||||
];
|
||||
|
||||
const stepInfos = { 'step-1': { status: testStatus } };
|
||||
|
||||
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy();
|
||||
}
|
||||
});
|
||||
|
||||
it('success step with not started executable children exists', () => {
|
||||
const steps = [
|
||||
{
|
||||
id: 'step-1',
|
||||
nextStepIds: ['step-2'],
|
||||
} as WorkflowAction,
|
||||
{
|
||||
id: 'step-2',
|
||||
} as WorkflowAction,
|
||||
];
|
||||
|
||||
const stepInfos = {
|
||||
'step-1': { status: StepStatus.SUCCESS },
|
||||
'step-2': { status: StepStatus.NOT_STARTED },
|
||||
};
|
||||
|
||||
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy();
|
||||
});
|
||||
});
|
||||
|
||||
describe('should return false', () => {
|
||||
it('workflow run only have success steps', () => {
|
||||
const steps = [
|
||||
{
|
||||
id: 'step-1',
|
||||
} as WorkflowAction,
|
||||
];
|
||||
|
||||
const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } };
|
||||
|
||||
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy();
|
||||
});
|
||||
|
||||
it('success step with not executable not started children exists', () => {
|
||||
const steps = [
|
||||
{
|
||||
id: 'step-1',
|
||||
nextStepIds: ['step-3'],
|
||||
} as WorkflowAction,
|
||||
{
|
||||
id: 'step-2',
|
||||
nextStepIds: ['step-3'],
|
||||
} as WorkflowAction,
|
||||
{
|
||||
id: 'step-3',
|
||||
} as WorkflowAction,
|
||||
];
|
||||
|
||||
const stepInfos = {
|
||||
'step-1': { status: StepStatus.SUCCESS },
|
||||
'step-2': { status: StepStatus.FAILED },
|
||||
'step-3': { status: StepStatus.NOT_STARTED },
|
||||
};
|
||||
|
||||
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy();
|
||||
});
|
||||
});
|
||||
});
|
||||
-7
@@ -2,23 +2,16 @@ import { isDefined } from 'twenty-shared/utils';
|
||||
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
|
||||
|
||||
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
export const canExecuteStep = ({
|
||||
stepId,
|
||||
steps,
|
||||
stepInfos,
|
||||
workflowRunStatus,
|
||||
}: {
|
||||
steps: WorkflowAction[];
|
||||
stepInfos: WorkflowRunStepInfos;
|
||||
stepId: string;
|
||||
workflowRunStatus: WorkflowRunStatus;
|
||||
}) => {
|
||||
if (workflowRunStatus !== WorkflowRunStatus.RUNNING) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (
|
||||
isDefined(stepInfos[stepId]?.status) &&
|
||||
stepInfos[stepId].status !== StepStatus.NOT_STARTED
|
||||
|
||||
-17
@@ -1,17 +0,0 @@
|
||||
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
|
||||
|
||||
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
|
||||
export const workflowShouldFail = ({
|
||||
stepInfos,
|
||||
steps,
|
||||
}: {
|
||||
stepInfos: WorkflowRunStepInfos;
|
||||
steps: WorkflowAction[];
|
||||
}) => {
|
||||
const failedSteps = steps.filter(
|
||||
(step) => stepInfos[step.id]?.status === StepStatus.FAILED,
|
||||
);
|
||||
|
||||
return failedSteps.length > 0;
|
||||
};
|
||||
-38
@@ -1,38 +0,0 @@
|
||||
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
|
||||
|
||||
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
|
||||
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
export const workflowShouldKeepRunning = ({
|
||||
stepInfos,
|
||||
steps,
|
||||
}: {
|
||||
stepInfos: WorkflowRunStepInfos;
|
||||
steps: WorkflowAction[];
|
||||
}) => {
|
||||
const runningOrPendingStepExists = steps.some((step) =>
|
||||
[StepStatus.PENDING, StepStatus.RUNNING].includes(
|
||||
stepInfos[step.id]?.status,
|
||||
),
|
||||
);
|
||||
|
||||
const successStepWithNotStartedExecutableChildren = steps.some(
|
||||
(step) =>
|
||||
stepInfos[step.id]?.status === StepStatus.SUCCESS &&
|
||||
(step.nextStepIds ?? []).some(
|
||||
(nextStepId) =>
|
||||
stepInfos[nextStepId]?.status === StepStatus.NOT_STARTED &&
|
||||
canExecuteStep({
|
||||
stepId: nextStepId,
|
||||
steps,
|
||||
stepInfos,
|
||||
workflowRunStatus: WorkflowRunStatus.RUNNING,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
return (
|
||||
runningOrPendingStepExists || successStepWithNotStartedExecutableChildren
|
||||
);
|
||||
};
|
||||
+196
-57
@@ -14,6 +14,7 @@ import {
|
||||
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
|
||||
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
|
||||
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
|
||||
|
||||
jest.mock(
|
||||
@@ -46,8 +47,9 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
|
||||
const mockWorkflowRunWorkspaceService = {
|
||||
endWorkflowRun: jest.fn(),
|
||||
updateWorkflowRunStepInfo: jest.fn(),
|
||||
getWorkflowRunOrFail: jest.fn(),
|
||||
updateWorkflowRunStepStatus: jest.fn(),
|
||||
saveWorkflowRunState: jest.fn(),
|
||||
getWorkflowRun: jest.fn(),
|
||||
};
|
||||
|
||||
const mockBillingService = {
|
||||
@@ -123,14 +125,11 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
nextStepIds: [],
|
||||
},
|
||||
] as WorkflowAction[];
|
||||
|
||||
const mockStepInfos = {
|
||||
trigger: { result: {}, status: StepStatus.SUCCESS },
|
||||
'step-1': { status: StepStatus.NOT_STARTED },
|
||||
'step-2': { status: StepStatus.NOT_STARTED },
|
||||
};
|
||||
|
||||
mockWorkflowRunWorkspaceService.getWorkflowRunOrFail.mockReturnValue({
|
||||
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
|
||||
state: { flow: { steps: mockSteps }, stepInfos: mockStepInfos },
|
||||
});
|
||||
|
||||
@@ -169,30 +168,32 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenCalledTimes(4);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenNthCalledWith(1, {
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
status: StepStatus.RUNNING,
|
||||
},
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
...mockStepResult,
|
||||
status: StepStatus.SUCCESS,
|
||||
},
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: mockStepResult,
|
||||
},
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.SUCCESS,
|
||||
});
|
||||
|
||||
// execute second step
|
||||
@@ -215,30 +216,34 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled();
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenNthCalledWith(1, {
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
status: StepStatus.RUNNING,
|
||||
},
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
error: 'Step execution failed',
|
||||
status: StepStatus.FAILED,
|
||||
},
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {
|
||||
error: 'Step execution failed',
|
||||
},
|
||||
},
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.FAILED,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -256,29 +261,32 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenNthCalledWith(1, {
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
status: StepStatus.RUNNING,
|
||||
},
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
status: StepStatus.PENDING,
|
||||
},
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: mockPendingEvent,
|
||||
},
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.PENDING,
|
||||
});
|
||||
|
||||
// No recursive call to execute should happen
|
||||
@@ -287,6 +295,128 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('should continue to next step if continueOnFailure is true', async () => {
|
||||
const stepsWithContinueOnFailure = [
|
||||
{
|
||||
id: 'step-1',
|
||||
type: WorkflowActionType.CODE,
|
||||
settings: {
|
||||
errorHandlingOptions: {
|
||||
continueOnFailure: { value: true },
|
||||
retryOnFailure: { value: false },
|
||||
},
|
||||
},
|
||||
nextStepIds: ['step-2'],
|
||||
},
|
||||
{
|
||||
id: 'step-2',
|
||||
type: WorkflowActionType.SEND_EMAIL,
|
||||
settings: {
|
||||
errorHandlingOptions: {
|
||||
continueOnFailure: { value: false },
|
||||
retryOnFailure: { value: false },
|
||||
},
|
||||
},
|
||||
},
|
||||
] as WorkflowAction[];
|
||||
|
||||
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({
|
||||
state: {
|
||||
flow: { steps: stepsWithContinueOnFailure },
|
||||
stepInfos: mockStepInfos,
|
||||
},
|
||||
});
|
||||
|
||||
mockWorkflowExecutor.execute.mockResolvedValueOnce({
|
||||
error: 'Step execution failed but continue',
|
||||
});
|
||||
|
||||
await service.executeFromSteps({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepIds: ['step-1'],
|
||||
workspaceId: mockWorkspaceId,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {
|
||||
error: 'Step execution failed but continue',
|
||||
},
|
||||
},
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.FAILED,
|
||||
});
|
||||
|
||||
// execute second step
|
||||
expect(workflowActionFactory.get).toHaveBeenCalledWith(
|
||||
WorkflowActionType.SEND_EMAIL,
|
||||
);
|
||||
});
|
||||
|
||||
it('should retry on failure if retryOnFailure is true', async () => {
|
||||
const stepsWithRetryOnFailure = [
|
||||
{
|
||||
id: 'step-1',
|
||||
type: WorkflowActionType.CODE,
|
||||
settings: {
|
||||
errorHandlingOptions: {
|
||||
continueOnFailure: { value: false },
|
||||
retryOnFailure: { value: true },
|
||||
},
|
||||
},
|
||||
},
|
||||
] as WorkflowAction[];
|
||||
|
||||
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
|
||||
state: {
|
||||
flow: { steps: stepsWithRetryOnFailure },
|
||||
stepInfos: mockStepInfos,
|
||||
},
|
||||
});
|
||||
|
||||
mockWorkflowExecutor.execute.mockResolvedValue({
|
||||
error: 'Step execution failed, will retry',
|
||||
});
|
||||
|
||||
await service.executeFromSteps({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepIds: ['step-1'],
|
||||
workspaceId: mockWorkspaceId,
|
||||
});
|
||||
|
||||
for (let attempt = 1; attempt <= 3; attempt++) {
|
||||
expect(workflowActionFactory.get).toHaveBeenNthCalledWith(
|
||||
attempt,
|
||||
WorkflowActionType.CODE,
|
||||
);
|
||||
}
|
||||
|
||||
expect(workflowActionFactory.get).not.toHaveBeenCalledWith(
|
||||
WorkflowActionType.SEND_EMAIL,
|
||||
);
|
||||
});
|
||||
|
||||
it('should stop when billing validation fails', async () => {
|
||||
mockBillingService.isBillingEnabled.mockReturnValueOnce(true);
|
||||
mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false);
|
||||
@@ -300,7 +430,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
expect(workflowActionFactory.get).toHaveBeenCalledTimes(0);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes(
|
||||
@@ -308,15 +438,24 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
stepId: 'step-1',
|
||||
stepInfo: {
|
||||
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
|
||||
status: StepStatus.FAILED,
|
||||
},
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
workspaceId: 'workspace-id',
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {
|
||||
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
|
||||
},
|
||||
},
|
||||
stepStatus: StepStatus.FAILED,
|
||||
});
|
||||
|
||||
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
workspaceId: 'workspace-id',
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
+218
-156
@@ -2,15 +2,18 @@ import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
import { getWorkflowRunContext, StepStatus } from 'twenty-shared/workflow';
|
||||
import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos';
|
||||
|
||||
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
|
||||
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
|
||||
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
|
||||
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
|
||||
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
|
||||
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
|
||||
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
|
||||
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
import {
|
||||
StepOutput,
|
||||
WorkflowRunStatus,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
|
||||
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
|
||||
import {
|
||||
@@ -19,9 +22,8 @@ import {
|
||||
} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
|
||||
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
|
||||
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
|
||||
import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util';
|
||||
import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util';
|
||||
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
|
||||
|
||||
const MAX_RETRIES_ON_FAILURE = 3;
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowExecutorWorkspaceService {
|
||||
@@ -36,7 +38,6 @@ export class WorkflowExecutorWorkspaceService {
|
||||
stepIds,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
shouldComputeWorkflowRunStatus = true,
|
||||
}: WorkflowExecutorInput) {
|
||||
await Promise.all(
|
||||
stepIds.map(async (stepIdToExecute) => {
|
||||
@@ -47,27 +48,187 @@ export class WorkflowExecutorWorkspaceService {
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
if (shouldComputeWorkflowRunStatus) {
|
||||
await this.computeWorkflowRunStatus({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async executeFromStep({
|
||||
stepId,
|
||||
attemptCount = 1,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
}: WorkflowBranchExecutorInput) {
|
||||
const workflowRun =
|
||||
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
|
||||
const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({
|
||||
stepId,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
if (!isDefined(workflowRunInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { stepToExecute, steps, stepInfos } = workflowRunInfo;
|
||||
|
||||
if (!canExecuteStep({ stepId, steps, stepInfos })) {
|
||||
return;
|
||||
}
|
||||
|
||||
const checkCanBillWorkflowNodeExecution =
|
||||
await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
|
||||
stepIdToExecute: stepToExecute.id,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const stepInfos = workflowRun.state.stepInfos;
|
||||
if (!checkCanBillWorkflowNodeExecution) {
|
||||
return;
|
||||
}
|
||||
|
||||
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
|
||||
|
||||
let actionOutput: WorkflowActionOutput;
|
||||
|
||||
await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({
|
||||
workflowRunId,
|
||||
stepId: stepToExecute.id,
|
||||
workspaceId,
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
try {
|
||||
actionOutput = await workflowAction.execute({
|
||||
currentStepId: stepId,
|
||||
steps,
|
||||
context: getWorkflowRunContext(stepInfos),
|
||||
});
|
||||
} catch (error) {
|
||||
actionOutput = {
|
||||
error: error.message ?? 'Execution result error, no data or error',
|
||||
};
|
||||
}
|
||||
|
||||
if (!actionOutput.error) {
|
||||
this.sendWorkflowNodeRunEvent(workspaceId);
|
||||
}
|
||||
|
||||
const stepOutput: StepOutput = {
|
||||
id: stepToExecute.id,
|
||||
output: actionOutput,
|
||||
};
|
||||
|
||||
if (actionOutput.pendingEvent) {
|
||||
await this.workflowRunWorkspaceService.saveWorkflowRunState({
|
||||
workflowRunId,
|
||||
stepOutput,
|
||||
workspaceId,
|
||||
stepStatus: StepStatus.PENDING,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const actionOutputSuccess = isDefined(actionOutput.result);
|
||||
|
||||
const isValidActionOutput =
|
||||
actionOutputSuccess ||
|
||||
stepToExecute.settings.errorHandlingOptions.continueOnFailure.value;
|
||||
|
||||
if (isValidActionOutput) {
|
||||
await this.workflowRunWorkspaceService.saveWorkflowRunState({
|
||||
workflowRunId,
|
||||
stepOutput,
|
||||
workspaceId,
|
||||
stepStatus: isDefined(actionOutput.result)
|
||||
? StepStatus.SUCCESS
|
||||
: StepStatus.FAILED,
|
||||
});
|
||||
|
||||
if (
|
||||
!isDefined(stepToExecute.nextStepIds) ||
|
||||
stepToExecute.nextStepIds.length === 0 ||
|
||||
actionOutput.shouldEndWorkflowRun === true
|
||||
) {
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.COMPLETED,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.executeFromSteps({
|
||||
stepIds: stepToExecute.nextStepIds,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
stepToExecute.settings.errorHandlingOptions.retryOnFailure.value &&
|
||||
attemptCount < MAX_RETRIES_ON_FAILURE
|
||||
) {
|
||||
await this.executeFromStep({
|
||||
stepId,
|
||||
attemptCount: attemptCount + 1,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.workflowRunWorkspaceService.saveWorkflowRunState({
|
||||
workflowRunId,
|
||||
stepOutput,
|
||||
workspaceId,
|
||||
stepStatus: StepStatus.FAILED,
|
||||
});
|
||||
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: stepOutput.output.error,
|
||||
});
|
||||
}
|
||||
|
||||
private async getWorkflowRunInfoOrEndWorkflowRun({
|
||||
stepId,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
}: {
|
||||
stepId: string;
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
}) {
|
||||
const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
if (!isDefined(workflowRun)) {
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: `WorkflowRun ${workflowRunId} not found`,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isDefined(workflowRun?.state)) {
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: `WorkflowRun ${workflowRunId} doesn't have any state`,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const steps = workflowRun.state.flow.steps;
|
||||
|
||||
@@ -84,142 +245,11 @@ export class WorkflowExecutorWorkspaceService {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
!canExecuteStep({
|
||||
stepId,
|
||||
steps,
|
||||
stepInfos,
|
||||
workflowRunStatus: workflowRun.status,
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
let actionOutput: WorkflowActionOutput;
|
||||
|
||||
if (await this.canBillWorkflowNodeExecution(workspaceId)) {
|
||||
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
|
||||
|
||||
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
|
||||
stepId,
|
||||
stepInfo: {
|
||||
status: StepStatus.RUNNING,
|
||||
},
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
try {
|
||||
actionOutput = await workflowAction.execute({
|
||||
currentStepId: stepId,
|
||||
steps,
|
||||
context: getWorkflowRunContext(stepInfos),
|
||||
});
|
||||
} catch (error) {
|
||||
actionOutput = {
|
||||
error: error.message ?? 'Execution result error, no data or error',
|
||||
};
|
||||
}
|
||||
} else {
|
||||
actionOutput = {
|
||||
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
|
||||
};
|
||||
}
|
||||
|
||||
const isPendingEvent = actionOutput.pendingEvent;
|
||||
|
||||
const isSuccess = isDefined(actionOutput.result);
|
||||
|
||||
const isError = isDefined(actionOutput.error);
|
||||
|
||||
const isStopped = actionOutput.shouldEndWorkflowRun;
|
||||
|
||||
if (!isError) {
|
||||
this.sendWorkflowNodeRunEvent(workspaceId);
|
||||
}
|
||||
|
||||
let stepInfo: WorkflowRunStepInfo;
|
||||
|
||||
if (isPendingEvent) {
|
||||
stepInfo = {
|
||||
status: StepStatus.PENDING,
|
||||
};
|
||||
} else if (isStopped) {
|
||||
stepInfo = {
|
||||
status: StepStatus.STOPPED,
|
||||
result: actionOutput?.result,
|
||||
};
|
||||
} else if (isSuccess) {
|
||||
stepInfo = {
|
||||
status: StepStatus.SUCCESS,
|
||||
result: actionOutput?.result,
|
||||
};
|
||||
} else {
|
||||
stepInfo = {
|
||||
status: StepStatus.FAILED,
|
||||
error: actionOutput?.error,
|
||||
};
|
||||
}
|
||||
|
||||
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
|
||||
stepId,
|
||||
stepInfo,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
if (
|
||||
isSuccess &&
|
||||
!isStopped &&
|
||||
isDefined(stepToExecute.nextStepIds) &&
|
||||
stepToExecute.nextStepIds.length > 0
|
||||
) {
|
||||
await this.executeFromSteps({
|
||||
stepIds: stepToExecute.nextStepIds,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
shouldComputeWorkflowRunStatus: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async computeWorkflowRunStatus({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
}) {
|
||||
const workflowRun =
|
||||
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const stepInfos = workflowRun.state.stepInfos;
|
||||
|
||||
const steps = workflowRun.state.flow.steps;
|
||||
|
||||
if (workflowShouldKeepRunning({ stepInfos, steps })) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (workflowShouldFail({ stepInfos, steps })) {
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: 'WorkflowRun failed',
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.COMPLETED,
|
||||
});
|
||||
return {
|
||||
stepToExecute,
|
||||
steps,
|
||||
stepInfos: workflowRun.state.stepInfos,
|
||||
};
|
||||
}
|
||||
|
||||
private sendWorkflowNodeRunEvent(workspaceId: string) {
|
||||
@@ -235,13 +265,45 @@ export class WorkflowExecutorWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
private async canBillWorkflowNodeExecution(workspaceId: string) {
|
||||
return (
|
||||
private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
|
||||
stepIdToExecute,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
}: {
|
||||
stepIdToExecute: string;
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
}) {
|
||||
const canBillWorkflowNodeExecution =
|
||||
!this.billingService.isBillingEnabled() ||
|
||||
(await this.billingService.canBillMeteredProduct(
|
||||
workspaceId,
|
||||
BillingProductKey.WORKFLOW_NODE_EXECUTION,
|
||||
))
|
||||
);
|
||||
));
|
||||
|
||||
if (!canBillWorkflowNodeExecution) {
|
||||
const billingOutput = {
|
||||
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
|
||||
};
|
||||
|
||||
await this.workflowRunWorkspaceService.saveWorkflowRunState({
|
||||
workspaceId,
|
||||
workflowRunId,
|
||||
stepOutput: {
|
||||
id: stepIdToExecute,
|
||||
output: billingOutput,
|
||||
},
|
||||
stepStatus: StepStatus.FAILED,
|
||||
});
|
||||
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
|
||||
});
|
||||
}
|
||||
|
||||
return canBillWorkflowNodeExecution;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,9 +103,12 @@ export class RunWorkflowJob {
|
||||
triggerType: workflowVersion.trigger.type,
|
||||
});
|
||||
|
||||
const triggerPayload = workflowRun.context?.trigger ?? {};
|
||||
|
||||
await this.workflowRunWorkspaceService.startWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
payload: triggerPayload,
|
||||
});
|
||||
|
||||
await this.throttleExecution(workflowVersion.workflowId);
|
||||
|
||||
-34
@@ -1,34 +0,0 @@
|
||||
import { Command, CommandRunner } from 'nest-commander';
|
||||
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import {
|
||||
CLEAN_WORKFLOW_RUN_CRON_PATTERN,
|
||||
CleanWorkflowRunsJob,
|
||||
} from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job';
|
||||
|
||||
@Command({
|
||||
name: 'cron:workflow:clean-workflow-runs',
|
||||
description: 'Clean workflow runs',
|
||||
})
|
||||
export class CronCleanWorkflowRunsCommand extends CommandRunner {
|
||||
constructor(
|
||||
@InjectMessageQueue(MessageQueue.cronQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async run(): Promise<void> {
|
||||
await this.messageQueueService.addCron({
|
||||
jobName: CleanWorkflowRunsJob.name,
|
||||
data: undefined,
|
||||
options: {
|
||||
repeat: {
|
||||
pattern: CLEAN_WORKFLOW_RUN_CRON_PATTERN,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
-81
@@ -1,81 +0,0 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { getWorkspaceSchemaName } from 'src/engine/workspace-datasource/utils/get-workspace-schema-name.util';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import {
|
||||
WorkflowRunStatus,
|
||||
WorkflowRunWorkspaceEntity,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
export const CLEAN_WORKFLOW_RUN_CRON_PATTERN = '0 0 * * *';
|
||||
|
||||
const NUMBER_OF_WORKFLOW_RUNS_TO_KEEP = 1000;
|
||||
|
||||
@Processor(MessageQueue.cronQueue)
|
||||
export class CleanWorkflowRunsJob {
|
||||
private readonly logger = new Logger(CleanWorkflowRunsJob.name);
|
||||
|
||||
constructor(
|
||||
@InjectRepository(Workspace, 'core')
|
||||
private readonly workspaceRepository: Repository<Workspace>,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
) {}
|
||||
|
||||
@Process(CleanWorkflowRunsJob.name)
|
||||
@SentryCronMonitor(CleanWorkflowRunsJob.name, CLEAN_WORKFLOW_RUN_CRON_PATTERN)
|
||||
async handle() {
|
||||
const activeWorkspaces = await this.workspaceRepository.find({
|
||||
where: {
|
||||
activationStatus: WorkspaceActivationStatus.ACTIVE,
|
||||
},
|
||||
});
|
||||
|
||||
const mainDataSource =
|
||||
await this.workspaceDataSourceService.connectToMainDataSource();
|
||||
|
||||
for (const activeWorkspace of activeWorkspaces) {
|
||||
const schemaName = getWorkspaceSchemaName(activeWorkspace.id);
|
||||
|
||||
const workflowRunsToDelete = await mainDataSource.query(
|
||||
`
|
||||
WITH ranked_runs AS (
|
||||
SELECT id,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY "workflowId"
|
||||
ORDER BY "createdAt" DESC
|
||||
) AS rn
|
||||
FROM ${schemaName}."workflowRun"
|
||||
WHERE status IN ('${WorkflowRunStatus.COMPLETED}', '${WorkflowRunStatus.FAILED}')
|
||||
)
|
||||
SELECT id, rn FROM ranked_runs WHERE rn > ${NUMBER_OF_WORKFLOW_RUNS_TO_KEEP};
|
||||
`,
|
||||
);
|
||||
|
||||
const workflowRunRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
|
||||
activeWorkspace.id,
|
||||
WorkflowRunWorkspaceEntity,
|
||||
{ shouldBypassPermissionChecks: true },
|
||||
);
|
||||
|
||||
for (const workflowRunToDelete of workflowRunsToDelete) {
|
||||
await workflowRunRepository.delete(workflowRunToDelete.id);
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Deleted ${workflowRunsToDelete.length} workflow runs for workspace ${activeWorkspace.id} (schema ${schemaName})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
-4
@@ -9,8 +9,6 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
|
||||
import { CronWorkflowRunEnqueueCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command';
|
||||
import { WorkflowRunEnqueueJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job';
|
||||
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
|
||||
import { CleanWorkflowRunsJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job';
|
||||
import { CronCleanWorkflowRunsCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -23,9 +21,7 @@ import { CronCleanWorkflowRunsCommand } from 'src/modules/workflow/workflow-runn
|
||||
providers: [
|
||||
WorkflowRunQueueWorkspaceService,
|
||||
WorkflowRunEnqueueJob,
|
||||
CleanWorkflowRunsJob,
|
||||
CronWorkflowRunEnqueueCommand,
|
||||
CronCleanWorkflowRunsCommand,
|
||||
],
|
||||
exports: [WorkflowRunQueueWorkspaceService],
|
||||
})
|
||||
|
||||
+105
-31
@@ -4,7 +4,6 @@ import { isDefined } from 'twenty-shared/utils';
|
||||
import { StepStatus } from 'twenty-shared/workflow';
|
||||
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
|
||||
import { v4 } from 'uuid';
|
||||
import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos';
|
||||
|
||||
import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator';
|
||||
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
|
||||
@@ -14,6 +13,7 @@ import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/compos
|
||||
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import {
|
||||
StepOutput,
|
||||
WorkflowRunState,
|
||||
WorkflowRunStatus,
|
||||
WorkflowRunWorkspaceEntity,
|
||||
@@ -40,14 +40,15 @@ export class WorkflowRunWorkspaceService {
|
||||
workflowVersionId,
|
||||
createdBy,
|
||||
workflowRunId,
|
||||
context,
|
||||
status,
|
||||
triggerPayload,
|
||||
}: {
|
||||
workflowVersionId: string;
|
||||
createdBy: ActorMetadata;
|
||||
workflowRunId?: string;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
context: Record<string, any>;
|
||||
status: WorkflowRunStatus.NOT_STARTED | WorkflowRunStatus.ENQUEUED;
|
||||
triggerPayload: object;
|
||||
}) {
|
||||
const workspaceId =
|
||||
this.scopedWorkspaceContextFactory.create()?.workspaceId;
|
||||
@@ -92,6 +93,12 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
const workflowRunCount = await workflowRunRepository.count({
|
||||
where: {
|
||||
workflowId: workflow.id,
|
||||
},
|
||||
});
|
||||
|
||||
const position = await this.recordPositionService.buildRecordPosition({
|
||||
value: 'first',
|
||||
objectMetadata: {
|
||||
@@ -101,20 +108,7 @@ export class WorkflowRunWorkspaceService {
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const initState = this.getInitState(workflowVersion, triggerPayload);
|
||||
|
||||
const lastWorkflowRun = await workflowRunRepository.findOne({
|
||||
where: {
|
||||
workflowId: workflow.id,
|
||||
},
|
||||
order: { createdAt: 'desc' },
|
||||
});
|
||||
|
||||
const workflowRunCountMatch = lastWorkflowRun?.name.match(/#(\d+)/);
|
||||
|
||||
const workflowRunCount = workflowRunCountMatch
|
||||
? parseInt(workflowRunCountMatch[1], 10)
|
||||
: 0;
|
||||
const initState = this.getInitState(workflowVersion);
|
||||
|
||||
const workflowRun = workflowRunRepository.create({
|
||||
id: workflowRunId ?? v4(),
|
||||
@@ -125,6 +119,11 @@ export class WorkflowRunWorkspaceService {
|
||||
status,
|
||||
position,
|
||||
state: initState,
|
||||
output: {
|
||||
...initState,
|
||||
stepsOutput: {},
|
||||
},
|
||||
context,
|
||||
});
|
||||
|
||||
await workflowRunRepository.insert(workflowRun);
|
||||
@@ -136,9 +135,11 @@ export class WorkflowRunWorkspaceService {
|
||||
async startWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
payload,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
payload: object;
|
||||
}) {
|
||||
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
@@ -158,17 +159,29 @@ export class WorkflowRunWorkspaceService {
|
||||
const partialUpdate = {
|
||||
status: WorkflowRunStatus.RUNNING,
|
||||
startedAt: new Date().toISOString(),
|
||||
output: {
|
||||
...workflowRunToUpdate.output,
|
||||
stepsOutput: {
|
||||
trigger: {
|
||||
result: payload,
|
||||
},
|
||||
},
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
stepInfos: {
|
||||
...workflowRunToUpdate.state?.stepInfos,
|
||||
trigger: {
|
||||
result: {},
|
||||
...workflowRunToUpdate.state?.stepInfos.trigger,
|
||||
status: StepStatus.SUCCESS,
|
||||
result: payload,
|
||||
},
|
||||
},
|
||||
},
|
||||
context: payload
|
||||
? {
|
||||
trigger: payload,
|
||||
}
|
||||
: (workflowRunToUpdate.context ?? {}),
|
||||
};
|
||||
|
||||
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
|
||||
@@ -194,6 +207,10 @@ export class WorkflowRunWorkspaceService {
|
||||
const partialUpdate = {
|
||||
status,
|
||||
endedAt: new Date().toISOString(),
|
||||
output: {
|
||||
...workflowRunToUpdate.output,
|
||||
error,
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
workflowRunError: error,
|
||||
@@ -212,16 +229,16 @@ export class WorkflowRunWorkspaceService {
|
||||
}
|
||||
|
||||
@WithLock('workflowRunId')
|
||||
async updateWorkflowRunStepInfo({
|
||||
stepId,
|
||||
stepInfo,
|
||||
async updateWorkflowRunStepStatus({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
stepId,
|
||||
stepStatus,
|
||||
}: {
|
||||
stepId: string;
|
||||
stepInfo: WorkflowRunStepInfo;
|
||||
workflowRunId: string;
|
||||
stepId: string;
|
||||
workspaceId: string;
|
||||
stepStatus: StepStatus;
|
||||
}) {
|
||||
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
@@ -234,10 +251,8 @@ export class WorkflowRunWorkspaceService {
|
||||
stepInfos: {
|
||||
...workflowRunToUpdate.state?.stepInfos,
|
||||
[stepId]: {
|
||||
...(workflowRunToUpdate.state?.stepInfos[stepId] || {}),
|
||||
result: stepInfo?.result,
|
||||
error: stepInfo?.error,
|
||||
status: stepInfo.status,
|
||||
...(workflowRunToUpdate.state?.stepInfos?.[stepId] || {}),
|
||||
status: stepStatus,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -246,6 +261,59 @@ export class WorkflowRunWorkspaceService {
|
||||
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
|
||||
}
|
||||
|
||||
@WithLock('workflowRunId')
|
||||
async saveWorkflowRunState({
|
||||
workflowRunId,
|
||||
stepOutput,
|
||||
workspaceId,
|
||||
stepStatus,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
stepOutput: StepOutput;
|
||||
workspaceId: string;
|
||||
stepStatus: StepStatus;
|
||||
}) {
|
||||
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const partialUpdate = {
|
||||
output: {
|
||||
flow: workflowRunToUpdate.output?.flow ?? {
|
||||
trigger: undefined,
|
||||
steps: [],
|
||||
},
|
||||
stepsOutput: {
|
||||
...(workflowRunToUpdate.output?.stepsOutput ?? {}),
|
||||
[stepOutput.id]: stepOutput.output,
|
||||
},
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
stepInfos: {
|
||||
...workflowRunToUpdate.state?.stepInfos,
|
||||
[stepOutput.id]: {
|
||||
...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}),
|
||||
result: stepOutput.output?.result,
|
||||
error: stepOutput.output?.error,
|
||||
status: stepStatus,
|
||||
},
|
||||
},
|
||||
},
|
||||
...(stepStatus === StepStatus.SUCCESS
|
||||
? {
|
||||
context: {
|
||||
...workflowRunToUpdate.context,
|
||||
[stepOutput.id]: stepOutput.output.result,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
|
||||
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
|
||||
}
|
||||
|
||||
@WithLock('workflowRunId')
|
||||
async updateWorkflowRunStep({
|
||||
workflowRunId,
|
||||
@@ -271,11 +339,18 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
const updatedSteps = workflowRunToUpdate.state?.flow?.steps?.map(
|
||||
const updatedSteps = workflowRunToUpdate.output?.flow?.steps?.map(
|
||||
(existingStep) => (step.id === existingStep.id ? step : existingStep),
|
||||
);
|
||||
|
||||
const partialUpdate = {
|
||||
output: {
|
||||
...(workflowRunToUpdate.output ?? {}),
|
||||
flow: {
|
||||
...(workflowRunToUpdate.output?.flow ?? {}),
|
||||
steps: updatedSteps,
|
||||
},
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
flow: {
|
||||
@@ -331,7 +406,6 @@ export class WorkflowRunWorkspaceService {
|
||||
|
||||
private getInitState(
|
||||
workflowVersion: WorkflowVersionWorkspaceEntity,
|
||||
triggerPayload: object,
|
||||
): WorkflowRunState | undefined {
|
||||
if (
|
||||
!isDefined(workflowVersion.trigger) ||
|
||||
@@ -346,7 +420,7 @@ export class WorkflowRunWorkspaceService {
|
||||
steps: workflowVersion.steps,
|
||||
},
|
||||
stepInfos: {
|
||||
trigger: { status: StepStatus.NOT_STARTED, result: triggerPayload },
|
||||
trigger: { status: StepStatus.NOT_STARTED },
|
||||
...Object.fromEntries(
|
||||
workflowVersion.steps.map((step) => [
|
||||
step.id,
|
||||
|
||||
+3
-1
@@ -75,7 +75,9 @@ export class WorkflowRunnerWorkspaceService {
|
||||
status: shouldEnqueueWorkflowRun
|
||||
? WorkflowRunStatus.ENQUEUED
|
||||
: WorkflowRunStatus.NOT_STARTED,
|
||||
triggerPayload: payload,
|
||||
context: {
|
||||
trigger: payload,
|
||||
},
|
||||
});
|
||||
|
||||
if (shouldEnqueueWorkflowRun) {
|
||||
|
||||
@@ -2,7 +2,6 @@ export enum StepStatus {
|
||||
NOT_STARTED = 'NOT_STARTED',
|
||||
RUNNING = 'RUNNING',
|
||||
SUCCESS = 'SUCCESS',
|
||||
STOPPED = 'STOPPED',
|
||||
FAILED = 'FAILED',
|
||||
PENDING = 'PENDING',
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user