Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e55fc5f6c | |||
| 8a5d4cf812 | |||
| 204986f99d | |||
| b5d4d9af63 | |||
| 374d64fc61 | |||
| e31fa038fe | |||
| 957ac1ff94 | |||
| 70be1712ea |
@@ -76,6 +76,28 @@ yarn twenty logs
|
||||
```
|
||||
</Note>
|
||||
|
||||
#### enqueueLogicFunctionExecution
|
||||
|
||||
Inside your logic function handler — regardless of trigger (HTTP route, cron, database event, tool, workflow action, or install hook) — you can enqueue **another** function that belongs to the **same application** on the worker queue. Import it from `twenty-sdk/logic-function`:
|
||||
|
||||
```ts
|
||||
import {
|
||||
enqueueLogicFunctionExecution,
|
||||
type RoutePayload,
|
||||
} from 'twenty-sdk/logic-function';
|
||||
|
||||
const handler = async (event: RoutePayload) => {
|
||||
const { jobId, status } = await enqueueLogicFunctionExecution({
|
||||
universalIdentifier: 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
|
||||
payload: { fromWebhook: true },
|
||||
});
|
||||
|
||||
return { accepted: true, jobId, status };
|
||||
};
|
||||
```
|
||||
|
||||
Pass **exactly one** of `name` or `universalIdentifier`.
|
||||
|
||||
#### Route trigger payload
|
||||
|
||||
When a route trigger invokes your logic function, it receives a `RoutePayload` object that follows the
|
||||
|
||||
+111
@@ -0,0 +1,111 @@
|
||||
import {
|
||||
afterEach,
|
||||
beforeEach,
|
||||
describe,
|
||||
expect,
|
||||
it,
|
||||
vi,
|
||||
type MockInstance,
|
||||
} from 'vitest';
|
||||
|
||||
import { enqueueLogicFunctionExecution } from '@/sdk/logic-function/enqueue-logic-function-execution';
|
||||
|
||||
describe('enqueueLogicFunctionExecution', () => {
|
||||
let fetchSpy: MockInstance<typeof fetch>;
|
||||
|
||||
beforeEach(() => {
|
||||
process.env.TWENTY_API_URL = 'https://api.test';
|
||||
process.env.TWENTY_APP_ACCESS_TOKEN = 'app-token';
|
||||
fetchSpy = vi.spyOn(globalThis, 'fetch');
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
delete process.env.TWENTY_API_URL;
|
||||
delete process.env.TWENTY_APP_ACCESS_TOKEN;
|
||||
fetchSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('POSTs to /app/logic-functions/enqueue with universalIdentifier', async () => {
|
||||
fetchSpy.mockResolvedValue(
|
||||
new Response(JSON.stringify({ jobId: 'job-1', status: 'queued' }), {
|
||||
status: 202,
|
||||
}),
|
||||
);
|
||||
|
||||
const result = await enqueueLogicFunctionExecution({
|
||||
universalIdentifier: '550e8400-e29b-41d4-a716-446655440000',
|
||||
payload: { foo: 'bar' },
|
||||
});
|
||||
|
||||
expect(result).toEqual({ jobId: 'job-1', status: 'queued' });
|
||||
expect(fetchSpy).toHaveBeenCalledWith(
|
||||
'https://api.test/app/logic-functions/enqueue',
|
||||
expect.objectContaining({
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
universalIdentifier: '550e8400-e29b-41d4-a716-446655440000',
|
||||
payload: { foo: 'bar' },
|
||||
}),
|
||||
headers: expect.objectContaining({
|
||||
Authorization: 'Bearer app-token',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('POSTs with name when provided', async () => {
|
||||
fetchSpy.mockResolvedValue(
|
||||
new Response(JSON.stringify({ jobId: 'job-2', status: 'queued' }), {
|
||||
status: 202,
|
||||
}),
|
||||
);
|
||||
|
||||
await enqueueLogicFunctionExecution({
|
||||
name: 'my-fn',
|
||||
payload: {},
|
||||
});
|
||||
|
||||
expect(fetchSpy).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
expect.objectContaining({
|
||||
body: JSON.stringify({ name: 'my-fn', payload: {} }),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('throws when env vars are missing', async () => {
|
||||
delete process.env.TWENTY_API_URL;
|
||||
|
||||
await expect(
|
||||
enqueueLogicFunctionExecution({
|
||||
name: 'x',
|
||||
payload: {},
|
||||
}),
|
||||
).rejects.toThrow(/TWENTY_API_URL/);
|
||||
});
|
||||
|
||||
it('throws when neither name nor universalIdentifier is provided', async () => {
|
||||
await expect(
|
||||
enqueueLogicFunctionExecution({ payload: {} }),
|
||||
).rejects.toThrow(/exactly one of name or universalIdentifier/);
|
||||
});
|
||||
|
||||
it('throws when both name and universalIdentifier are provided', async () => {
|
||||
await expect(
|
||||
enqueueLogicFunctionExecution({
|
||||
name: 'a',
|
||||
universalIdentifier: '550e8400-e29b-41d4-a716-446655440000',
|
||||
payload: {},
|
||||
}),
|
||||
).rejects.toThrow(/exactly one of name or universalIdentifier/);
|
||||
});
|
||||
|
||||
it('treats empty universalIdentifier as missing', async () => {
|
||||
await expect(
|
||||
enqueueLogicFunctionExecution({
|
||||
universalIdentifier: '',
|
||||
payload: {},
|
||||
}),
|
||||
).rejects.toThrow(/exactly one of name or universalIdentifier/);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,69 @@
|
||||
import { isNonEmptyString } from '@sniptt/guards';
|
||||
import {
|
||||
DEFAULT_API_URL_NAME,
|
||||
DEFAULT_APP_ACCESS_TOKEN_NAME,
|
||||
} from 'twenty-shared/application';
|
||||
|
||||
const ENQUEUE_LOGIC_FUNCTION_TIMEOUT_MS = 10_000;
|
||||
|
||||
export type EnqueueLogicFunctionExecutionParams = {
|
||||
name?: string;
|
||||
universalIdentifier?: string;
|
||||
payload: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type EnqueueLogicFunctionExecutionResult = {
|
||||
jobId: string;
|
||||
status: 'queued';
|
||||
};
|
||||
|
||||
export const enqueueLogicFunctionExecution = async (
|
||||
params: EnqueueLogicFunctionExecutionParams,
|
||||
): Promise<EnqueueLogicFunctionExecutionResult> => {
|
||||
const apiUrl = process.env[DEFAULT_API_URL_NAME];
|
||||
const token = process.env[DEFAULT_APP_ACCESS_TOKEN_NAME];
|
||||
|
||||
if (!apiUrl || !token) {
|
||||
throw new Error(
|
||||
'enqueueLogicFunctionExecution requires TWENTY_API_URL and TWENTY_APP_ACCESS_TOKEN',
|
||||
);
|
||||
}
|
||||
|
||||
const hasUniversalIdentifier = isNonEmptyString(params.universalIdentifier);
|
||||
const hasName = isNonEmptyString(params.name);
|
||||
|
||||
if (hasUniversalIdentifier === hasName) {
|
||||
throw new Error(
|
||||
'enqueueLogicFunctionExecution: provide exactly one of name or universalIdentifier',
|
||||
);
|
||||
}
|
||||
|
||||
const response = await fetch(
|
||||
`${apiUrl.replace(/\/$/, '')}/app/logic-functions/enqueue`,
|
||||
{
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify({
|
||||
...(hasName ? { name: params.name } : {}),
|
||||
...(hasUniversalIdentifier
|
||||
? { universalIdentifier: params.universalIdentifier }
|
||||
: {}),
|
||||
payload: params.payload,
|
||||
}),
|
||||
signal: AbortSignal.timeout(ENQUEUE_LOGIC_FUNCTION_TIMEOUT_MS),
|
||||
},
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
const body = await response.text().catch(() => '');
|
||||
|
||||
throw new Error(
|
||||
`enqueueLogicFunctionExecution failed: ${response.status} ${response.statusText} ${body}`,
|
||||
);
|
||||
}
|
||||
|
||||
return response.json() as Promise<EnqueueLogicFunctionExecutionResult>;
|
||||
};
|
||||
@@ -38,9 +38,15 @@ export type { RoutePayload } from '@/sdk/define/logic-functions/triggers/route-p
|
||||
|
||||
export type { InputJsonSchema } from 'twenty-shared/logic-function';
|
||||
|
||||
export {
|
||||
enqueueLogicFunctionExecution,
|
||||
type EnqueueLogicFunctionExecutionParams,
|
||||
type EnqueueLogicFunctionExecutionResult,
|
||||
} from '@/sdk/logic-function/enqueue-logic-function-execution';
|
||||
|
||||
export { AppConnectionAuthFailedError } from '@/sdk/logic-function/connections/errors/app-connection-auth-failed.error';
|
||||
export { findConnectionForRequest } from '@/sdk/logic-function/connections/find-connection-for-request';
|
||||
export { getConnection } from '@/sdk/logic-function/connections/get-connection';
|
||||
export { listConnections } from '@/sdk/logic-function/connections/list-connections';
|
||||
export type { ListConnectionsFilter } from '@/sdk/logic-function/connections/list-connections';
|
||||
export { findConnectionForRequest } from '@/sdk/logic-function/connections/find-connection-for-request';
|
||||
export { AppConnectionAuthFailedError } from '@/sdk/logic-function/connections/errors/app-connection-auth-failed.error';
|
||||
export type { AppConnection } from '@/sdk/logic-function/connections/types/app-connection.type';
|
||||
|
||||
+49
@@ -0,0 +1,49 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
ForbiddenException,
|
||||
HttpCode,
|
||||
HttpStatus,
|
||||
Post,
|
||||
Req,
|
||||
UseGuards,
|
||||
UsePipes,
|
||||
ValidationPipe,
|
||||
} from '@nestjs/common';
|
||||
|
||||
import { Request } from 'express';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { AppLogicFunctionService } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.service';
|
||||
import { EnqueueLogicFunctionExecutionDto } from 'src/engine/core-modules/logic-function/app-logic-function/dtos/enqueue-logic-function-execution.dto';
|
||||
import { JwtAuthGuard } from 'src/engine/guards/jwt-auth.guard';
|
||||
import { NoPermissionGuard } from 'src/engine/guards/no-permission.guard';
|
||||
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
|
||||
|
||||
@Controller('app/logic-functions')
|
||||
@UseGuards(JwtAuthGuard, WorkspaceAuthGuard, NoPermissionGuard)
|
||||
export class AppLogicFunctionController {
|
||||
constructor(
|
||||
private readonly appLogicFunctionService: AppLogicFunctionService,
|
||||
) {}
|
||||
|
||||
@Post('enqueue')
|
||||
@HttpCode(HttpStatus.ACCEPTED)
|
||||
@UsePipes(new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true }))
|
||||
async enqueue(
|
||||
@Req() request: Request,
|
||||
@Body() body: EnqueueLogicFunctionExecutionDto,
|
||||
) {
|
||||
if (!isDefined(request.application) || !isDefined(request.workspace)) {
|
||||
throw new ForbiddenException(
|
||||
'Logic function enqueue requires an APPLICATION_ACCESS token.',
|
||||
);
|
||||
}
|
||||
|
||||
return this.appLogicFunctionService.enqueueExecution({
|
||||
workspaceId: request.workspace.id,
|
||||
applicationId: request.application.id,
|
||||
dto: body,
|
||||
});
|
||||
}
|
||||
}
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { TokenModule } from 'src/engine/core-modules/auth/token/token.module';
|
||||
import { AppLogicFunctionController } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.controller';
|
||||
import { AppLogicFunctionService } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.service';
|
||||
import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module';
|
||||
import { WorkspaceCacheModule } from 'src/engine/workspace-cache/workspace-cache.module';
|
||||
|
||||
@Module({
|
||||
imports: [TokenModule, WorkspaceCacheModule, WorkspaceCacheStorageModule],
|
||||
controllers: [AppLogicFunctionController],
|
||||
providers: [AppLogicFunctionService],
|
||||
})
|
||||
export class AppLogicFunctionModule {}
|
||||
+134
@@ -0,0 +1,134 @@
|
||||
import {
|
||||
BadRequestException,
|
||||
ConflictException,
|
||||
Injectable,
|
||||
InternalServerErrorException,
|
||||
NotFoundException,
|
||||
} from '@nestjs/common';
|
||||
|
||||
import { isNonEmptyString } from '@sniptt/guards';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { type EnqueueLogicFunctionExecutionDto } from 'src/engine/core-modules/logic-function/app-logic-function/dtos/enqueue-logic-function-execution.dto';
|
||||
import {
|
||||
LogicFunctionTriggerJob,
|
||||
type LogicFunctionTriggerJobData,
|
||||
} from 'src/engine/core-modules/logic-function/logic-function-trigger/jobs/logic-function-trigger.job';
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import { type FlatLogicFunction } from 'src/engine/metadata-modules/logic-function/types/flat-logic-function.type';
|
||||
import { WorkspaceCacheService } from 'src/engine/workspace-cache/services/workspace-cache.service';
|
||||
|
||||
export type EnqueueLogicFunctionExecutionResult = {
|
||||
jobId: string;
|
||||
status: 'queued';
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class AppLogicFunctionService {
|
||||
constructor(
|
||||
private readonly workspaceCacheService: WorkspaceCacheService,
|
||||
@InjectMessageQueue(MessageQueue.logicFunctionQueue)
|
||||
private readonly logicFunctionMessageQueueService: MessageQueueService,
|
||||
) {}
|
||||
|
||||
async enqueueExecution(params: {
|
||||
workspaceId: string;
|
||||
applicationId: string;
|
||||
dto: EnqueueLogicFunctionExecutionDto;
|
||||
}): Promise<EnqueueLogicFunctionExecutionResult> {
|
||||
const { workspaceId, applicationId, dto } = params;
|
||||
const { name, universalIdentifier, payload } = dto;
|
||||
|
||||
const hasUniversalIdentifier = isDefined(universalIdentifier);
|
||||
const hasName = isNonEmptyString(name);
|
||||
|
||||
if (hasUniversalIdentifier === hasName) {
|
||||
throw new BadRequestException(
|
||||
'Provide exactly one of name or universalIdentifier',
|
||||
);
|
||||
}
|
||||
|
||||
const logicFunction = await this.findLogicFunctionForApplicationOrThrow({
|
||||
workspaceId,
|
||||
applicationId,
|
||||
name: hasName ? name : undefined,
|
||||
universalIdentifier: hasUniversalIdentifier
|
||||
? universalIdentifier
|
||||
: undefined,
|
||||
});
|
||||
|
||||
const jobId = await this.logicFunctionMessageQueueService.add<
|
||||
LogicFunctionTriggerJobData[]
|
||||
>(
|
||||
LogicFunctionTriggerJob.name,
|
||||
[
|
||||
{
|
||||
logicFunctionId: logicFunction.id,
|
||||
workspaceId,
|
||||
payload,
|
||||
},
|
||||
],
|
||||
{ retryLimit: 3 },
|
||||
);
|
||||
|
||||
if (!isDefined(jobId)) {
|
||||
throw new InternalServerErrorException(
|
||||
'Failed to enqueue logic function execution',
|
||||
);
|
||||
}
|
||||
|
||||
return { jobId, status: 'queued' };
|
||||
}
|
||||
|
||||
private async findLogicFunctionForApplicationOrThrow(params: {
|
||||
workspaceId: string;
|
||||
applicationId: string;
|
||||
name?: string;
|
||||
universalIdentifier?: string;
|
||||
}): Promise<FlatLogicFunction> {
|
||||
const { workspaceId, applicationId, name, universalIdentifier } = params;
|
||||
|
||||
const { flatLogicFunctionMaps } =
|
||||
await this.workspaceCacheService.getOrRecompute(workspaceId, [
|
||||
'flatLogicFunctionMaps',
|
||||
]);
|
||||
|
||||
if (isDefined(universalIdentifier)) {
|
||||
const logicFunction =
|
||||
flatLogicFunctionMaps.byUniversalIdentifier[universalIdentifier];
|
||||
|
||||
if (
|
||||
!isDefined(logicFunction) ||
|
||||
logicFunction.applicationId !== applicationId ||
|
||||
isDefined(logicFunction.deletedAt)
|
||||
) {
|
||||
throw new NotFoundException('Logic function not found');
|
||||
}
|
||||
|
||||
return logicFunction;
|
||||
}
|
||||
|
||||
const matches = Object.values(flatLogicFunctionMaps.byUniversalIdentifier)
|
||||
.filter(isDefined)
|
||||
.filter(
|
||||
(logicFunction) =>
|
||||
logicFunction.applicationId === applicationId &&
|
||||
!isDefined(logicFunction.deletedAt) &&
|
||||
logicFunction.name === name,
|
||||
);
|
||||
|
||||
if (matches.length === 0) {
|
||||
throw new NotFoundException('Logic function not found');
|
||||
}
|
||||
|
||||
if (matches.length > 1) {
|
||||
throw new ConflictException(
|
||||
'Multiple logic functions match this name; use universalIdentifier',
|
||||
);
|
||||
}
|
||||
|
||||
return matches[0];
|
||||
}
|
||||
}
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
import { IsObject, IsOptional, IsString, IsUUID } from 'class-validator';
|
||||
|
||||
export class EnqueueLogicFunctionExecutionDto {
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
name?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsUUID()
|
||||
universalIdentifier?: string;
|
||||
|
||||
@IsObject()
|
||||
payload!: Record<string, unknown>;
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
import { type DynamicModule, Global, Module } from '@nestjs/common';
|
||||
|
||||
import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module';
|
||||
import { AppLogicFunctionModule } from 'src/engine/core-modules/logic-function/app-logic-function/app-logic-function.module';
|
||||
import { LogicFunctionDriverFactory } from 'src/engine/core-modules/logic-function/logic-function-drivers/logic-function-driver.factory';
|
||||
import { LogicFunctionResourceModule } from 'src/engine/core-modules/logic-function/logic-function-resource/logic-function-resource.module';
|
||||
import { LogicFunctionTriggerModule } from 'src/engine/core-modules/logic-function/logic-function-trigger/logic-function-trigger.module';
|
||||
@@ -17,6 +18,7 @@ export class LogicFunctionModule {
|
||||
module: LogicFunctionModule,
|
||||
imports: [
|
||||
TwentyConfigModule,
|
||||
AppLogicFunctionModule,
|
||||
CacheLockModule,
|
||||
LogicFunctionResourceModule,
|
||||
LogicFunctionTriggerModule,
|
||||
|
||||
+4
-2
@@ -222,7 +222,7 @@ export class BullMQDriver
|
||||
jobName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void> {
|
||||
): Promise<string | undefined> {
|
||||
if (!this.queueMap[queueName]) {
|
||||
throw new Error(
|
||||
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
|
||||
@@ -257,6 +257,8 @@ export class BullMQDriver
|
||||
delay: options?.delay,
|
||||
};
|
||||
|
||||
await this.queueMap[queueName].add(jobName, data, queueOptions);
|
||||
const job = await this.queueMap[queueName].add(jobName, data, queueOptions);
|
||||
|
||||
return job.id;
|
||||
}
|
||||
}
|
||||
|
||||
+1
-1
@@ -13,7 +13,7 @@ export interface MessageQueueDriver {
|
||||
jobName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void>;
|
||||
): Promise<string | undefined>;
|
||||
// @ts-expect-error legacy noImplicitAny
|
||||
work<T extends MessageQueueJobData>(
|
||||
queueName: MessageQueue,
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
|
||||
import { v4 } from 'uuid';
|
||||
|
||||
import { type MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
|
||||
import {
|
||||
type MessageQueueJob,
|
||||
@@ -20,8 +22,12 @@ export class SyncDriver implements MessageQueueDriver {
|
||||
queueName: MessageQueue,
|
||||
jobName: string,
|
||||
data: T,
|
||||
): Promise<void> {
|
||||
await this.processJob(queueName, { id: '', name: jobName, data });
|
||||
): Promise<string | undefined> {
|
||||
const jobId = v4();
|
||||
|
||||
await this.processJob(queueName, { id: jobId, name: jobName, data });
|
||||
|
||||
return jobId;
|
||||
}
|
||||
|
||||
async addCron<T extends MessageQueueJobData | undefined>({
|
||||
|
||||
+1
-1
@@ -31,7 +31,7 @@ export class MessageQueueService {
|
||||
jobName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void> {
|
||||
): Promise<string | undefined> {
|
||||
return this.driver.add(this.queueName, jobName, data, options);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user