Compare commits

...

8 Commits

Author SHA1 Message Date
Abdul Rahman 9e55fc5f6c Update enqueueLogicFunctionExecution to use isNonEmptyString for universalIdentifier validation and add test for empty universalIdentifier case 2026-05-07 15:09:57 +05:30
Abdul Rahman 8a5d4cf812 Refactor jobId assignment in AppLogicFunctionService for improved readability 2026-05-07 15:09:41 +05:30
Abdul Rahman 204986f99d Refactor AppLogicFunctionModule imports to include TokenModule and WorkspaceCacheStorageModule 2026-05-07 15:00:39 +05:30
Abdul Rahman b5d4d9af63 Merge branch 'main' into logic-function-enqueue-execution 2026-05-07 14:04:31 +05:30
Abdul Rahman 374d64fc61 Add documentation for enqueueLogicFunctionExecution usage
- Documented the `enqueueLogicFunctionExecution` function in the logic-functions.mdx file.
- Provided an example of how to use the function within a logic function handler.
- Clarified the requirement to pass either `name` or `universalIdentifier` for the function to work correctly.
2026-05-07 07:04:12 +05:30
Abdul Rahman e31fa038fe Add enqueueLogicFunctionExecution for handling logic function executions
- Introduced `enqueueLogicFunctionExecution` function to enqueue logic function executions with either a name or a universal identifier.
- Implemented error handling for missing environment variables and validation for input parameters.
- Added tests to verify the functionality and error cases for the new function.
- Updated the logic-function index to export the new function and its types.
2026-05-07 06:35:33 +05:30
Abdul Rahman 957ac1ff94 Add AppLogicFunction module and controller for enqueueing logic function executions
- Introduced `AppLogicFunctionModule`, `AppLogicFunctionController`, and `AppLogicFunctionService` to handle enqueueing logic function executions.
- Added DTO `EnqueueLogicFunctionExecutionDto` for request validation.
- Integrated the new module into the existing `LogicFunctionModule`.
- Implemented guards and validation for secure and structured request handling.
- Enhanced message queue interaction for processing logic function jobs.
2026-05-07 06:34:09 +05:30
Abdul Rahman 70be1712ea Refactor message queue driver methods to return job IDs
Updated the `add` method in `BullMQDriver` and `SyncDriver` to return a job ID instead of void. Adjusted the `MessageQueueDriver` interface accordingly. This change enhances the ability to track jobs by their IDs across the message queue system.
2026-05-06 23:12:54 +05:30
13 changed files with 437 additions and 8 deletions
@@ -76,6 +76,28 @@ yarn twenty logs
```
</Note>
#### enqueueLogicFunctionExecution
Inside your logic function handler — regardless of trigger (HTTP route, cron, database event, tool, workflow action, or install hook) — you can enqueue **another** function that belongs to the **same application** on the worker queue. Import it from `twenty-sdk/logic-function`:
```ts
import {
enqueueLogicFunctionExecution,
type RoutePayload,
} from 'twenty-sdk/logic-function';
const handler = async (event: RoutePayload) => {
const { jobId, status } = await enqueueLogicFunctionExecution({
universalIdentifier: 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
payload: { fromWebhook: true },
});
return { accepted: true, jobId, status };
};
```
Pass **exactly one** of `name` or `universalIdentifier`.
#### Route trigger payload
When a route trigger invokes your logic function, it receives a `RoutePayload` object that follows the
@@ -0,0 +1,111 @@
import {
afterEach,
beforeEach,
describe,
expect,
it,
vi,
type MockInstance,
} from 'vitest';
import { enqueueLogicFunctionExecution } from '@/sdk/logic-function/enqueue-logic-function-execution';
describe('enqueueLogicFunctionExecution', () => {
let fetchSpy: MockInstance<typeof fetch>;
beforeEach(() => {
process.env.TWENTY_API_URL = 'https://api.test';
process.env.TWENTY_APP_ACCESS_TOKEN = 'app-token';
fetchSpy = vi.spyOn(globalThis, 'fetch');
});
afterEach(() => {
delete process.env.TWENTY_API_URL;
delete process.env.TWENTY_APP_ACCESS_TOKEN;
fetchSpy.mockRestore();
});
it('POSTs to /app/logic-functions/enqueue with universalIdentifier', async () => {
fetchSpy.mockResolvedValue(
new Response(JSON.stringify({ jobId: 'job-1', status: 'queued' }), {
status: 202,
}),
);
const result = await enqueueLogicFunctionExecution({
universalIdentifier: '550e8400-e29b-41d4-a716-446655440000',
payload: { foo: 'bar' },
});
expect(result).toEqual({ jobId: 'job-1', status: 'queued' });
expect(fetchSpy).toHaveBeenCalledWith(
'https://api.test/app/logic-functions/enqueue',
expect.objectContaining({
method: 'POST',
body: JSON.stringify({
universalIdentifier: '550e8400-e29b-41d4-a716-446655440000',
payload: { foo: 'bar' },
}),
headers: expect.objectContaining({
Authorization: 'Bearer app-token',
}),
}),
);
});
it('POSTs with name when provided', async () => {
fetchSpy.mockResolvedValue(
new Response(JSON.stringify({ jobId: 'job-2', status: 'queued' }), {
status: 202,
}),
);
await enqueueLogicFunctionExecution({
name: 'my-fn',
payload: {},
});
expect(fetchSpy).toHaveBeenCalledWith(
expect.any(String),
expect.objectContaining({
body: JSON.stringify({ name: 'my-fn', payload: {} }),
}),
);
});
it('throws when env vars are missing', async () => {
delete process.env.TWENTY_API_URL;
await expect(
enqueueLogicFunctionExecution({
name: 'x',
payload: {},
}),
).rejects.toThrow(/TWENTY_API_URL/);
});
it('throws when neither name nor universalIdentifier is provided', async () => {
await expect(
enqueueLogicFunctionExecution({ payload: {} }),
).rejects.toThrow(/exactly one of name or universalIdentifier/);
});
it('throws when both name and universalIdentifier are provided', async () => {
await expect(
enqueueLogicFunctionExecution({
name: 'a',
universalIdentifier: '550e8400-e29b-41d4-a716-446655440000',
payload: {},
}),
).rejects.toThrow(/exactly one of name or universalIdentifier/);
});
it('treats empty universalIdentifier as missing', async () => {
await expect(
enqueueLogicFunctionExecution({
universalIdentifier: '',
payload: {},
}),
).rejects.toThrow(/exactly one of name or universalIdentifier/);
});
});
@@ -0,0 +1,69 @@
import { isNonEmptyString } from '@sniptt/guards';
import {
DEFAULT_API_URL_NAME,
DEFAULT_APP_ACCESS_TOKEN_NAME,
} from 'twenty-shared/application';
const ENQUEUE_LOGIC_FUNCTION_TIMEOUT_MS = 10_000;
export type EnqueueLogicFunctionExecutionParams = {
name?: string;
universalIdentifier?: string;
payload: Record<string, unknown>;
};
export type EnqueueLogicFunctionExecutionResult = {
jobId: string;
status: 'queued';
};
export const enqueueLogicFunctionExecution = async (
params: EnqueueLogicFunctionExecutionParams,
): Promise<EnqueueLogicFunctionExecutionResult> => {
const apiUrl = process.env[DEFAULT_API_URL_NAME];
const token = process.env[DEFAULT_APP_ACCESS_TOKEN_NAME];
if (!apiUrl || !token) {
throw new Error(
'enqueueLogicFunctionExecution requires TWENTY_API_URL and TWENTY_APP_ACCESS_TOKEN',
);
}
const hasUniversalIdentifier = isNonEmptyString(params.universalIdentifier);
const hasName = isNonEmptyString(params.name);
if (hasUniversalIdentifier === hasName) {
throw new Error(
'enqueueLogicFunctionExecution: provide exactly one of name or universalIdentifier',
);
}
const response = await fetch(
`${apiUrl.replace(/\/$/, '')}/app/logic-functions/enqueue`,
{
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
...(hasName ? { name: params.name } : {}),
...(hasUniversalIdentifier
? { universalIdentifier: params.universalIdentifier }
: {}),
payload: params.payload,
}),
signal: AbortSignal.timeout(ENQUEUE_LOGIC_FUNCTION_TIMEOUT_MS),
},
);
if (!response.ok) {
const body = await response.text().catch(() => '');
throw new Error(
`enqueueLogicFunctionExecution failed: ${response.status} ${response.statusText} ${body}`,
);
}
return response.json() as Promise<EnqueueLogicFunctionExecutionResult>;
};
@@ -38,9 +38,15 @@ export type { RoutePayload } from '@/sdk/define/logic-functions/triggers/route-p
export type { InputJsonSchema } from 'twenty-shared/logic-function';
export {
enqueueLogicFunctionExecution,
type EnqueueLogicFunctionExecutionParams,
type EnqueueLogicFunctionExecutionResult,
} from '@/sdk/logic-function/enqueue-logic-function-execution';
export { AppConnectionAuthFailedError } from '@/sdk/logic-function/connections/errors/app-connection-auth-failed.error';
export { findConnectionForRequest } from '@/sdk/logic-function/connections/find-connection-for-request';
export { getConnection } from '@/sdk/logic-function/connections/get-connection';
export { listConnections } from '@/sdk/logic-function/connections/list-connections';
export type { ListConnectionsFilter } from '@/sdk/logic-function/connections/list-connections';
export { findConnectionForRequest } from '@/sdk/logic-function/connections/find-connection-for-request';
export { AppConnectionAuthFailedError } from '@/sdk/logic-function/connections/errors/app-connection-auth-failed.error';
export type { AppConnection } from '@/sdk/logic-function/connections/types/app-connection.type';
@@ -0,0 +1,49 @@
import {
Body,
Controller,
ForbiddenException,
HttpCode,
HttpStatus,
Post,
Req,
UseGuards,
UsePipes,
ValidationPipe,
} from '@nestjs/common';
import { Request } from 'express';
import { isDefined } from 'twenty-shared/utils';
import { AppLogicFunctionService } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.service';
import { EnqueueLogicFunctionExecutionDto } from 'src/engine/core-modules/logic-function/app-logic-function/dtos/enqueue-logic-function-execution.dto';
import { JwtAuthGuard } from 'src/engine/guards/jwt-auth.guard';
import { NoPermissionGuard } from 'src/engine/guards/no-permission.guard';
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
@Controller('app/logic-functions')
@UseGuards(JwtAuthGuard, WorkspaceAuthGuard, NoPermissionGuard)
export class AppLogicFunctionController {
constructor(
private readonly appLogicFunctionService: AppLogicFunctionService,
) {}
@Post('enqueue')
@HttpCode(HttpStatus.ACCEPTED)
@UsePipes(new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true }))
async enqueue(
@Req() request: Request,
@Body() body: EnqueueLogicFunctionExecutionDto,
) {
if (!isDefined(request.application) || !isDefined(request.workspace)) {
throw new ForbiddenException(
'Logic function enqueue requires an APPLICATION_ACCESS token.',
);
}
return this.appLogicFunctionService.enqueueExecution({
workspaceId: request.workspace.id,
applicationId: request.application.id,
dto: body,
});
}
}
@@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { TokenModule } from 'src/engine/core-modules/auth/token/token.module';
import { AppLogicFunctionController } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.controller';
import { AppLogicFunctionService } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.service';
import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module';
import { WorkspaceCacheModule } from 'src/engine/workspace-cache/workspace-cache.module';
@Module({
imports: [TokenModule, WorkspaceCacheModule, WorkspaceCacheStorageModule],
controllers: [AppLogicFunctionController],
providers: [AppLogicFunctionService],
})
export class AppLogicFunctionModule {}
@@ -0,0 +1,134 @@
import {
BadRequestException,
ConflictException,
Injectable,
InternalServerErrorException,
NotFoundException,
} from '@nestjs/common';
import { isNonEmptyString } from '@sniptt/guards';
import { isDefined } from 'twenty-shared/utils';
import { type EnqueueLogicFunctionExecutionDto } from 'src/engine/core-modules/logic-function/app-logic-function/dtos/enqueue-logic-function-execution.dto';
import {
LogicFunctionTriggerJob,
type LogicFunctionTriggerJobData,
} from 'src/engine/core-modules/logic-function/logic-function-trigger/jobs/logic-function-trigger.job';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { type FlatLogicFunction } from 'src/engine/metadata-modules/logic-function/types/flat-logic-function.type';
import { WorkspaceCacheService } from 'src/engine/workspace-cache/services/workspace-cache.service';
export type EnqueueLogicFunctionExecutionResult = {
jobId: string;
status: 'queued';
};
@Injectable()
export class AppLogicFunctionService {
constructor(
private readonly workspaceCacheService: WorkspaceCacheService,
@InjectMessageQueue(MessageQueue.logicFunctionQueue)
private readonly logicFunctionMessageQueueService: MessageQueueService,
) {}
async enqueueExecution(params: {
workspaceId: string;
applicationId: string;
dto: EnqueueLogicFunctionExecutionDto;
}): Promise<EnqueueLogicFunctionExecutionResult> {
const { workspaceId, applicationId, dto } = params;
const { name, universalIdentifier, payload } = dto;
const hasUniversalIdentifier = isDefined(universalIdentifier);
const hasName = isNonEmptyString(name);
if (hasUniversalIdentifier === hasName) {
throw new BadRequestException(
'Provide exactly one of name or universalIdentifier',
);
}
const logicFunction = await this.findLogicFunctionForApplicationOrThrow({
workspaceId,
applicationId,
name: hasName ? name : undefined,
universalIdentifier: hasUniversalIdentifier
? universalIdentifier
: undefined,
});
const jobId = await this.logicFunctionMessageQueueService.add<
LogicFunctionTriggerJobData[]
>(
LogicFunctionTriggerJob.name,
[
{
logicFunctionId: logicFunction.id,
workspaceId,
payload,
},
],
{ retryLimit: 3 },
);
if (!isDefined(jobId)) {
throw new InternalServerErrorException(
'Failed to enqueue logic function execution',
);
}
return { jobId, status: 'queued' };
}
private async findLogicFunctionForApplicationOrThrow(params: {
workspaceId: string;
applicationId: string;
name?: string;
universalIdentifier?: string;
}): Promise<FlatLogicFunction> {
const { workspaceId, applicationId, name, universalIdentifier } = params;
const { flatLogicFunctionMaps } =
await this.workspaceCacheService.getOrRecompute(workspaceId, [
'flatLogicFunctionMaps',
]);
if (isDefined(universalIdentifier)) {
const logicFunction =
flatLogicFunctionMaps.byUniversalIdentifier[universalIdentifier];
if (
!isDefined(logicFunction) ||
logicFunction.applicationId !== applicationId ||
isDefined(logicFunction.deletedAt)
) {
throw new NotFoundException('Logic function not found');
}
return logicFunction;
}
const matches = Object.values(flatLogicFunctionMaps.byUniversalIdentifier)
.filter(isDefined)
.filter(
(logicFunction) =>
logicFunction.applicationId === applicationId &&
!isDefined(logicFunction.deletedAt) &&
logicFunction.name === name,
);
if (matches.length === 0) {
throw new NotFoundException('Logic function not found');
}
if (matches.length > 1) {
throw new ConflictException(
'Multiple logic functions match this name; use universalIdentifier',
);
}
return matches[0];
}
}
@@ -0,0 +1,14 @@
import { IsObject, IsOptional, IsString, IsUUID } from 'class-validator';
export class EnqueueLogicFunctionExecutionDto {
@IsOptional()
@IsString()
name?: string;
@IsOptional()
@IsUUID()
universalIdentifier?: string;
@IsObject()
payload!: Record<string, unknown>;
}
@@ -1,6 +1,7 @@
import { type DynamicModule, Global, Module } from '@nestjs/common';
import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module';
import { AppLogicFunctionModule } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.module';
import { LogicFunctionDriverFactory } from 'src/engine/core-modules/logic-function/logic-function-drivers/logic-function-driver.factory';
import { LogicFunctionResourceModule } from 'src/engine/core-modules/logic-function/logic-function-resource/logic-function-resource.module';
import { LogicFunctionTriggerModule } from 'src/engine/core-modules/logic-function/logic-function-trigger/logic-function-trigger.module';
@@ -17,6 +18,7 @@ export class LogicFunctionModule {
module: LogicFunctionModule,
imports: [
TwentyConfigModule,
AppLogicFunctionModule,
CacheLockModule,
LogicFunctionResourceModule,
LogicFunctionTriggerModule,
@@ -222,7 +222,7 @@ export class BullMQDriver
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
): Promise<string | undefined> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
@@ -257,6 +257,8 @@ export class BullMQDriver
delay: options?.delay,
};
await this.queueMap[queueName].add(jobName, data, queueOptions);
const job = await this.queueMap[queueName].add(jobName, data, queueOptions);
return job.id;
}
}
@@ -13,7 +13,7 @@ export interface MessageQueueDriver {
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void>;
): Promise<string | undefined>;
// @ts-expect-error legacy noImplicitAny
work<T extends MessageQueueJobData>(
queueName: MessageQueue,
@@ -1,5 +1,7 @@
import { Logger } from '@nestjs/common';
import { v4 } from 'uuid';
import { type MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import {
type MessageQueueJob,
@@ -20,8 +22,12 @@ export class SyncDriver implements MessageQueueDriver {
queueName: MessageQueue,
jobName: string,
data: T,
): Promise<void> {
await this.processJob(queueName, { id: '', name: jobName, data });
): Promise<string | undefined> {
const jobId = v4();
await this.processJob(queueName, { id: jobId, name: jobName, data });
return jobId;
}
async addCron<T extends MessageQueueJobData | undefined>({
@@ -31,7 +31,7 @@ export class MessageQueueService {
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
): Promise<string | undefined> {
return this.driver.add(this.queueName, jobName, data, options);
}