Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ccdaa31273 | |||
| f4f7ccd0e1 |
+6
@@ -59,6 +59,12 @@ export const handler = async (event) => {
|
||||
const builtCode = await fs.readFile(outFilePath, 'utf-8');
|
||||
|
||||
return { builtCode };
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
category: 'USER_ERROR',
|
||||
errorMessage: error.message,
|
||||
};
|
||||
} finally {
|
||||
await fs.rm(workDir, { recursive: true, force: true });
|
||||
}
|
||||
|
||||
+8
-1
@@ -118,10 +118,17 @@ export const handler = async (event) => {
|
||||
await copyYarnEngine(nodejsDir);
|
||||
await runYarnInstall(nodejsDir);
|
||||
await createZip(buildDir, zipPath);
|
||||
|
||||
await uploadToPresignedUrl(zipPath, presignedUploadUrl);
|
||||
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
const isYarnInstallError = error.message?.startsWith('yarn install failed');
|
||||
|
||||
return {
|
||||
success: false,
|
||||
category: isYarnInstallError ? 'USER_ERROR' : 'INFRA_ERROR',
|
||||
errorMessage: error.message,
|
||||
};
|
||||
} finally {
|
||||
await fs.rm(buildDir, { recursive: true, force: true });
|
||||
await fs.rm(zipPath, { force: true });
|
||||
|
||||
+39
-9
@@ -81,6 +81,8 @@ type LambdaDriverExecutorPayload = {
|
||||
handlerName: string;
|
||||
};
|
||||
|
||||
export type LambdaErrorCategory = 'USER_ERROR' | 'INFRA_ERROR';
|
||||
|
||||
export type YarnInstallLambdaPayload = {
|
||||
action: 'createLayer';
|
||||
packageJson: string;
|
||||
@@ -88,9 +90,13 @@ export type YarnInstallLambdaPayload = {
|
||||
presignedUploadUrl: string;
|
||||
};
|
||||
|
||||
export type YarnInstallLambdaResult = {
|
||||
success: boolean;
|
||||
};
|
||||
export type YarnInstallLambdaResult =
|
||||
| { success: true }
|
||||
| {
|
||||
success: false;
|
||||
category: LambdaErrorCategory;
|
||||
errorMessage: string;
|
||||
};
|
||||
|
||||
export type BuilderLambdaPayload = {
|
||||
action: 'transpile';
|
||||
@@ -99,9 +105,13 @@ export type BuilderLambdaPayload = {
|
||||
builtFileName: string;
|
||||
};
|
||||
|
||||
export type BuilderLambdaResult = {
|
||||
builtCode: string;
|
||||
};
|
||||
export type BuilderLambdaResult =
|
||||
| { builtCode: string }
|
||||
| {
|
||||
success: false;
|
||||
category: LambdaErrorCategory;
|
||||
errorMessage: string;
|
||||
};
|
||||
|
||||
export interface LambdaDriverOptions extends LambdaClientConfig {
|
||||
logicFunctionResourceService: LogicFunctionResourceService;
|
||||
@@ -395,10 +405,18 @@ export class LambdaDriver implements LogicFunctionDriver {
|
||||
|
||||
const parsedResult: YarnInstallLambdaResult = result.Payload
|
||||
? JSON.parse(result.Payload.transformToString())
|
||||
: {};
|
||||
: { success: false, category: 'INFRA_ERROR', errorMessage: 'Empty payload' };
|
||||
|
||||
if (!parsedResult.success) {
|
||||
throw new Error('Yarn install Lambda did not report success');
|
||||
const exceptionCode =
|
||||
parsedResult.category === 'USER_ERROR'
|
||||
? LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR
|
||||
: LogicFunctionExceptionCode.LOGIC_FUNCTION_CREATE_FAILED;
|
||||
|
||||
throw new LogicFunctionException(
|
||||
`Yarn install Lambda failed: ${parsedResult.errorMessage}`,
|
||||
exceptionCode,
|
||||
);
|
||||
}
|
||||
|
||||
return parsedResult;
|
||||
@@ -496,7 +514,19 @@ export class LambdaDriver implements LogicFunctionDriver {
|
||||
? JSON.parse(result.Payload.transformToString())
|
||||
: {};
|
||||
|
||||
if (!parsedResult.builtCode) {
|
||||
if ('success' in parsedResult && parsedResult.success === false) {
|
||||
const exceptionCode =
|
||||
parsedResult.category === 'USER_ERROR'
|
||||
? LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR
|
||||
: LogicFunctionExceptionCode.LOGIC_FUNCTION_CREATE_FAILED;
|
||||
|
||||
throw new LogicFunctionException(
|
||||
`Builder Lambda failed: ${parsedResult.errorMessage}`,
|
||||
exceptionCode,
|
||||
);
|
||||
}
|
||||
|
||||
if (!('builtCode' in parsedResult) || !parsedResult.builtCode) {
|
||||
throw new Error('Builder Lambda did not return builtCode');
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,8 @@ export enum MetricsKeys {
|
||||
WorkflowRunStartedWebhookTrigger = 'workflow-run/started/webhook-trigger',
|
||||
WorkflowRunStartedManualTrigger = 'workflow-run/started/manual-trigger',
|
||||
WorkflowRunCompleted = 'workflow-run/completed',
|
||||
WorkflowRunFailed = 'workflow-run/failed',
|
||||
WorkflowRunFailedUserError = 'workflow-run/failed/user-error',
|
||||
WorkflowRunFailedSystemError = 'workflow-run/failed/system-error',
|
||||
WorkflowRunStopped = 'workflow-run/stopped',
|
||||
WorkflowRunThrottled = 'workflow-run/throttled',
|
||||
WorkflowRunFailedToEnqueue = 'workflow-run/failed/to-enqueue',
|
||||
|
||||
+3
@@ -12,6 +12,7 @@ export enum LogicFunctionExceptionCode {
|
||||
LOGIC_FUNCTION_CODE_UNCHANGED = 'LOGIC_FUNCTION_CODE_UNCHANGED',
|
||||
LOGIC_FUNCTION_EXECUTION_LIMIT_REACHED = 'LOGIC_FUNCTION_EXECUTION_LIMIT_REACHED',
|
||||
LOGIC_FUNCTION_CREATE_FAILED = 'LOGIC_FUNCTION_CREATE_FAILED',
|
||||
LOGIC_FUNCTION_BUILD_USER_ERROR = 'LOGIC_FUNCTION_BUILD_USER_ERROR',
|
||||
LOGIC_FUNCTION_EXECUTION_TIMEOUT = 'LOGIC_FUNCTION_EXECUTION_TIMEOUT',
|
||||
LOGIC_FUNCTION_DISABLED = 'LOGIC_FUNCTION_DISABLED',
|
||||
LOGIC_FUNCTION_INVALID_SEED_PROJECT = 'LOGIC_FUNCTION_INVALID_SEED_PROJECT',
|
||||
@@ -35,6 +36,8 @@ const getLogicFunctionExceptionUserFriendlyMessage = (
|
||||
return msg`Function execution limit reached.`;
|
||||
case LogicFunctionExceptionCode.LOGIC_FUNCTION_CREATE_FAILED:
|
||||
return msg`Failed to create function.`;
|
||||
case LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR:
|
||||
return msg`Function build failed due to invalid code or dependencies.`;
|
||||
case LogicFunctionExceptionCode.LOGIC_FUNCTION_EXECUTION_TIMEOUT:
|
||||
return msg`Function execution timed out.`;
|
||||
case LogicFunctionExceptionCode.LOGIC_FUNCTION_DISABLED:
|
||||
|
||||
+3
@@ -1,6 +1,9 @@
|
||||
import { type WorkflowFailureReason } from 'src/modules/workflow/workflow-executor/types/workflow-failure-reason.type';
|
||||
|
||||
export type WorkflowActionOutput = {
|
||||
result?: object;
|
||||
error?: string;
|
||||
failureReason?: WorkflowFailureReason;
|
||||
pendingEvent?: boolean;
|
||||
shouldEndWorkflowRun?: boolean;
|
||||
shouldRemainRunning?: boolean;
|
||||
|
||||
+1
@@ -0,0 +1 @@
|
||||
export type WorkflowFailureReason = 'USER_ERROR' | 'SYSTEM_ERROR';
|
||||
+28
-9
@@ -5,6 +5,10 @@ import { resolveInput } from 'twenty-shared/utils';
|
||||
import { type WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
|
||||
|
||||
import { LogicFunctionExecutorService } from 'src/engine/core-modules/logic-function/logic-function-executor/logic-function-executor.service';
|
||||
import {
|
||||
LogicFunctionException,
|
||||
LogicFunctionExceptionCode,
|
||||
} from 'src/engine/metadata-modules/logic-function/logic-function.exception';
|
||||
import {
|
||||
WorkflowStepExecutorException,
|
||||
WorkflowStepExecutorExceptionCode,
|
||||
@@ -46,16 +50,31 @@ export class CodeWorkflowAction implements WorkflowAction {
|
||||
|
||||
const { workspaceId } = runInfo;
|
||||
|
||||
const result = await this.logicFunctionExecutorService.execute({
|
||||
logicFunctionId: workflowActionInput.logicFunctionId,
|
||||
workspaceId,
|
||||
payload: workflowActionInput.logicFunctionInput,
|
||||
});
|
||||
try {
|
||||
const result = await this.logicFunctionExecutorService.execute({
|
||||
logicFunctionId: workflowActionInput.logicFunctionId,
|
||||
workspaceId,
|
||||
payload: workflowActionInput.logicFunctionInput,
|
||||
});
|
||||
|
||||
if (result.error) {
|
||||
return { error: result.error.errorMessage };
|
||||
if (result.error) {
|
||||
return { error: result.error.errorMessage };
|
||||
}
|
||||
|
||||
return { result: result.data || {} };
|
||||
} catch (error) {
|
||||
if (
|
||||
error instanceof LogicFunctionException &&
|
||||
error.code ===
|
||||
LogicFunctionExceptionCode.LOGIC_FUNCTION_BUILD_USER_ERROR
|
||||
) {
|
||||
throw new WorkflowStepExecutorException(
|
||||
error.message,
|
||||
WorkflowStepExecutorExceptionCode.INVALID_STEP_INPUT,
|
||||
);
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
return { result: result.data || {} };
|
||||
}
|
||||
}
|
||||
|
||||
+12
-2
@@ -9,6 +9,10 @@ import { HttpTool } from 'src/engine/core-modules/tool/tools/http-tool/http-tool
|
||||
import { SendEmailTool } from 'src/engine/core-modules/tool/tools/email-tool/send-email-tool';
|
||||
import { type ToolInput } from 'src/engine/core-modules/tool/types/tool-input.type';
|
||||
import { type Tool } from 'src/engine/core-modules/tool/types/tool.type';
|
||||
import {
|
||||
WorkflowStepExecutorException,
|
||||
WorkflowStepExecutorExceptionCode,
|
||||
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
|
||||
import { type WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
|
||||
import { type WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
|
||||
import { type WorkflowSendEmailActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/types/workflow-send-email-action-input.type';
|
||||
@@ -39,13 +43,19 @@ export class ToolExecutorWorkflowAction implements WorkflowAction {
|
||||
const step = steps.find((step) => step.id === currentStepId);
|
||||
|
||||
if (!step) {
|
||||
throw new Error('Step not found');
|
||||
throw new WorkflowStepExecutorException(
|
||||
'Step not found',
|
||||
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
const tool = this.toolsByActionType.get(step.type);
|
||||
|
||||
if (!tool) {
|
||||
throw new Error(`No tool found for workflow action type: ${step.type}`);
|
||||
throw new WorkflowStepExecutorException(
|
||||
`No tool found for workflow action type: ${step.type}`,
|
||||
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
|
||||
);
|
||||
}
|
||||
|
||||
let toolInput = step.settings.input;
|
||||
|
||||
+29
-7
@@ -28,6 +28,7 @@ import {
|
||||
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
|
||||
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
|
||||
import { type WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
|
||||
import { type WorkflowFailureReason } from 'src/modules/workflow/workflow-executor/types/workflow-failure-reason.type';
|
||||
import {
|
||||
type WorkflowBranchExecutorInput,
|
||||
type WorkflowExecutorInput,
|
||||
@@ -68,9 +69,9 @@ export class WorkflowExecutorWorkspaceService {
|
||||
shouldComputeWorkflowRunStatus = true,
|
||||
executedStepsCount = 0,
|
||||
}: WorkflowExecutorInput) {
|
||||
await Promise.all(
|
||||
const reasons = await Promise.all(
|
||||
stepIds.map(async (stepIdToExecute) => {
|
||||
await this.executeFromStep({
|
||||
return this.executeFromStep({
|
||||
stepId: stepIdToExecute,
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
@@ -80,9 +81,16 @@ export class WorkflowExecutorWorkspaceService {
|
||||
);
|
||||
|
||||
if (shouldComputeWorkflowRunStatus) {
|
||||
const failureReason = reasons.some(
|
||||
(reason) => reason === 'SYSTEM_ERROR',
|
||||
)
|
||||
? 'SYSTEM_ERROR'
|
||||
: reasons.find((reason) => reason === 'USER_ERROR') ?? undefined;
|
||||
|
||||
await this.computeWorkflowRunStatus({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
failureReason,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -92,7 +100,7 @@ export class WorkflowExecutorWorkspaceService {
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
executedStepsCount,
|
||||
}: WorkflowBranchExecutorInput) {
|
||||
}: WorkflowBranchExecutorInput): Promise<WorkflowFailureReason | undefined> {
|
||||
const workflowRun =
|
||||
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
@@ -113,7 +121,7 @@ export class WorkflowExecutorWorkspaceService {
|
||||
error: 'Step not found',
|
||||
});
|
||||
|
||||
return;
|
||||
return 'SYSTEM_ERROR';
|
||||
}
|
||||
|
||||
let actionOutput: WorkflowActionOutput;
|
||||
@@ -165,7 +173,7 @@ export class WorkflowExecutorWorkspaceService {
|
||||
shouldSkipStepExecution: true,
|
||||
};
|
||||
} else {
|
||||
return;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const isError =
|
||||
@@ -183,7 +191,11 @@ export class WorkflowExecutorWorkspaceService {
|
||||
});
|
||||
|
||||
if (!shouldProcessNextSteps) {
|
||||
return;
|
||||
if (isError) {
|
||||
return actionOutput.failureReason ?? 'SYSTEM_ERROR';
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const shouldRunAnotherJob =
|
||||
@@ -196,7 +208,7 @@ export class WorkflowExecutorWorkspaceService {
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
return;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const { nextStepIdsToExecute, nextStepIdsToSkip, nextStepIdsToFailSafely } =
|
||||
@@ -225,6 +237,8 @@ export class WorkflowExecutorWorkspaceService {
|
||||
executedStepsCount: (executedStepsCount ?? 0) + 1,
|
||||
});
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async getNextStepIdsToExecute({
|
||||
@@ -300,9 +314,11 @@ export class WorkflowExecutorWorkspaceService {
|
||||
private async computeWorkflowRunStatus({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
failureReason,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
failureReason?: WorkflowFailureReason;
|
||||
}) {
|
||||
const workflowRun =
|
||||
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
|
||||
@@ -332,6 +348,7 @@ export class WorkflowExecutorWorkspaceService {
|
||||
workspaceId,
|
||||
status: WorkflowRunStatus.FAILED,
|
||||
error: 'WorkflowRun failed',
|
||||
failureReason,
|
||||
});
|
||||
|
||||
return;
|
||||
@@ -502,8 +519,13 @@ export class WorkflowExecutorWorkspaceService {
|
||||
});
|
||||
}
|
||||
|
||||
const failureReason: WorkflowFailureReason = isUserError
|
||||
? 'USER_ERROR'
|
||||
: 'SYSTEM_ERROR';
|
||||
|
||||
return {
|
||||
error: error.message ?? 'Execution result error, no data or error',
|
||||
failureReason,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
+17
-4
@@ -187,11 +187,13 @@ export class WorkflowRunWorkspaceService {
|
||||
workspaceId,
|
||||
status,
|
||||
error,
|
||||
failureReason,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
status: Extract<WorkflowRunStatus, 'COMPLETED' | 'FAILED' | 'STOPPED'>;
|
||||
error?: string;
|
||||
failureReason?: 'USER_ERROR' | 'SYSTEM_ERROR';
|
||||
}) {
|
||||
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
@@ -222,8 +224,9 @@ export class WorkflowRunWorkspaceService {
|
||||
? MetricsKeys.WorkflowRunCompleted
|
||||
: status === WorkflowRunStatus.STOPPED
|
||||
? MetricsKeys.WorkflowRunStopped
|
||||
: MetricsKeys.WorkflowRunFailed,
|
||||
: this.getMetricKeyFromFailureReason(failureReason),
|
||||
eventId: workflowRunId,
|
||||
attributes: { workspace_id: workspaceId },
|
||||
});
|
||||
}
|
||||
|
||||
@@ -250,7 +253,7 @@ export class WorkflowRunWorkspaceService {
|
||||
stepInfos: {
|
||||
...workflowRunToUpdate.state?.stepInfos,
|
||||
[stepId]: {
|
||||
...(workflowRunToUpdate.state?.stepInfos[stepId] || {}),
|
||||
...workflowRunToUpdate.state?.stepInfos[stepId],
|
||||
result: stepInfo?.result,
|
||||
error: stepInfo?.error,
|
||||
status: stepInfo.status,
|
||||
@@ -283,7 +286,7 @@ export class WorkflowRunWorkspaceService {
|
||||
|
||||
for (const [stepId, info] of Object.entries(stepInfos)) {
|
||||
mergedStepInfos[stepId] = {
|
||||
...(existingStepInfos[stepId] || {}),
|
||||
...existingStepInfos[stepId],
|
||||
...info,
|
||||
};
|
||||
}
|
||||
@@ -335,7 +338,7 @@ export class WorkflowRunWorkspaceService {
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
flow: {
|
||||
...(workflowRunToUpdate.state?.flow ?? {}),
|
||||
...workflowRunToUpdate.state?.flow,
|
||||
steps: updatedSteps,
|
||||
},
|
||||
},
|
||||
@@ -492,4 +495,14 @@ export class WorkflowRunWorkspaceService {
|
||||
};
|
||||
}, {});
|
||||
}
|
||||
|
||||
private getMetricKeyFromFailureReason(
|
||||
failureReason: 'USER_ERROR' | 'SYSTEM_ERROR' | undefined,
|
||||
): MetricsKeys {
|
||||
if (failureReason === 'USER_ERROR') {
|
||||
return MetricsKeys.WorkflowRunFailedUserError;
|
||||
}
|
||||
|
||||
return MetricsKeys.WorkflowRunFailedSystemError;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user