Compare commits

...

2 Commits

Author SHA1 Message Date
Félix Malfait 33675aabb9 fix(messaging): create messageThread.subject field metadata + column in 1-21 backfill (#19394)
- The 1-21 \`upgrade:1-21:backfill-message-thread-subject\` command
assumed the legacy \`sync-metadata\` flow would create the new
\`messageThread.subject\` field metadata and column on existing
workspaces. That sync was removed, so the column was never added and the
backfill silently skipped.
- The command now ensures the field exists by computing the standard
\`messageThread.subject\` flat field from the twenty-standard
application and running it through
\`WorkspaceMigrationValidateBuildAndRunService\` (same pattern used by
the page-layout / command-menu-item backfills). This creates both the
field metadata row and the workspace schema column.
- After ensuring the field, the existing \`UPDATE messageThread SET
subject = ...\` runs as before.

- [ ] On a workspace with no \`subject\` column on \`messageThread\`,
run \`yarn command:prod upgrade:1-21:backfill-message-thread-subject\`
and confirm:
  - the field metadata row is created in \`core.\"fieldMetadata\"\`
- the \`subject\` column is created on \`workspace_<id>.messageThread\`
  - existing message threads are backfilled from the most recent message
- [ ] Re-run on the same workspace and confirm it is a no-op (field
already exists, no rows to update)
- [ ] Run on a workspace that already has the column but \`NULL\`
subjects and confirm only the backfill runs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 14:45:53 +02:00
Félix Malfait 54d50768d8 fix(messaging): split thread create/update statements and stop clobbering accumulator
Fixes two bugs in MessagingMessageService.saveMessagesWithinTransaction.

1. Postgres "ON CONFLICT DO UPDATE command cannot affect row a second
   time" during message import: creates and subject updates were merged
   into a single bulk upsert keyed on id, so when the same thread id
   appeared in both lists Postgres rejected the statement. Issue them as
   two separate statements instead — insert for creates, upsert for
   subject updates — which is also closer to the pre-#19351 behavior.

2. enrichMessageAccumulatorWithExistingMessageChannelMessageAssociations
   replaced the accumulator object wholesale, dropping any
   existingThreadInDB previously set by
   enrichMessageAccumulatorWithExistingMessageThreadIds. Mutate the
   existing accumulator instead.
2026-04-07 13:27:32 +02:00
3 changed files with 132 additions and 32 deletions
@@ -1,18 +1,29 @@
import { Command } from 'nest-commander';
import { STANDARD_OBJECTS } from 'twenty-shared/metadata';
import { FieldMetadataType } from 'twenty-shared/types';
import { ActiveOrSuspendedWorkspaceCommandRunner } from 'src/database/commands/command-runners/active-or-suspended-workspace.command-runner';
import { WorkspaceIteratorService } from 'src/database/commands/command-runners/workspace-iterator.service';
import { type RunOnWorkspaceArgs } from 'src/database/commands/command-runners/workspace.command-runner';
import { ApplicationService } from 'src/engine/core-modules/application/application.service';
import { findFlatEntityByUniversalIdentifier } from 'src/engine/metadata-modules/flat-entity/utils/find-flat-entity-by-universal-identifier.util';
import { getDefaultFlatFieldMetadata } from 'src/engine/metadata-modules/flat-field-metadata/utils/get-default-flat-field-metadata-from-create-field-input.util';
import { type FlatObjectMetadata } from 'src/engine/metadata-modules/flat-object-metadata/types/flat-object-metadata.type';
import { WorkspaceCacheService } from 'src/engine/workspace-cache/services/workspace-cache.service';
import { getWorkspaceSchemaName } from 'src/engine/workspace-datasource/utils/get-workspace-schema-name.util';
import { WorkspaceMigrationValidateBuildAndRunService } from 'src/engine/workspace-manager/workspace-migration/services/workspace-migration-validate-build-and-run-service';
@Command({
name: 'upgrade:1-21:backfill-message-thread-subject',
description:
'Backfill messageThread.subject from the most recently received message in each thread',
'Create the messageThread.subject standard field if missing and backfill it from the most recently received message in each thread',
})
export class BackfillMessageThreadSubjectCommand extends ActiveOrSuspendedWorkspaceCommandRunner {
constructor(
protected readonly workspaceIteratorService: WorkspaceIteratorService,
private readonly applicationService: ApplicationService,
private readonly workspaceCacheService: WorkspaceCacheService,
private readonly workspaceMigrationValidateBuildAndRunService: WorkspaceMigrationValidateBuildAndRunService,
) {
super(workspaceIteratorService);
}
@@ -28,7 +39,10 @@ export class BackfillMessageThreadSubjectCommand extends ActiveOrSuspendedWorksp
return;
}
const schemaName = getWorkspaceSchemaName(workspaceId);
await this.ensureSubjectFieldExists({
workspaceId,
isDryRun: !!options.dryRun,
});
if (options.dryRun) {
this.logger.log(
@@ -38,23 +52,7 @@ export class BackfillMessageThreadSubjectCommand extends ActiveOrSuspendedWorksp
return;
}
const columnExists = await dataSource.query(
`SELECT 1 FROM information_schema.columns
WHERE table_schema = $1
AND table_name = 'messageThread'
AND column_name = 'subject'`,
[schemaName],
undefined,
{ shouldBypassPermissionChecks: true },
);
if (columnExists.length === 0) {
this.logger.log(
`Column "subject" does not exist yet on messageThread for workspace ${workspaceId}, skipping (will be created by sync-metadata)`,
);
return;
}
const schemaName = getWorkspaceSchemaName(workspaceId);
const result = await dataSource.query(
`UPDATE "${schemaName}"."messageThread" mt
@@ -75,4 +73,105 @@ export class BackfillMessageThreadSubjectCommand extends ActiveOrSuspendedWorksp
`Backfilled subject for ${result?.[1] ?? 0} message threads in workspace ${workspaceId}`,
);
}
private async ensureSubjectFieldExists({
workspaceId,
isDryRun,
}: {
workspaceId: string;
isDryRun: boolean;
}): Promise<void> {
const { flatObjectMetadataMaps, flatFieldMetadataMaps } =
await this.workspaceCacheService.getOrRecompute(workspaceId, [
'flatObjectMetadataMaps',
'flatFieldMetadataMaps',
]);
const messageThreadObjectMetadata =
findFlatEntityByUniversalIdentifier<FlatObjectMetadata>({
flatEntityMaps: flatObjectMetadataMaps,
universalIdentifier: STANDARD_OBJECTS.messageThread.universalIdentifier,
});
if (!messageThreadObjectMetadata) {
this.logger.log(
`messageThread object metadata not found for workspace ${workspaceId}, skipping`,
);
return;
}
const existingField = findFlatEntityByUniversalIdentifier({
flatEntityMaps: flatFieldMetadataMaps,
universalIdentifier:
STANDARD_OBJECTS.messageThread.fields.subject.universalIdentifier,
});
if (existingField) {
return;
}
if (isDryRun) {
this.logger.log(
`[DRY RUN] Would create messageThread.subject field for workspace ${workspaceId}`,
);
return;
}
const { twentyStandardFlatApplication } =
await this.applicationService.findWorkspaceTwentyStandardAndCustomApplicationOrThrow(
{ workspaceId },
);
const flatFieldMetadataToCreate = {
...getDefaultFlatFieldMetadata({
createFieldInput: {
name: 'subject',
type: FieldMetadataType.TEXT,
label: 'Subject',
description: 'Subject',
icon: 'IconMessage',
isNullable: true,
isUIReadOnly: true,
universalIdentifier:
STANDARD_OBJECTS.messageThread.fields.subject.universalIdentifier,
},
flatApplication: twentyStandardFlatApplication,
objectMetadataUniversalIdentifier:
messageThreadObjectMetadata.universalIdentifier,
}),
isCustom: false,
};
const validateAndBuildResult =
await this.workspaceMigrationValidateBuildAndRunService.validateBuildAndRunWorkspaceMigration(
{
allFlatEntityOperationByMetadataName: {
fieldMetadata: {
flatEntityToCreate: [flatFieldMetadataToCreate],
flatEntityToDelete: [],
flatEntityToUpdate: [],
},
},
workspaceId,
applicationUniversalIdentifier:
twentyStandardFlatApplication.universalIdentifier,
},
);
if (validateAndBuildResult.status === 'fail') {
this.logger.error(
`Failed to create messageThread.subject field for workspace ${workspaceId}:\n${JSON.stringify(validateAndBuildResult, null, 2)}`,
);
throw new Error(
`Failed to create messageThread.subject field for workspace ${workspaceId}`,
);
}
this.logger.log(
`Created messageThread.subject field for workspace ${workspaceId}`,
);
}
}
@@ -18,6 +18,7 @@ import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module';
import { FieldMetadataModule } from 'src/engine/metadata-modules/field-metadata/field-metadata.module';
import { MessageFolderEntity } from 'src/engine/metadata-modules/message-folder/entities/message-folder.entity';
import { WorkspaceSchemaManagerModule } from 'src/engine/twenty-orm/workspace-schema-manager/workspace-schema-manager.module';
import { WorkspaceCacheModule } from 'src/engine/workspace-cache/workspace-cache.module';
@@ -31,6 +32,7 @@ import { WorkspaceMigrationModule } from 'src/engine/workspace-manager/workspace
MessageFolderEntity,
]),
DataSourceModule,
FieldMetadataModule,
WorkspaceCacheModule,
ApplicationModule,
WorkspaceMigrationModule,
@@ -231,16 +231,18 @@ export class MessagingMessageService {
}
}
const threadsToUpsert = [
...messageThreadsToCreate,
...Array.from(threadSubjectUpdates.entries()).map(
([id, { subject }]) => ({ id, subject }),
),
];
if (messageThreadsToCreate.length > 0) {
await messageThreadRepository.insert(
messageThreadsToCreate,
transactionManager,
);
}
if (threadsToUpsert.length > 0) {
if (threadSubjectUpdates.size > 0) {
await messageThreadRepository.upsert(
threadsToUpsert,
Array.from(threadSubjectUpdates.entries()).map(
([id, { subject }]) => ({ id, subject }),
),
['id'],
transactionManager,
);
@@ -433,11 +435,8 @@ export class MessagingMessageService {
);
if (existingMessageChannelMessageAssociation) {
messageAccumulatorMap.set(message.externalId, {
existingMessageInDB: existingMessage,
existingMessageChannelMessageAssociationInDB:
existingMessageChannelMessageAssociation,
});
messageAccumulator.existingMessageChannelMessageAssociationInDB =
existingMessageChannelMessageAssociation;
}
}
}