From 80beb2bb9937101d1723f2d54648060fdcbe6cef Mon Sep 17 00:00:00 2001 From: Dries Augustyns Date: Wed, 27 May 2026 17:53:30 +0200 Subject: [PATCH] feat(EmailService): add worker concurrency settings and improve email queue prioritization --- .env.self-host.example | 13 ++++++++ apps/api/.env.example | 11 +++++++ apps/api/src/app/constants.ts | 14 ++++++++ apps/api/src/jobs/email-processor.ts | 31 +++++++++++++++-- apps/api/src/services/EmailService.ts | 10 +++--- apps/api/src/services/QueueService.ts | 33 ++++++++++++++++--- .../self-hosting/environment-variables.mdx | 2 ++ 7 files changed, 102 insertions(+), 12 deletions(-) diff --git a/.env.self-host.example b/.env.self-host.example index 3da8174..7b49b79 100644 --- a/.env.self-host.example +++ b/.env.self-host.example @@ -163,6 +163,19 @@ SMTP_DOMAIN=smtp.example.com # Default: unset (auto-detect, falls back to 14) # EMAIL_RATE_LIMIT_PER_SECOND=1 +# Number of emails the worker processes in parallel. When unset, concurrency is +# derived from the effective rate limit (~ rate * 0.5, min 5, capped by +# EMAIL_WORKER_MAX_CONCURRENCY) so a higher SES quota translates into higher +# throughput automatically. Pin this only when the Prisma pool or memory is the +# binding constraint. +# Default: unset (auto-derived) +# EMAIL_WORKER_CONCURRENCY=10 + +# Upper bound applied to the auto-derived concurrency. Raise this when your SES +# quota is high AND the Prisma connection pool has been sized for it. +# Default: 50 +# EMAIL_WORKER_MAX_CONCURRENCY=50 + # ======================================== # ADVANCED (rarely needed) # ======================================== diff --git a/apps/api/.env.example b/apps/api/.env.example index eecaf18..e7c7e3a 100644 --- a/apps/api/.env.example +++ b/apps/api/.env.example @@ -65,6 +65,17 @@ SES_CONFIGURATION_SET_NO_TRACKING=plunk-configuration-set-no-tracking # Optiona # Default: Fetched from AWS (typically 14 for sandbox, higher for production accounts) # EMAIL_RATE_LIMIT_PER_SECOND=14 +# Email worker concurrency (number of emails processed in parallel) +# If not set, derived from the effective rate limit (~ rate * 0.5, min 5, capped +# by EMAIL_WORKER_MAX_CONCURRENCY). Set this to pin a fixed value when the +# Prisma connection pool or memory is the binding constraint. +# EMAIL_WORKER_CONCURRENCY=10 + +# Upper bound for auto-derived worker concurrency +# Raise this when your SES quota is high AND the Prisma pool has been sized for it. +# Default: 50 +# EMAIL_WORKER_MAX_CONCURRENCY=50 + # ============================================================================== # OAuth (Optional - for social login) # ============================================================================== diff --git a/apps/api/src/app/constants.ts b/apps/api/src/app/constants.ts index face47b..64e1026 100644 --- a/apps/api/src/app/constants.ts +++ b/apps/api/src/app/constants.ts @@ -58,6 +58,20 @@ export const EMAIL_RATE_LIMIT_PER_SECOND = process.env.EMAIL_RATE_LIMIT_PER_SECO ? Number(process.env.EMAIL_RATE_LIMIT_PER_SECOND) : undefined; +// Email Worker Concurrency (optional override) +// If not set, concurrency is derived from the effective rate limit so a higher +// SES quota actually translates into higher throughput. Set this to pin a fixed +// value (useful when Prisma pool size or memory is the binding constraint). +export const EMAIL_WORKER_CONCURRENCY = process.env.EMAIL_WORKER_CONCURRENCY + ? Number(process.env.EMAIL_WORKER_CONCURRENCY) + : undefined; + +// Upper bound for auto-derived concurrency. Raise this if you have a large SES +// quota AND have sized the Prisma connection pool accordingly. +export const EMAIL_WORKER_MAX_CONCURRENCY = process.env.EMAIL_WORKER_MAX_CONCURRENCY + ? Number(process.env.EMAIL_WORKER_MAX_CONCURRENCY) + : 50; + // Storage export const REDIS_URL = validateEnv('REDIS_URL'); export const DATABASE_URL = validateEnv('DATABASE_URL'); diff --git a/apps/api/src/jobs/email-processor.ts b/apps/api/src/jobs/email-processor.ts index 97c6128..08e4c74 100644 --- a/apps/api/src/jobs/email-processor.ts +++ b/apps/api/src/jobs/email-processor.ts @@ -8,7 +8,12 @@ import type {SendEmailJobData} from '@plunk/types'; import {type Job, Worker} from 'bullmq'; import signale from 'signale'; -import {DASHBOARD_URI, EMAIL_RATE_LIMIT_PER_SECOND} from '../app/constants.js'; +import { + DASHBOARD_URI, + EMAIL_RATE_LIMIT_PER_SECOND, + EMAIL_WORKER_CONCURRENCY, + EMAIL_WORKER_MAX_CONCURRENCY, +} from '../app/constants.js'; import {prisma} from '../database/prisma.js'; import {CampaignService} from '../services/CampaignService.js'; import {EmailService} from '../services/EmailService.js'; @@ -47,9 +52,31 @@ async function getEmailRateLimit(): Promise { return DEFAULT_RATE_LIMIT; } +/** + * Derive worker concurrency from the rate limit so a higher SES quota actually + * translates into higher throughput. The mean job duration is ~0.5s (Prisma + * reads + HTML compile + SES call + writes), so `rate * 0.5` gives ~2× headroom + * over the per-second cap. Clamped to keep sandbox accounts useful and to + * protect the Prisma pool on very large quotas. + */ +function deriveWorkerConcurrency(rateLimit: number): number { + if (EMAIL_WORKER_CONCURRENCY !== undefined) { + return EMAIL_WORKER_CONCURRENCY; + } + + const TARGET_JOB_SECONDS = 0.5; + const MIN_CONCURRENCY = 5; + const derived = Math.ceil(rateLimit * TARGET_JOB_SECONDS); + return Math.max(MIN_CONCURRENCY, Math.min(derived, EMAIL_WORKER_MAX_CONCURRENCY)); +} + export async function createEmailWorker() { // Fetch the rate limit (from env, AWS, or default) const rateLimit = await getEmailRateLimit(); + const concurrency = deriveWorkerConcurrency(rateLimit); + signale.info( + `[EMAIL-PROCESSOR] Worker concurrency: ${concurrency} (rate limit: ${rateLimit}/s)`, + ); const worker = new Worker( emailQueue.name, async (job: Job) => { @@ -253,7 +280,7 @@ export async function createEmailWorker() { }, { connection: emailQueue.opts.connection, - concurrency: 10, // Process up to 10 emails concurrently + concurrency, limiter: { max: rateLimit, // Max emails per second (from env, AWS SES quota, or default) duration: 1000, diff --git a/apps/api/src/services/EmailService.ts b/apps/api/src/services/EmailService.ts index 890701d..0f509a6 100644 --- a/apps/api/src/services/EmailService.ts +++ b/apps/api/src/services/EmailService.ts @@ -108,7 +108,7 @@ export class EmailService { await BillingLimitService.incrementUsage(params.projectId, EmailSourceType.TRANSACTIONAL); // Queue email for sending - await this.queueEmail(email.id); + await this.queueEmail(email.id, EmailSourceType.TRANSACTIONAL); return email; } @@ -172,7 +172,7 @@ export class EmailService { await BillingLimitService.incrementUsage(params.projectId, sourceType); // Queue email for sending - await this.queueEmail(email.id); + await this.queueEmail(email.id, sourceType); return email; } @@ -278,7 +278,7 @@ export class EmailService { await BillingLimitService.incrementUsage(params.projectId, sourceType); // Queue email for sending - await this.queueEmail(email.id); + await this.queueEmail(email.id, sourceType); return email; } @@ -1137,7 +1137,7 @@ export class EmailService { * Queue an email for sending * Adds email to the BullMQ queue for processing by workers */ - private static async queueEmail(emailId: string, delay?: number): Promise { - await QueueService.queueEmail(emailId, delay); + private static async queueEmail(emailId: string, sourceType: EmailSourceType, delay?: number): Promise { + await QueueService.queueEmail(emailId, sourceType, delay); } } diff --git a/apps/api/src/services/QueueService.ts b/apps/api/src/services/QueueService.ts index af17fa7..5dbab28 100644 --- a/apps/api/src/services/QueueService.ts +++ b/apps/api/src/services/QueueService.ts @@ -1,4 +1,4 @@ -import {CampaignStatus, EmailStatus} from '@plunk/db'; +import {CampaignStatus, EmailSourceType, EmailStatus} from '@plunk/db'; import {type Job, Queue} from 'bullmq'; import type {RedisOptions} from 'ioredis'; import signale from 'signale'; @@ -174,20 +174,43 @@ export const meterQueue = new Queue('meter', { }, }); +function emailPriorityFor(sourceType: EmailSourceType): number { + switch (sourceType) { + case EmailSourceType.TRANSACTIONAL: + return 1; + case EmailSourceType.WORKFLOW: + return 5; + case EmailSourceType.CAMPAIGN: + return 10; + default: + return 5; + } +} + /** * Queue Service - Centralized queue management */ export class QueueService { /** - * Add email to queue for sending + * Add email to queue for sending. + * + * Transactional emails jump the queue ahead of workflow and campaign sends + * via BullMQ's priority (lower number = higher precedence). This prevents + * latency-sensitive sends (login codes, password resets) from queuing behind + * large campaign bursts on the shared `email` queue. */ - public static async queueEmail(emailId: string, delay?: number): Promise> { + public static async queueEmail( + emailId: string, + sourceType: EmailSourceType, + delay?: number, + ): Promise> { return emailQueue.add( 'send-email', {emailId}, { - delay, // Optional delay in milliseconds - jobId: `email-${emailId}`, // Prevent duplicate jobs + delay, + jobId: `email-${emailId}`, + priority: emailPriorityFor(sourceType), }, ); } diff --git a/apps/wiki/content/docs/self-hosting/environment-variables.mdx b/apps/wiki/content/docs/self-hosting/environment-variables.mdx index eeccc95..3760dc4 100644 --- a/apps/wiki/content/docs/self-hosting/environment-variables.mdx +++ b/apps/wiki/content/docs/self-hosting/environment-variables.mdx @@ -130,6 +130,8 @@ Plunk bundles a self-hosted [ntfy](https://ntfy.sh) server for internal system n | ----------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `AUTO_PROJECT_DISABLE` | No | When `true`, projects are automatically suspended when bounce or complaint rate thresholds are exceeded. Set to `false` to manage project status manually. | `true` | | `EMAIL_RATE_LIMIT_PER_SECOND` | No | Override the email sending rate limit. If not set, Plunk automatically fetches the quota from your AWS SES account. | — | +| `EMAIL_WORKER_CONCURRENCY` | No | Number of emails the worker processes in parallel. When unset, derived from the effective rate limit so a higher SES quota scales throughput automatically. | — | +| `EMAIL_WORKER_MAX_CONCURRENCY`| No | Upper bound applied to the auto-derived worker concurrency. Raise this only after sizing the Prisma connection pool accordingly. | `50` | ## Advanced