feat(mana-notify): add central notification service

NestJS notification microservice for email, push, Matrix, and webhook
notifications across all ManaCore apps.

Features:
- Multi-channel delivery (email, push, Matrix, webhook)
- Handlebars template engine with defaults
- User notification preferences
- BullMQ async job processing
- Delivery tracking and logging
- Prometheus metrics

Includes @manacore/notify-client package for NestJS integration.
This commit is contained in:
Till-JS 2026-01-29 22:07:38 +01:00
parent 1495dbe476
commit b5fa0f42b6
66 changed files with 4824 additions and 0 deletions

View file

@ -0,0 +1,125 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger, Inject } from '@nestjs/common';
import { Job } from 'bullmq';
import { eq } from 'drizzle-orm';
import { EMAIL_QUEUE } from '../queue.module';
import { EmailService } from '../../channels/email/email.service';
import { MetricsService } from '../../metrics/metrics.service';
import { DATABASE_CONNECTION } from '../../db/database.module';
import { notifications, deliveryLogs, type NewDeliveryLog } from '../../db/schema';
export interface EmailJob {
notificationId: string;
to: string;
subject: string;
html: string;
text?: string;
from?: string;
template?: string;
appId: string;
}
@Processor(EMAIL_QUEUE, {
concurrency: 5,
})
export class EmailProcessor extends WorkerHost {
private readonly logger = new Logger(EmailProcessor.name);
constructor(
private readonly emailService: EmailService,
private readonly metricsService: MetricsService,
@Inject(DATABASE_CONNECTION) private readonly db: any
) {
super();
}
async process(job: Job<EmailJob>): Promise<void> {
const { notificationId, to, subject, html, text, from, template, appId } = job.data;
const startTime = Date.now();
this.logger.debug(`Processing email job ${job.id} to ${to}`);
// Update notification status to processing
await this.updateNotificationStatus(notificationId, 'processing');
const result = await this.emailService.sendEmail({
to,
subject,
html,
text,
from,
});
const durationMs = Date.now() - startTime;
// Log the delivery attempt
await this.logDelivery({
notificationId,
attemptNumber: job.attemptsMade + 1,
channel: 'email',
success: result.success,
errorMessage: result.error,
providerId: result.messageId,
durationMs,
});
// Record metrics
this.metricsService.recordEmailSent(template || 'custom', result.success);
this.metricsService.recordEmailLatency(durationMs / 1000);
if (result.success) {
this.metricsService.recordNotificationSent('email', appId);
await this.updateNotificationStatus(notificationId, 'delivered', result.messageId);
this.logger.log(`Email sent successfully to ${to} in ${durationMs}ms`);
} else {
this.metricsService.recordNotificationFailed('email', appId, 'send_error');
// Only mark as failed if no more retries
if (job.attemptsMade >= (job.opts.attempts || 3) - 1) {
await this.updateNotificationStatus(notificationId, 'failed', undefined, result.error);
}
throw new Error(result.error || 'Failed to send email');
}
}
@OnWorkerEvent('failed')
onFailed(job: Job<EmailJob>, error: Error) {
this.logger.error(`Email job ${job.id} failed: ${error.message}`);
}
private async updateNotificationStatus(
notificationId: string,
status: string,
providerId?: string,
errorMessage?: string
): Promise<void> {
try {
const updateData: Record<string, unknown> = {
status,
updatedAt: new Date(),
};
if (status === 'delivered') {
updateData.deliveredAt = new Date();
}
if (errorMessage) {
updateData.errorMessage = errorMessage;
}
await this.db
.update(notifications)
.set(updateData)
.where(eq(notifications.id, notificationId));
} catch (error) {
this.logger.error(`Failed to update notification status: ${error}`);
}
}
private async logDelivery(log: Omit<NewDeliveryLog, 'id' | 'createdAt'>): Promise<void> {
try {
await this.db.insert(deliveryLogs).values(log);
} catch (error) {
this.logger.error(`Failed to log delivery: ${error}`);
}
}
}

View file

@ -0,0 +1,121 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger, Inject } from '@nestjs/common';
import { Job } from 'bullmq';
import { eq } from 'drizzle-orm';
import { MATRIX_QUEUE } from '../queue.module';
import { MatrixService } from '../../channels/matrix/matrix.service';
import { MetricsService } from '../../metrics/metrics.service';
import { DATABASE_CONNECTION } from '../../db/database.module';
import { notifications, deliveryLogs, type NewDeliveryLog } from '../../db/schema';
export interface MatrixJob {
notificationId: string;
roomId: string;
body: string;
formattedBody?: string;
msgtype?: 'text' | 'notice';
appId: string;
}
@Processor(MATRIX_QUEUE, {
concurrency: 5,
})
export class MatrixProcessor extends WorkerHost {
private readonly logger = new Logger(MatrixProcessor.name);
constructor(
private readonly matrixService: MatrixService,
private readonly metricsService: MetricsService,
@Inject(DATABASE_CONNECTION) private readonly db: any
) {
super();
}
async process(job: Job<MatrixJob>): Promise<void> {
const { notificationId, roomId, body, formattedBody, msgtype, appId } = job.data;
const startTime = Date.now();
this.logger.debug(`Processing Matrix job ${job.id} to room ${roomId}`);
// Update notification status to processing
await this.updateNotificationStatus(notificationId, 'processing');
const result = await this.matrixService.sendMessage({
roomId,
body,
formattedBody,
msgtype,
});
const durationMs = Date.now() - startTime;
// Log the delivery attempt
await this.logDelivery({
notificationId,
attemptNumber: job.attemptsMade + 1,
channel: 'matrix',
success: result.success,
errorMessage: result.error,
providerId: result.eventId,
durationMs,
});
this.metricsService.recordMatrixSent(result.success);
this.metricsService.recordNotificationLatency('matrix', durationMs / 1000);
if (result.success) {
this.metricsService.recordNotificationSent('matrix', appId);
await this.updateNotificationStatus(notificationId, 'delivered', result.eventId);
this.logger.log(`Matrix message sent to ${roomId} in ${durationMs}ms`);
} else {
this.metricsService.recordNotificationFailed('matrix', appId, 'send_error');
// Only mark as failed if no more retries
if (job.attemptsMade >= (job.opts.attempts || 3) - 1) {
await this.updateNotificationStatus(notificationId, 'failed', undefined, result.error);
}
throw new Error(result.error || 'Failed to send Matrix message');
}
}
@OnWorkerEvent('failed')
onFailed(job: Job<MatrixJob>, error: Error) {
this.logger.error(`Matrix job ${job.id} failed: ${error.message}`);
}
private async updateNotificationStatus(
notificationId: string,
status: string,
providerId?: string,
errorMessage?: string
): Promise<void> {
try {
const updateData: Record<string, unknown> = {
status,
updatedAt: new Date(),
};
if (status === 'delivered') {
updateData.deliveredAt = new Date();
}
if (errorMessage) {
updateData.errorMessage = errorMessage;
}
await this.db
.update(notifications)
.set(updateData)
.where(eq(notifications.id, notificationId));
} catch (error) {
this.logger.error(`Failed to update notification status: ${error}`);
}
}
private async logDelivery(log: Omit<NewDeliveryLog, 'id' | 'createdAt'>): Promise<void> {
try {
await this.db.insert(deliveryLogs).values(log);
} catch (error) {
this.logger.error(`Failed to log delivery: ${error}`);
}
}
}

View file

@ -0,0 +1,154 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger, Inject } from '@nestjs/common';
import { Job } from 'bullmq';
import { eq } from 'drizzle-orm';
import { PUSH_QUEUE } from '../queue.module';
import { PushService } from '../../channels/push/push.service';
import { MetricsService } from '../../metrics/metrics.service';
import { DATABASE_CONNECTION } from '../../db/database.module';
import { notifications, deliveryLogs, type NewDeliveryLog } from '../../db/schema';
export interface PushJob {
notificationId: string;
tokens: string[];
title: string;
body: string;
data?: Record<string, unknown>;
sound?: 'default' | null;
badge?: number;
platform: string;
appId: string;
}
@Processor(PUSH_QUEUE, {
concurrency: 10,
})
export class PushProcessor extends WorkerHost {
private readonly logger = new Logger(PushProcessor.name);
constructor(
private readonly pushService: PushService,
private readonly metricsService: MetricsService,
@Inject(DATABASE_CONNECTION) private readonly db: any
) {
super();
}
async process(job: Job<PushJob>): Promise<void> {
const { notificationId, tokens, title, body, data, sound, badge, platform, appId } = job.data;
const startTime = Date.now();
this.logger.debug(`Processing push job ${job.id} to ${tokens.length} tokens`);
// Update notification status to processing
await this.updateNotificationStatus(notificationId, 'processing');
const results = await this.pushService.sendToTokens(tokens, {
title,
body,
data,
sound,
badge,
});
const durationMs = Date.now() - startTime;
// Count successes and failures
let successCount = 0;
let failCount = 0;
const ticketIds: string[] = [];
for (const [token, result] of results) {
if (result.success) {
successCount++;
if (result.ticketId) {
ticketIds.push(result.ticketId);
}
} else {
failCount++;
}
// Record per-token metrics
this.metricsService.recordPushSent(platform, result.success);
}
// Log the delivery attempt
await this.logDelivery({
notificationId,
attemptNumber: job.attemptsMade + 1,
channel: 'push',
success: successCount > 0,
errorMessage: failCount > 0 ? `${failCount}/${tokens.length} tokens failed` : undefined,
providerId: ticketIds.join(','),
durationMs,
});
this.metricsService.recordPushLatency(durationMs / 1000);
if (successCount > 0) {
this.metricsService.recordNotificationSent('push', appId);
await this.updateNotificationStatus(
notificationId,
failCount === 0 ? 'delivered' : 'delivered', // Partial success still counts as delivered
ticketIds.join(',')
);
this.logger.log(
`Push notification sent: ${successCount}/${tokens.length} successful in ${durationMs}ms`
);
} else {
this.metricsService.recordNotificationFailed('push', appId, 'send_error');
// Only mark as failed if no more retries
if (job.attemptsMade >= (job.opts.attempts || 3) - 1) {
await this.updateNotificationStatus(
notificationId,
'failed',
undefined,
'All tokens failed'
);
}
throw new Error('All push tokens failed');
}
}
@OnWorkerEvent('failed')
onFailed(job: Job<PushJob>, error: Error) {
this.logger.error(`Push job ${job.id} failed: ${error.message}`);
}
private async updateNotificationStatus(
notificationId: string,
status: string,
providerId?: string,
errorMessage?: string
): Promise<void> {
try {
const updateData: Record<string, unknown> = {
status,
updatedAt: new Date(),
};
if (status === 'delivered') {
updateData.deliveredAt = new Date();
}
if (errorMessage) {
updateData.errorMessage = errorMessage;
}
await this.db
.update(notifications)
.set(updateData)
.where(eq(notifications.id, notificationId));
} catch (error) {
this.logger.error(`Failed to update notification status: ${error}`);
}
}
private async logDelivery(log: Omit<NewDeliveryLog, 'id' | 'createdAt'>): Promise<void> {
try {
await this.db.insert(deliveryLogs).values(log);
} catch (error) {
this.logger.error(`Failed to log delivery: ${error}`);
}
}
}

View file

@ -0,0 +1,123 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger, Inject } from '@nestjs/common';
import { Job } from 'bullmq';
import { eq } from 'drizzle-orm';
import { WEBHOOK_QUEUE } from '../queue.module';
import { WebhookService } from '../../channels/webhook/webhook.service';
import { MetricsService } from '../../metrics/metrics.service';
import { DATABASE_CONNECTION } from '../../db/database.module';
import { notifications, deliveryLogs, type NewDeliveryLog } from '../../db/schema';
export interface WebhookJob {
notificationId: string;
url: string;
method?: 'POST' | 'PUT';
headers?: Record<string, string>;
body: Record<string, unknown>;
timeout?: number;
appId: string;
}
@Processor(WEBHOOK_QUEUE, {
concurrency: 10,
})
export class WebhookProcessor extends WorkerHost {
private readonly logger = new Logger(WebhookProcessor.name);
constructor(
private readonly webhookService: WebhookService,
private readonly metricsService: MetricsService,
@Inject(DATABASE_CONNECTION) private readonly db: any
) {
super();
}
async process(job: Job<WebhookJob>): Promise<void> {
const { notificationId, url, method, headers, body, timeout, appId } = job.data;
const startTime = Date.now();
this.logger.debug(`Processing webhook job ${job.id} to ${url}`);
// Update notification status to processing
await this.updateNotificationStatus(notificationId, 'processing');
const result = await this.webhookService.send({
url,
method,
headers,
body,
timeout,
});
const durationMs = Date.now() - startTime;
// Log the delivery attempt
await this.logDelivery({
notificationId,
attemptNumber: job.attemptsMade + 1,
channel: 'webhook',
success: result.success,
statusCode: result.statusCode,
errorMessage: result.error,
durationMs: result.durationMs,
});
this.metricsService.recordWebhookSent(result.success);
this.metricsService.recordNotificationLatency('webhook', durationMs / 1000);
if (result.success) {
this.metricsService.recordNotificationSent('webhook', appId);
await this.updateNotificationStatus(notificationId, 'delivered');
this.logger.log(`Webhook sent to ${url} in ${durationMs}ms`);
} else {
this.metricsService.recordNotificationFailed('webhook', appId, 'send_error');
// Only mark as failed if no more retries
if (job.attemptsMade >= (job.opts.attempts || 5) - 1) {
await this.updateNotificationStatus(notificationId, 'failed', undefined, result.error);
}
throw new Error(result.error || 'Failed to send webhook');
}
}
@OnWorkerEvent('failed')
onFailed(job: Job<WebhookJob>, error: Error) {
this.logger.error(`Webhook job ${job.id} failed: ${error.message}`);
}
private async updateNotificationStatus(
notificationId: string,
status: string,
providerId?: string,
errorMessage?: string
): Promise<void> {
try {
const updateData: Record<string, unknown> = {
status,
updatedAt: new Date(),
};
if (status === 'delivered') {
updateData.deliveredAt = new Date();
}
if (errorMessage) {
updateData.errorMessage = errorMessage;
}
await this.db
.update(notifications)
.set(updateData)
.where(eq(notifications.id, notificationId));
} catch (error) {
this.logger.error(`Failed to update notification status: ${error}`);
}
}
private async logDelivery(log: Omit<NewDeliveryLog, 'id' | 'createdAt'>): Promise<void> {
try {
await this.db.insert(deliveryLogs).values(log);
} catch (error) {
this.logger.error(`Failed to log delivery: ${error}`);
}
}
}

View file

@ -0,0 +1,73 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EmailProcessor } from './processors/email.processor';
import { PushProcessor } from './processors/push.processor';
import { MatrixProcessor } from './processors/matrix.processor';
import { WebhookProcessor } from './processors/webhook.processor';
import { ChannelsModule } from '../channels/channels.module';
import { MetricsModule } from '../metrics/metrics.module';
export const EMAIL_QUEUE = 'email';
export const PUSH_QUEUE = 'push';
export const MATRIX_QUEUE = 'matrix';
export const WEBHOOK_QUEUE = 'webhook';
@Module({
imports: [
BullModule.registerQueue(
{
name: EMAIL_QUEUE,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000,
},
removeOnComplete: 100,
removeOnFail: 1000,
},
},
{
name: PUSH_QUEUE,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: 100,
removeOnFail: 1000,
},
},
{
name: MATRIX_QUEUE,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
removeOnComplete: 100,
removeOnFail: 500,
},
},
{
name: WEBHOOK_QUEUE,
defaultJobOptions: {
attempts: 5,
backoff: {
type: 'exponential',
delay: 3000,
},
removeOnComplete: 100,
removeOnFail: 1000,
},
}
),
ChannelsModule,
MetricsModule,
],
providers: [EmailProcessor, PushProcessor, MatrixProcessor, WebhookProcessor],
exports: [BullModule],
})
export class QueueModule {}