feat(EmailService): add worker concurrency settings and improve email queue prioritization
This commit is contained in:
@@ -163,6 +163,19 @@ SMTP_DOMAIN=smtp.example.com
|
||||
# Default: unset (auto-detect, falls back to 14)
|
||||
# EMAIL_RATE_LIMIT_PER_SECOND=1
|
||||
|
||||
# Number of emails the worker processes in parallel. When unset, concurrency is
|
||||
# derived from the effective rate limit (~ rate * 0.5, min 5, capped by
|
||||
# EMAIL_WORKER_MAX_CONCURRENCY) so a higher SES quota translates into higher
|
||||
# throughput automatically. Pin this only when the Prisma pool or memory is the
|
||||
# binding constraint.
|
||||
# Default: unset (auto-derived)
|
||||
# EMAIL_WORKER_CONCURRENCY=10
|
||||
|
||||
# Upper bound applied to the auto-derived concurrency. Raise this when your SES
|
||||
# quota is high AND the Prisma connection pool has been sized for it.
|
||||
# Default: 50
|
||||
# EMAIL_WORKER_MAX_CONCURRENCY=50
|
||||
|
||||
# ========================================
|
||||
# ADVANCED (rarely needed)
|
||||
# ========================================
|
||||
|
||||
@@ -65,6 +65,17 @@ SES_CONFIGURATION_SET_NO_TRACKING=plunk-configuration-set-no-tracking # Optiona
|
||||
# Default: Fetched from AWS (typically 14 for sandbox, higher for production accounts)
|
||||
# EMAIL_RATE_LIMIT_PER_SECOND=14
|
||||
|
||||
# Email worker concurrency (number of emails processed in parallel)
|
||||
# If not set, derived from the effective rate limit (~ rate * 0.5, min 5, capped
|
||||
# by EMAIL_WORKER_MAX_CONCURRENCY). Set this to pin a fixed value when the
|
||||
# Prisma connection pool or memory is the binding constraint.
|
||||
# EMAIL_WORKER_CONCURRENCY=10
|
||||
|
||||
# Upper bound for auto-derived worker concurrency
|
||||
# Raise this when your SES quota is high AND the Prisma pool has been sized for it.
|
||||
# Default: 50
|
||||
# EMAIL_WORKER_MAX_CONCURRENCY=50
|
||||
|
||||
# ==============================================================================
|
||||
# OAuth (Optional - for social login)
|
||||
# ==============================================================================
|
||||
|
||||
@@ -58,6 +58,20 @@ export const EMAIL_RATE_LIMIT_PER_SECOND = process.env.EMAIL_RATE_LIMIT_PER_SECO
|
||||
? Number(process.env.EMAIL_RATE_LIMIT_PER_SECOND)
|
||||
: undefined;
|
||||
|
||||
/**
 * Parse a worker-concurrency environment variable.
 *
 * Returns `undefined` when the variable is unset, empty, or does not parse to a
 * finite number >= 1 (for example `abc`, `0` or `-4`), so a typo in the
 * environment can never surface as `NaN` or a non-positive concurrency.
 * Fractional values are rounded down.
 */
function parseWorkerConcurrencyEnv(raw: string | undefined): number | undefined {
  if (!raw) {
    return undefined;
  }

  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed >= 1 ? Math.floor(parsed) : undefined;
}

// Email Worker Concurrency (optional override)
// If not set (or not a usable number), concurrency is derived from the effective
// rate limit so a higher SES quota actually translates into higher throughput.
// Set this to pin a fixed value (useful when Prisma pool size or memory is the
// binding constraint).
export const EMAIL_WORKER_CONCURRENCY: number | undefined = parseWorkerConcurrencyEnv(
  process.env.EMAIL_WORKER_CONCURRENCY,
);

// Upper bound for auto-derived concurrency. Raise this if you have a large SES
// quota AND have sized the Prisma connection pool accordingly. Falls back to 50
// when unset or not a usable number.
export const EMAIL_WORKER_MAX_CONCURRENCY: number =
  parseWorkerConcurrencyEnv(process.env.EMAIL_WORKER_MAX_CONCURRENCY) ?? 50;
|
||||
|
||||
// Storage
|
||||
export const REDIS_URL = validateEnv('REDIS_URL');
|
||||
export const DATABASE_URL = validateEnv('DATABASE_URL');
|
||||
|
||||
@@ -8,7 +8,12 @@ import type {SendEmailJobData} from '@plunk/types';
|
||||
import {type Job, Worker} from 'bullmq';
|
||||
import signale from 'signale';
|
||||
|
||||
import {DASHBOARD_URI, EMAIL_RATE_LIMIT_PER_SECOND} from '../app/constants.js';
|
||||
import {
|
||||
DASHBOARD_URI,
|
||||
EMAIL_RATE_LIMIT_PER_SECOND,
|
||||
EMAIL_WORKER_CONCURRENCY,
|
||||
EMAIL_WORKER_MAX_CONCURRENCY,
|
||||
} from '../app/constants.js';
|
||||
import {prisma} from '../database/prisma.js';
|
||||
import {CampaignService} from '../services/CampaignService.js';
|
||||
import {EmailService} from '../services/EmailService.js';
|
||||
@@ -47,9 +52,31 @@ async function getEmailRateLimit(): Promise<number> {
|
||||
return DEFAULT_RATE_LIMIT;
|
||||
}
|
||||
|
||||
/**
 * Pick the BullMQ worker concurrency for a given send rate.
 *
 * An explicit `EMAIL_WORKER_CONCURRENCY` pin wins when it is a finite number
 * >= 1. Otherwise the value is derived from the rate limit so a higher SES
 * quota actually translates into higher throughput: with a mean job duration
 * of ~0.5s (Prisma reads + HTML compile + SES call + writes), about
 * `rate * 0.5` jobs have to be in flight to sustain `rate` sends per second.
 *
 * The derived value is raised to at least 5 to keep sandbox accounts useful and
 * is then capped by `EMAIL_WORKER_MAX_CONCURRENCY` to protect the Prisma pool.
 * The cap is applied last, so an upper bound below 5 is honoured rather than
 * overridden by the minimum.
 *
 * @param rateLimit - Effective send rate in emails per second.
 * @returns A finite integer >= 1 that is safe to pass as the worker's `concurrency`.
 */
function deriveWorkerConcurrency(rateLimit: number): number {
  const TARGET_JOB_SECONDS = 0.5;
  const MIN_CONCURRENCY = 5;
  const DEFAULT_MAX_CONCURRENCY = 50;

  // The worker needs a finite concurrency >= 1; anything else (NaN from a
  // malformed env var, 0, negatives) is treated as "not configured".
  const isUsable = (value: number | undefined): value is number =>
    value !== undefined && Number.isFinite(value) && value >= 1;

  if (isUsable(EMAIL_WORKER_CONCURRENCY)) {
    return Math.floor(EMAIL_WORKER_CONCURRENCY);
  }

  const cap = isUsable(EMAIL_WORKER_MAX_CONCURRENCY)
    ? Math.floor(EMAIL_WORKER_MAX_CONCURRENCY)
    : DEFAULT_MAX_CONCURRENCY;

  // Math.max/Math.min propagate NaN, so a NaN rate limit is handled explicitly.
  const derived = Math.ceil(rateLimit * TARGET_JOB_SECONDS);
  const atLeastMinimum = Number.isNaN(derived) ? MIN_CONCURRENCY : Math.max(MIN_CONCURRENCY, derived);

  return Math.min(atLeastMinimum, cap);
}
|
||||
|
||||
export async function createEmailWorker() {
|
||||
// Fetch the rate limit (from env, AWS, or default)
|
||||
const rateLimit = await getEmailRateLimit();
|
||||
const concurrency = deriveWorkerConcurrency(rateLimit);
|
||||
signale.info(
|
||||
`[EMAIL-PROCESSOR] Worker concurrency: ${concurrency} (rate limit: ${rateLimit}/s)`,
|
||||
);
|
||||
const worker = new Worker<SendEmailJobData>(
|
||||
emailQueue.name,
|
||||
async (job: Job<SendEmailJobData>) => {
|
||||
@@ -253,7 +280,7 @@ export async function createEmailWorker() {
|
||||
},
|
||||
{
|
||||
connection: emailQueue.opts.connection,
|
||||
concurrency: 10, // Process up to 10 emails concurrently
|
||||
concurrency,
|
||||
limiter: {
|
||||
max: rateLimit, // Max emails per second (from env, AWS SES quota, or default)
|
||||
duration: 1000,
|
||||
|
||||
@@ -108,7 +108,7 @@ export class EmailService {
|
||||
await BillingLimitService.incrementUsage(params.projectId, EmailSourceType.TRANSACTIONAL);
|
||||
|
||||
// Queue email for sending
|
||||
await this.queueEmail(email.id);
|
||||
await this.queueEmail(email.id, EmailSourceType.TRANSACTIONAL);
|
||||
|
||||
return email;
|
||||
}
|
||||
@@ -172,7 +172,7 @@ export class EmailService {
|
||||
await BillingLimitService.incrementUsage(params.projectId, sourceType);
|
||||
|
||||
// Queue email for sending
|
||||
await this.queueEmail(email.id);
|
||||
await this.queueEmail(email.id, sourceType);
|
||||
|
||||
return email;
|
||||
}
|
||||
@@ -278,7 +278,7 @@ export class EmailService {
|
||||
await BillingLimitService.incrementUsage(params.projectId, sourceType);
|
||||
|
||||
// Queue email for sending
|
||||
await this.queueEmail(email.id);
|
||||
await this.queueEmail(email.id, sourceType);
|
||||
|
||||
return email;
|
||||
}
|
||||
@@ -1137,7 +1137,7 @@ export class EmailService {
|
||||
* Queue an email for sending
|
||||
* Adds email to the BullMQ queue for processing by workers
|
||||
*/
|
||||
private static async queueEmail(emailId: string, delay?: number): Promise<void> {
|
||||
await QueueService.queueEmail(emailId, delay);
|
||||
private static async queueEmail(emailId: string, sourceType: EmailSourceType, delay?: number): Promise<void> {
|
||||
await QueueService.queueEmail(emailId, sourceType, delay);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import {CampaignStatus, EmailStatus} from '@plunk/db';
|
||||
import {CampaignStatus, EmailSourceType, EmailStatus} from '@plunk/db';
|
||||
import {type Job, Queue} from 'bullmq';
|
||||
import type {RedisOptions} from 'ioredis';
|
||||
import signale from 'signale';
|
||||
@@ -174,20 +174,43 @@ export const meterQueue = new Queue<MeterEventJobData>('meter', {
|
||||
},
|
||||
});
|
||||
|
||||
/**
 * Map an email's source type to its BullMQ job priority (lower number = higher precedence).
 *
 * Transactional sends get 1, workflow sends get 5 and campaign sends get 10.
 * Any source type not listed here falls back to the middle priority, 5.
 */
function emailPriorityFor(sourceType: EmailSourceType): number {
  if (sourceType === EmailSourceType.TRANSACTIONAL) {
    return 1;
  }

  if (sourceType === EmailSourceType.WORKFLOW) {
    return 5;
  }

  if (sourceType === EmailSourceType.CAMPAIGN) {
    return 10;
  }

  // Unrecognised source types share the workflow priority.
  return 5;
}
|
||||
|
||||
/**
|
||||
* Queue Service - Centralized queue management
|
||||
*/
|
||||
export class QueueService {
|
||||
/**
|
||||
* Add email to queue for sending
|
||||
* Add email to queue for sending.
|
||||
*
|
||||
* Transactional emails jump the queue ahead of workflow and campaign sends
|
||||
* via BullMQ's priority (lower number = higher precedence). This prevents
|
||||
* latency-sensitive sends (login codes, password resets) from queuing behind
|
||||
* large campaign bursts on the shared `email` queue.
|
||||
*/
|
||||
public static async queueEmail(emailId: string, delay?: number): Promise<Job<SendEmailJobData>> {
|
||||
public static async queueEmail(
|
||||
emailId: string,
|
||||
sourceType: EmailSourceType,
|
||||
delay?: number,
|
||||
): Promise<Job<SendEmailJobData>> {
|
||||
return emailQueue.add(
|
||||
'send-email',
|
||||
{emailId},
|
||||
{
|
||||
delay, // Optional delay in milliseconds
|
||||
jobId: `email-${emailId}`, // Prevent duplicate jobs
|
||||
delay,
|
||||
jobId: `email-${emailId}`,
|
||||
priority: emailPriorityFor(sourceType),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -130,6 +130,8 @@ Plunk bundles a self-hosted [ntfy](https://ntfy.sh) server for internal system n
|
||||
| ----------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
|
||||
| `AUTO_PROJECT_DISABLE` | No | When `true`, projects are automatically suspended when bounce or complaint rate thresholds are exceeded. Set to `false` to manage project status manually. | `true` |
|
||||
| `EMAIL_RATE_LIMIT_PER_SECOND` | No | Override the email sending rate limit. If not set, Plunk automatically fetches the quota from your AWS SES account. | — |
|
||||
| `EMAIL_WORKER_CONCURRENCY` | No | Number of emails the worker processes in parallel. When unset, derived from the effective rate limit so a higher SES quota scales throughput automatically. | — |
|
||||
| `EMAIL_WORKER_MAX_CONCURRENCY`| No | Upper bound applied to the auto-derived worker concurrency. Raise this only after sizing the Prisma connection pool accordingly. | `50` |
|
||||
|
||||
## Advanced
|
||||
|
||||
|
||||
Reference in New Issue
Block a user