Compare commits

...

6 Commits

Author SHA1 Message Date
Etienne b45e44fb85 add checkFileExists method in file storage service (#12229) 2025-05-22 17:18:36 +02:00
Etienne bed46f12df add more logs to cleaning command (#12219) 2025-05-22 15:31:16 +02:00
Thomas Trompette 26709bcc43 Improve logs in workflow trigger (#12215)
- distinguish logs coming from webhook and job triggers
- add workspace and workflow ids to help debugging

Hard to debug sentry issue:

https://twenty-v7.sentry.io/issues/6605607134/?project=4507072499810304&query=&referrer=issue-stream&stream_index=7
2025-05-22 15:30:39 +02:00
Charles Bochet 35817d5fae Fix upgrade command (#12210) 2025-05-22 12:23:36 +02:00
Etienne 0a75cbd06c clean not found file - add logs (#12198) 2025-05-22 11:40:20 +02:00
Charles Bochet 3e731da69c Fix docker setup (#12209)
For fresh install, we need the migrations to happen before the upgrade
command is triggered as the upgrade command is a NestJS command and the
app will try to load env variables from db
2025-05-22 11:32:26 +02:00
10 changed files with 114 additions and 73 deletions
@@ -22,6 +22,7 @@ setup_and_migrate_db() {
# Run setup and migration scripts
NODE_OPTIONS="--max-old-space-size=1500" tsx ./scripts/setup-db.ts
yarn database:migrate:prod
fi
yarn command:prod upgrade
@@ -1,21 +1,14 @@
import { InjectRepository } from '@nestjs/typeorm';
import { basename, dirname } from 'path';
import { isNonEmptyString } from '@sniptt/guards';
import { Command } from 'nest-commander';
import { Equal, Not, Repository } from 'typeorm';
import {
FileStorageException,
FileStorageExceptionCode,
} from 'src/engine/core-modules/file-storage/interfaces/file-storage-exception';
import {
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
RunOnWorkspaceArgs,
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
import { FileService } from 'src/engine/core-modules/file/services/file.service';
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { AttachmentWorkspaceEntity } from 'src/modules/attachment/standard-objects/attachment.workspace-entity';
@@ -31,7 +24,7 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
@InjectRepository(Workspace, 'core')
protected readonly workspaceRepository: Repository<Workspace>,
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly fileService: FileService,
private readonly fileStorageService: FileStorageService,
) {
super(workspaceRepository, twentyORMGlobalManager);
}
@@ -57,27 +50,21 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
}
private async checkIfFileIsFound(path: string, workspaceId: string) {
this.logger.log(`Checking if file is found ${path}`);
if (path.startsWith('https://')) return true; // seed data
try {
await this.fileService.getFileStream(
dirname(path),
basename(path),
workspaceId,
);
} catch (error) {
if (
error instanceof FileStorageException &&
error.code === FileStorageExceptionCode.FILE_NOT_FOUND
) {
return false;
}
}
const isFileFound = await this.fileStorageService.checkFileExists({
folderPath: `workspace-${workspaceId}`,
filename: path,
});
return true;
this.logger.log(`File found: ${isFileFound}`);
return isFileFound;
}
private async cleanWorkspaceLogo(workspaceId: string, dryRun: boolean) {
this.logger.log(`Cleaning workspace logo for workspace ${workspaceId}`);
const workspace = await this.workspaceRepository.findOneOrFail({
where: {
id: workspaceId,
@@ -86,6 +73,8 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
if (!isNonEmptyString(workspace.logo)) return;
this.logger.log(`Processing workspace logo for workspace ${workspace.id}`);
const isFileFound = await this.checkIfFileIsFound(
workspace.logo,
workspace.id,
@@ -104,6 +93,7 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
}
private async softDeleteAttachments(workspaceId: string, dryRun: boolean) {
this.logger.log(`Cleaning attachments for workspace ${workspaceId}`);
const attachmentRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<AttachmentWorkspaceEntity>(
workspaceId,
@@ -122,6 +112,7 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
const attachmentIdsToSoftDeleteChunk = await Promise.all(
attachmentsChunk.map(async (attachment) => {
this.logger.log(`Processing attachment ${attachment.id}`);
const isFileFound = await this.checkIfFileIsFound(
attachment.fullPath,
workspaceId,
@@ -150,6 +141,9 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
workspaceId: string,
dryRun: boolean,
) {
this.logger.log(
`Cleaning workspace members avatarUrl for workspace ${workspaceId}`,
);
const workspaceMemberRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkspaceMemberWorkspaceEntity>(
workspaceId,
@@ -164,6 +158,8 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
const workspaceMemberIdsToUpdate: string[] = [];
for (const workspaceMember of workspaceMembers) {
this.logger.log(`Processing workspaceMember ${workspaceMember.id}`);
const isFileFound = await this.checkIfFileIsFound(
workspaceMember.avatarUrl,
workspaceId,
@@ -187,6 +183,7 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
}
private async cleanPeopleAvatarUrl(workspaceId: string, dryRun: boolean) {
this.logger.log(`Cleaning people avatarUrl for workspace ${workspaceId}`);
const personRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<PersonWorkspaceEntity>(
workspaceId,
@@ -201,6 +198,8 @@ export class CleanNotFoundFilesCommand extends ActiveOrSuspendedWorkspacesMigrat
const personIdsToUpdate: string[] = [];
for (const person of people) {
this.logger.log('Processing person', person.id);
const isFileFound = await this.checkIfFileIsFound(
person.avatarUrl,
workspaceId,
@@ -7,6 +7,7 @@ import { FixStandardSelectFieldsPositionCommand } from 'src/database/commands/up
import { LowercaseUserAndInvitationEmailsCommand } from 'src/database/commands/upgrade-version-command/0-54/0-54-lowercase-user-and-invitation-emails.command';
import { MigrateDefaultAvatarUrlToUserWorkspaceCommand } from 'src/database/commands/upgrade-version-command/0-54/0-54-migrate-default-avatar-url-to-user-workspace.command';
import { AppToken } from 'src/engine/core-modules/app-token/app-token.entity';
import { FileStorageModule } from 'src/engine/core-modules/file-storage/file-storage.module';
import { FileModule } from 'src/engine/core-modules/file/file.module';
import { UserWorkspace } from 'src/engine/core-modules/user-workspace/user-workspace.entity';
import { User } from 'src/engine/core-modules/user/user.entity';
@@ -31,6 +32,7 @@ import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/wor
WorkspaceMigrationRunnerModule,
WorkspaceMetadataVersionModule,
FileModule,
FileStorageModule,
],
providers: [
FixStandardSelectFieldsPositionCommand,
@@ -69,20 +69,11 @@ export class DatabaseMigrationService {
});
}
async shouldRunMigrationsIfAllWorkspaceAreAboveVersion0_53(): Promise<boolean> {
const coreWorkspaceSchemaExists = await this.checkCoreWorkspaceExists();
async shouldSkipUpgradeIfFreshInstallation(): Promise<boolean> {
const activeWorkspaceOrSuspendedWorkspaceCount =
await this.loadActiveOrSuspendedWorkspace();
if (!coreWorkspaceSchemaExists) {
this.logger.log(
'core.workspace does not exist. Running migrations for fresh installation.',
);
return true;
}
this.logger.log('Not a first installation, checking workspace versions...');
return await this.areAllWorkspacesAboveVersion0_53();
return activeWorkspaceOrSuspendedWorkspaceCount.length === 0;
}
async runMigrations(): Promise<void> {
@@ -114,26 +105,7 @@ export class DatabaseMigrationService {
}
}
private async checkCoreWorkspaceExists(): Promise<boolean> {
try {
const result = await this.workspaceRepository.query(`
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'core'
AND table_name = 'workspace'
);
`);
return result[0].exists;
} catch (error) {
this.logger.error('Error checking core.workspace existence:', error);
return false;
}
}
private async areAllWorkspacesAboveVersion0_53(): Promise<boolean> {
public async areAllWorkspacesAboveVersion0_53(): Promise<boolean> {
try {
const allActiveOrSuspendedWorkspaces =
await this.loadActiveOrSuspendedWorkspace();
@@ -293,10 +265,21 @@ export class UpgradeCommand extends UpgradeCommandRunner {
passedParams: string[],
options: ActiveOrSuspendedWorkspacesMigrationCommandOptions,
): Promise<void> {
const shouldRunMigrateAsPartOfUpgrade =
await this.databaseMigrationService.shouldRunMigrationsIfAllWorkspaceAreAboveVersion0_53();
const shouldSkipUpgradeIfFreshInstallation =
await this.databaseMigrationService.shouldSkipUpgradeIfFreshInstallation();
if (!shouldRunMigrateAsPartOfUpgrade) {
if (shouldSkipUpgradeIfFreshInstallation) {
this.logger.log(
chalk.blue('Fresh installation detected, skipping migration'),
);
return;
}
const shouldPreventFromUpgradingIfWorkspaceIsBelowVersion0_53 =
!(await this.databaseMigrationService.areAllWorkspacesAboveVersion0_53());
if (shouldPreventFromUpgradingIfWorkspaceIsBelowVersion0_53) {
this.logger.log(
chalk.red(
'Not able to run migrate command, aborting the whole migrate-upgrade operation',
@@ -21,4 +21,8 @@ export interface StorageDriver {
from: { folderPath: string; filename?: string };
to: { folderPath: string; filename?: string };
}): Promise<void>;
checkFileExists(params: {
folderPath: string;
filename: string;
}): Promise<boolean>;
}
@@ -3,11 +3,11 @@ import * as fs from 'fs/promises';
import { dirname, join } from 'path';
import { Readable } from 'stream';
import { StorageDriver } from 'src/engine/core-modules/file-storage/drivers/interfaces/storage-driver.interface';
import {
FileStorageException,
FileStorageExceptionCode,
} from 'src/engine/core-modules/file-storage/interfaces/file-storage-exception';
import { StorageDriver } from 'src/engine/core-modules/file-storage/drivers/interfaces/storage-driver.interface';
export interface LocalDriverOptions {
storagePath: string;
@@ -162,4 +162,17 @@ export class LocalDriver implements StorageDriver {
}): Promise<void> {
await this.copy(params, true);
}
async checkFileExists(params: {
folderPath: string;
filename: string;
}): Promise<boolean> {
const filePath = join(
this.options.storagePath,
params.folderPath,
params.filename,
);
return existsSync(filePath);
}
}
@@ -395,4 +395,26 @@ export class S3Driver implements StorageDriver {
return this.s3Client.createBucket(args);
}
async checkFileExists(params: {
folderPath: string;
filename: string;
}): Promise<boolean> {
try {
await this.s3Client.send(
new HeadObjectCommand({
Bucket: this.bucketName,
Key: `${params.folderPath}/${params.filename}`,
}),
);
} catch (error) {
if (error instanceof NotFound) {
return false;
}
throw error;
}
return true;
}
}
@@ -47,4 +47,11 @@ export class FileStorageService implements StorageDriver {
}): Promise<void> {
return this.driver.download(params);
}
checkFileExists(params: {
folderPath: string;
filename: string;
}): Promise<boolean> {
return this.driver.checkFileExists(params);
}
}
@@ -28,23 +28,33 @@ export class WorkflowTriggerController {
@Post('workflows/:workspaceId/:workflowId')
async runWorkflowByPostRequest(
@Param('workspaceId') workspaceId: string,
@Param('workflowId') workflowId: string,
@Req() request: Request,
) {
return await this.runWorkflow({ workflowId, payload: request.body || {} });
return await this.runWorkflow({
workflowId,
payload: request.body || {},
workspaceId,
});
}
@Get('workflows/:workspaceId/:workflowId')
async runWorkflowByGetRequest(@Param('workflowId') workflowId: string) {
return await this.runWorkflow({ workflowId });
async runWorkflowByGetRequest(
@Param('workspaceId') workspaceId: string,
@Param('workflowId') workflowId: string,
) {
return await this.runWorkflow({ workflowId, workspaceId });
}
private async runWorkflow({
workflowId,
payload,
workspaceId,
}: {
workflowId: string;
payload?: object;
workspaceId: string;
}) {
const workflowRepository =
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
@@ -57,7 +67,7 @@ export class WorkflowTriggerController {
if (!isDefined(workflow)) {
throw new WorkflowTriggerException(
'Workflow not found',
`[Webhook trigger] Workflow ${workflowId} not found in workspace ${workspaceId}`,
WorkflowTriggerExceptionCode.NOT_FOUND,
);
}
@@ -67,7 +77,7 @@ export class WorkflowTriggerController {
workflow.lastPublishedVersionId === ''
) {
throw new WorkflowTriggerException(
'Workflow has not been activated',
`[Webhook trigger] Workflow ${workflowId} has not been activated in workspace ${workspaceId}`,
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_STATUS,
);
}
@@ -82,21 +92,21 @@ export class WorkflowTriggerController {
if (!isDefined(workflowVersion)) {
throw new WorkflowTriggerException(
'Workflow version not found',
`[Webhook trigger] No workflow version activated for workflow ${workflowId} in workspace ${workspaceId}`,
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_VERSION,
);
}
if (workflowVersion.trigger?.type !== WorkflowTriggerType.WEBHOOK) {
throw new WorkflowTriggerException(
'Workflow does not have a Webhook trigger',
`[Webhook trigger] Workflow ${workflowId} does not have a Webhook trigger in workspace ${workspaceId}`,
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_TRIGGER,
);
}
if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) {
throw new WorkflowTriggerException(
'Workflow version is not active',
`[Webhook trigger] Workflow version ${workflowVersion.id} is not active in workspace ${workspaceId}`,
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_STATUS,
);
}
@@ -52,14 +52,14 @@ export class WorkflowTriggerJob {
if (!workflow) {
throw new WorkflowTriggerException(
'Workflow not found',
`Workflow ${data.workflowId} not found in workspace ${data.workspaceId}`,
WorkflowTriggerExceptionCode.NOT_FOUND,
);
}
if (!workflow.lastPublishedVersionId) {
throw new WorkflowTriggerException(
'Workflow has no published version',
`Workflow ${data.workflowId} has no published version in workspace ${data.workspaceId}`,
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
);
}
@@ -75,13 +75,13 @@ export class WorkflowTriggerJob {
if (!workflowVersion) {
throw new WorkflowTriggerException(
'Workflow version not found',
`Workflow version ${workflow.lastPublishedVersionId} not found in workspace ${data.workspaceId}`,
WorkflowTriggerExceptionCode.NOT_FOUND,
);
}
if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) {
throw new WorkflowTriggerException(
'Workflow version is not active',
`Workflow version ${workflowVersion.id} is not active in workspace ${data.workspaceId}`,
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
);
}