Compare commits

...

2 Commits

Author SHA1 Message Date
Thomas Trompette ccdaa31273 WIP¨ 2026-03-20 15:25:35 +01:00
Thomas Trompette f4f7ccd0e1 Improve workflow metrics with faillure reason 2026-03-19 15:30:54 +01:00
11 changed files with 148 additions and 33 deletions
@@ -59,6 +59,12 @@ export const handler = async (event) => {
const builtCode = await fs.readFile(outFilePath, 'utf-8');
return { builtCode };
} catch (error) {
return {
success: false,
category: 'USER_ERROR',
errorMessage: error.message,
};
} finally {
await fs.rm(workDir, { recursive: true, force: true });
}
@@ -118,10 +118,17 @@ export const handler = async (event) => {
await copyYarnEngine(nodejsDir);
await runYarnInstall(nodejsDir);
await createZip(buildDir, zipPath);
await uploadToPresignedUrl(zipPath, presignedUploadUrl);
return { success: true };
} catch (error) {
const isYarnInstallError = error.message?.startsWith('yarn install failed');
return {
success: false,
category: isYarnInstallError ? 'USER_ERROR' : 'INFRA_ERROR',
errorMessage: error.message,
};
} finally {
await fs.rm(buildDir, { recursive: true, force: true });
await fs.rm(zipPath, { force: true });
@@ -81,6 +81,8 @@ type LambdaDriverExecutorPayload = {
handlerName: string;
};
export type LambdaErrorCategory = 'USER_ERROR' | 'INFRA_ERROR';
export type YarnInstallLambdaPayload = {
action: 'createLayer';
packageJson: string;
@@ -88,9 +90,13 @@ export type YarnInstallLambdaPayload = {
presignedUploadUrl: string;
};
export type YarnInstallLambdaResult = {
success: boolean;
};
export type YarnInstallLambdaResult =
| { success: true }
| {
success: false;
category: LambdaErrorCategory;
errorMessage: string;
};
export type BuilderLambdaPayload = {
action: 'transpile';
@@ -99,9 +105,13 @@ export type BuilderLambdaPayload = {
builtFileName: string;
};
export type BuilderLambdaResult = {
builtCode: string;
};
export type BuilderLambdaResult =
| { builtCode: string }
| {
success: false;
category: LambdaErrorCategory;
errorMessage: string;
};
export interface LambdaDriverOptions extends LambdaClientConfig {
logicFunctionResourceService: LogicFunctionResourceService;
@@ -395,10 +405,18 @@ export class LambdaDriver implements LogicFunctionDriver {
const parsedResult: YarnInstallLambdaResult = result.Payload
? JSON.parse(result.Payload.transformToString())
: {};
: { success: false, category: 'INFRA_ERROR', errorMessage: 'Empty payload' };
if (!parsedResult.success) {
throw new Error('Yarn install Lambda did not report success');
const exceptionCode =
parsedResult.category === 'USER_ERROR'
? LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR
: LogicFunctionExceptionCode.LOGIC_FUNCTION_CREATE_FAILED;
throw new LogicFunctionException(
`Yarn install Lambda failed: ${parsedResult.errorMessage}`,
exceptionCode,
);
}
return parsedResult;
@@ -496,7 +514,19 @@ export class LambdaDriver implements LogicFunctionDriver {
? JSON.parse(result.Payload.transformToString())
: {};
if (!parsedResult.builtCode) {
if ('success' in parsedResult && parsedResult.success === false) {
const exceptionCode =
parsedResult.category === 'USER_ERROR'
? LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR
: LogicFunctionExceptionCode.LOGIC_FUNCTION_CREATE_FAILED;
throw new LogicFunctionException(
`Builder Lambda failed: ${parsedResult.errorMessage}`,
exceptionCode,
);
}
if (!('builtCode' in parsedResult) || !parsedResult.builtCode) {
throw new Error('Builder Lambda did not return builtCode');
}
@@ -18,7 +18,8 @@ export enum MetricsKeys {
WorkflowRunStartedWebhookTrigger = 'workflow-run/started/webhook-trigger',
WorkflowRunStartedManualTrigger = 'workflow-run/started/manual-trigger',
WorkflowRunCompleted = 'workflow-run/completed',
WorkflowRunFailed = 'workflow-run/failed',
WorkflowRunFailedUserError = 'workflow-run/failed/user-error',
WorkflowRunFailedSystemError = 'workflow-run/failed/system-error',
WorkflowRunStopped = 'workflow-run/stopped',
WorkflowRunThrottled = 'workflow-run/throttled',
WorkflowRunFailedToEnqueue = 'workflow-run/failed/to-enqueue',
@@ -12,6 +12,7 @@ export enum LogicFunctionExceptionCode {
LOGIC_FUNCTION_CODE_UNCHANGED = 'LOGIC_FUNCTION_CODE_UNCHANGED',
LOGIC_FUNCTION_EXECUTION_LIMIT_REACHED = 'LOGIC_FUNCTION_EXECUTION_LIMIT_REACHED',
LOGIC_FUNCTION_CREATE_FAILED = 'LOGIC_FUNCTION_CREATE_FAILED',
LOGIC_FUNCTION_BUILD_USER_ERROR = 'LOGIC_FUNCTION_BUILD_USER_ERROR',
LOGIC_FUNCTION_EXECUTION_TIMEOUT = 'LOGIC_FUNCTION_EXECUTION_TIMEOUT',
LOGIC_FUNCTION_DISABLED = 'LOGIC_FUNCTION_DISABLED',
LOGIC_FUNCTION_INVALID_SEED_PROJECT = 'LOGIC_FUNCTION_INVALID_SEED_PROJECT',
@@ -35,6 +36,8 @@ const getLogicFunctionExceptionUserFriendlyMessage = (
return msg`Function execution limit reached.`;
case LogicFunctionExceptionCode.LOGIC_FUNCTION_CREATE_FAILED:
return msg`Failed to create function.`;
case LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR:
return msg`Function build failed due to invalid code or dependencies.`;
case LogicFunctionExceptionCode.LOGIC_FUNCTION_EXECUTION_TIMEOUT:
return msg`Function execution timed out.`;
case LogicFunctionExceptionCode.LOGIC_FUNCTION_DISABLED:
@@ -1,6 +1,9 @@
import { type WorkflowFailureReason } from 'src/modules/workflow/workflow-executor/types/workflow-failure-reason.type';
export type WorkflowActionOutput = {
result?: object;
error?: string;
failureReason?: WorkflowFailureReason;
pendingEvent?: boolean;
shouldEndWorkflowRun?: boolean;
shouldRemainRunning?: boolean;
@@ -0,0 +1 @@
export type WorkflowFailureReason = 'USER_ERROR' | 'SYSTEM_ERROR';
@@ -5,6 +5,10 @@ import { resolveInput } from 'twenty-shared/utils';
import { type WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { LogicFunctionExecutorService } from 'src/engine/core-modules/logic-function/logic-function-executor/logic-function-executor.service';
import {
LogicFunctionException,
LogicFunctionExceptionCode,
} from 'src/engine/metadata-modules/logic-function/logic-function.exception';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
@@ -46,16 +50,31 @@ export class CodeWorkflowAction implements WorkflowAction {
const { workspaceId } = runInfo;
const result = await this.logicFunctionExecutorService.execute({
logicFunctionId: workflowActionInput.logicFunctionId,
workspaceId,
payload: workflowActionInput.logicFunctionInput,
});
try {
const result = await this.logicFunctionExecutorService.execute({
logicFunctionId: workflowActionInput.logicFunctionId,
workspaceId,
payload: workflowActionInput.logicFunctionInput,
});
if (result.error) {
return { error: result.error.errorMessage };
if (result.error) {
return { error: result.error.errorMessage };
}
return { result: result.data || {} };
} catch (error) {
if (
error instanceof LogicFunctionException &&
error.code ===
LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR
) {
throw new WorkflowStepExecutorException(
error.message,
WorkflowStepExecutorExceptionCode.INVALID_STEP_INPUT,
);
}
throw error;
}
return { result: result.data || {} };
}
}
@@ -9,6 +9,10 @@ import { HttpTool } from 'src/engine/core-modules/tool/tools/http-tool/http-tool
import { SendEmailTool } from 'src/engine/core-modules/tool/tools/email-tool/send-email-tool';
import { type ToolInput } from 'src/engine/core-modules/tool/types/tool-input.type';
import { type Tool } from 'src/engine/core-modules/tool/types/tool.type';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { type WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { type WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { type WorkflowSendEmailActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/types/workflow-send-email-action-input.type';
@@ -39,13 +43,19 @@ export class ToolExecutorWorkflowAction implements WorkflowAction {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new Error('Step not found');
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
const tool = this.toolsByActionType.get(step.type);
if (!tool) {
throw new Error(`No tool found for workflow action type: ${step.type}`);
throw new WorkflowStepExecutorException(
`No tool found for workflow action type: ${step.type}`,
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
let toolInput = step.settings.input;
@@ -28,6 +28,7 @@ import {
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { type WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { type WorkflowFailureReason } from 'src/modules/workflow/workflow-executor/types/workflow-failure-reason.type';
import {
type WorkflowBranchExecutorInput,
type WorkflowExecutorInput,
@@ -68,9 +69,9 @@ export class WorkflowExecutorWorkspaceService {
shouldComputeWorkflowRunStatus = true,
executedStepsCount = 0,
}: WorkflowExecutorInput) {
await Promise.all(
const reasons = await Promise.all(
stepIds.map(async (stepIdToExecute) => {
await this.executeFromStep({
return this.executeFromStep({
stepId: stepIdToExecute,
workflowRunId,
workspaceId,
@@ -80,9 +81,16 @@ export class WorkflowExecutorWorkspaceService {
);
if (shouldComputeWorkflowRunStatus) {
const failureReason = reasons.some(
(reason) => reason === 'SYSTEM_ERROR',
)
? 'SYSTEM_ERROR'
: reasons.find((reason) => reason === 'USER_ERROR') ?? undefined;
await this.computeWorkflowRunStatus({
workflowRunId,
workspaceId,
failureReason,
});
}
}
@@ -92,7 +100,7 @@ export class WorkflowExecutorWorkspaceService {
workflowRunId,
workspaceId,
executedStepsCount,
}: WorkflowBranchExecutorInput) {
}: WorkflowBranchExecutorInput): Promise<WorkflowFailureReason | undefined> {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
workflowRunId,
@@ -113,7 +121,7 @@ export class WorkflowExecutorWorkspaceService {
error: 'Step not found',
});
return;
return 'SYSTEM_ERROR';
}
let actionOutput: WorkflowActionOutput;
@@ -165,7 +173,7 @@ export class WorkflowExecutorWorkspaceService {
shouldSkipStepExecution: true,
};
} else {
return;
return undefined;
}
const isError =
@@ -183,7 +191,11 @@ export class WorkflowExecutorWorkspaceService {
});
if (!shouldProcessNextSteps) {
return;
if (isError) {
return actionOutput.failureReason ?? 'SYSTEM_ERROR';
}
return undefined;
}
const shouldRunAnotherJob =
@@ -196,7 +208,7 @@ export class WorkflowExecutorWorkspaceService {
workspaceId,
});
return;
return undefined;
}
const { nextStepIdsToExecute, nextStepIdsToSkip, nextStepIdsToFailSafely } =
@@ -225,6 +237,8 @@ export class WorkflowExecutorWorkspaceService {
executedStepsCount: (executedStepsCount ?? 0) + 1,
});
}
return undefined;
}
async getNextStepIdsToExecute({
@@ -300,9 +314,11 @@ export class WorkflowExecutorWorkspaceService {
private async computeWorkflowRunStatus({
workflowRunId,
workspaceId,
failureReason,
}: {
workflowRunId: string;
workspaceId: string;
failureReason?: WorkflowFailureReason;
}) {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
@@ -332,6 +348,7 @@ export class WorkflowExecutorWorkspaceService {
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'WorkflowRun failed',
failureReason,
});
return;
@@ -502,8 +519,13 @@ export class WorkflowExecutorWorkspaceService {
});
}
const failureReason: WorkflowFailureReason = isUserError
? 'USER_ERROR'
: 'SYSTEM_ERROR';
return {
error: error.message ?? 'Execution result error, no data or error',
failureReason,
};
}
}
@@ -187,11 +187,13 @@ export class WorkflowRunWorkspaceService {
workspaceId,
status,
error,
failureReason,
}: {
workflowRunId: string;
workspaceId: string;
status: Extract<WorkflowRunStatus, 'COMPLETED' | 'FAILED' | 'STOPPED'>;
error?: string;
failureReason?: 'USER_ERROR' | 'SYSTEM_ERROR';
}) {
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
@@ -222,8 +224,9 @@ export class WorkflowRunWorkspaceService {
? MetricsKeys.WorkflowRunCompleted
: status === WorkflowRunStatus.STOPPED
? MetricsKeys.WorkflowRunStopped
: MetricsKeys.WorkflowRunFailed,
: this.getMetricKeyFromFailureReason(failureReason),
eventId: workflowRunId,
attributes: { workspace_id: workspaceId },
});
}
@@ -250,7 +253,7 @@ export class WorkflowRunWorkspaceService {
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
[stepId]: {
...(workflowRunToUpdate.state?.stepInfos[stepId] || {}),
...workflowRunToUpdate.state?.stepInfos[stepId],
result: stepInfo?.result,
error: stepInfo?.error,
status: stepInfo.status,
@@ -283,7 +286,7 @@ export class WorkflowRunWorkspaceService {
for (const [stepId, info] of Object.entries(stepInfos)) {
mergedStepInfos[stepId] = {
...(existingStepInfos[stepId] || {}),
...existingStepInfos[stepId],
...info,
};
}
@@ -335,7 +338,7 @@ export class WorkflowRunWorkspaceService {
state: {
...workflowRunToUpdate.state,
flow: {
...(workflowRunToUpdate.state?.flow ?? {}),
...workflowRunToUpdate.state?.flow,
steps: updatedSteps,
},
},
@@ -492,4 +495,14 @@ export class WorkflowRunWorkspaceService {
};
}, {});
}
private getMetricKeyFromFailureReason(
failureReason: 'USER_ERROR' | 'SYSTEM_ERROR' | undefined,
): MetricsKeys {
if (failureReason === 'USER_ERROR') {
return MetricsKeys.WorkflowRunFailedUserError;
}
return MetricsKeys.WorkflowRunFailedSystemError;
}
}