feat(storage): add upload hooks, metrics integration, and presigned multipart

Upload hooks:
- StorageHooks class with fire-and-forget event emitter pattern
- Events: upload, upload:error, delete, delete:error, download
- All StorageClient operations now emit appropriate events
- Unsubscribe functions for cleanup

Metrics:
- StorageMetricsCollector interface (decoupled from prom-client)
- InMemoryMetrics for testing and local dev
- attachMetrics() wires hooks to any collector automatically
- Backends can create a Prometheus collector via MetricsService

Presigned multipart upload (browser direct-upload):
- createMultipartUpload() initiates and returns uploadId
- getMultipartUploadUrls() generates presigned PUT URLs per part
- completeMultipartUpload() finalizes with part ETags
- abortMultipartUpload() for cleanup on abandoned uploads

90 tests passing across 5 test files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-03-20 19:36:46 +01:00
parent 822e75368a
commit b0e5a9c5ff
8 changed files with 708 additions and 28 deletions

View file

@ -31,6 +31,18 @@ vi.mock('@aws-sdk/client-s3', () => ({
HeadObjectCommand: vi.fn(function (this: any, input: any) {
Object.assign(this, input);
}),
CreateMultipartUploadCommand: vi.fn(function (this: any, input: any) {
Object.assign(this, input);
}),
UploadPartCommand: vi.fn(function (this: any, input: any) {
Object.assign(this, input);
}),
CompleteMultipartUploadCommand: vi.fn(function (this: any, input: any) {
Object.assign(this, input);
}),
AbortMultipartUploadCommand: vi.fn(function (this: any, input: any) {
Object.assign(this, input);
}),
}));
vi.mock('@aws-sdk/lib-storage', () => ({
@ -345,4 +357,121 @@ describe('StorageClient', () => {
expect(url).toBe('https://signed.url/file');
});
});
// Lifecycle-hook coverage for StorageClient: each storage operation should
// emit the matching event on `storage.hooks` (fire-and-forget observers).
// NOTE(review): relies on `storage` and `mockSend` created in the enclosing
// describe block — confirm mockSend is reset between tests.
describe('hooks', () => {
// A successful PUT emits 'upload' with bucket/key/content metadata.
it('emits upload event on successful upload', async () => {
mockSend.mockResolvedValue({ ETag: '"abc"' });
const handler = vi.fn();
storage.hooks.on('upload', handler);
await storage.upload('file.png', Buffer.from('data'), { contentType: 'image/png' });
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
bucket: 'test-bucket',
key: 'file.png',
contentType: 'image/png',
// 'data' is 4 bytes — sizeBytes is derived from the Buffer length.
sizeBytes: 4,
})
);
});
// A rejected send must both reject the caller AND emit 'upload:error'.
it('emits upload:error on failed upload', async () => {
mockSend.mockRejectedValue(new Error('S3 down'));
const handler = vi.fn();
storage.hooks.on('upload:error', handler);
await expect(storage.upload('file.png', Buffer.from('x'))).rejects.toThrow('S3 down');
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
bucket: 'test-bucket',
key: 'file.png',
})
);
});
// Single-key delete still reports a one-element keys array.
it('emits delete event', async () => {
mockSend.mockResolvedValue({});
const handler = vi.fn();
storage.hooks.on('delete', handler);
await storage.delete('file.png');
expect(handler).toHaveBeenCalledWith({
bucket: 'test-bucket',
keys: ['file.png'],
});
});
it('emits download event', async () => {
mockSend.mockResolvedValue({
// Minimal async-iterable body so download() can drain the stream.
Body: (async function* () {
yield new Uint8Array([1]);
})(),
});
const handler = vi.fn();
storage.hooks.on('download', handler);
await storage.download('file.bin');
expect(handler).toHaveBeenCalledWith({ bucket: 'test-bucket', key: 'file.bin' });
});
});
// Covers the browser direct-upload flow: initiate → presign part URLs →
// complete (or abort). Relies on `storage` / `mockSend` from the outer scope.
describe('presigned multipart upload', () => {
it('createMultipartUpload returns upload ID', async () => {
mockSend.mockResolvedValue({ UploadId: 'mp-123' });
const result = await storage.createMultipartUpload('big.zip', 'application/zip');
expect(result).toEqual({ uploadId: 'mp-123', key: 'big.zip' });
});
// A missing UploadId in the S3 response is treated as a hard failure.
it('createMultipartUpload throws when no UploadId', async () => {
mockSend.mockResolvedValue({});
await expect(storage.createMultipartUpload('big.zip')).rejects.toThrow(
'no UploadId returned'
);
});
it('getMultipartUploadUrls returns URLs for each part', async () => {
// presumably getSignedUrl is mocked elsewhere to return this fixed URL
// for every part — confirm against the s3-request-presigner mock.
const urls = await storage.getMultipartUploadUrls('big.zip', 'mp-123', 3);
expect(urls).toHaveLength(3);
expect(urls[0]).toBe('https://signed.url/file');
});
it('completeMultipartUpload finishes upload and emits hook', async () => {
mockSend.mockResolvedValue({ ETag: '"final"' });
const handler = vi.fn();
storage.hooks.on('upload', handler);
const result = await storage.completeMultipartUpload('big.zip', 'mp-123', [
{ partNumber: 1, etag: '"part1"' },
{ partNumber: 2, etag: '"part2"' },
]);
expect(result.key).toBe('big.zip');
expect(result.etag).toBe('"final"');
// Multipart completion goes through the same 'upload' hook as direct uploads.
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({ bucket: 'test-bucket', key: 'big.zip' })
);
});
it('abortMultipartUpload sends abort command', async () => {
mockSend.mockResolvedValue({});
// Import the mocked constructor so we can assert on the exact command input.
const { AbortMultipartUploadCommand } = await import('@aws-sdk/client-s3');
await storage.abortMultipartUpload('big.zip', 'mp-123');
expect(AbortMultipartUploadCommand).toHaveBeenCalledWith(
expect.objectContaining({
Bucket: 'test-bucket',
Key: 'big.zip',
UploadId: 'mp-123',
})
);
});
});
});

View file

@ -6,15 +6,22 @@ import {
DeleteObjectsCommand,
ListObjectsV2Command,
HeadObjectCommand,
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
type PutObjectCommandInput,
} from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { StorageHooks } from './hooks';
import type {
StorageConfig,
BucketConfig,
UploadOptions,
PresignedUrlOptions,
MultipartUploadInit,
MultipartUploadPart,
UploadResult,
FileInfo,
} from './types';
@ -26,8 +33,10 @@ export class StorageClient {
private client: S3Client;
private presignClient: S3Client;
private bucket: BucketConfig;
readonly hooks: StorageHooks;
constructor(config: StorageConfig, bucket: BucketConfig) {
this.hooks = new StorageHooks();
// Main client for internal operations (upload, download, delete, etc.)
this.client = new S3Client({
endpoint: config.endpoint,
@ -86,13 +95,27 @@ export class StorageClient {
}
const command = new PutObjectCommand(input);
const result = await this.client.send(command);
return {
key,
url: this.getPublicUrl(key),
etag: result.ETag,
};
try {
const result = await this.client.send(command);
const uploadResult: UploadResult = { key, url: this.getPublicUrl(key), etag: result.ETag };
const sizeBytes =
typeof body !== 'string' && !(body instanceof ReadableStream) ? body.byteLength : undefined;
this.hooks.emit('upload', {
bucket: this.bucket.name,
key,
sizeBytes,
contentType: options.contentType,
result: uploadResult,
});
return uploadResult;
} catch (err) {
this.hooks.emit('upload:error', {
bucket: this.bucket.name,
key,
error: err instanceof Error ? err : new Error(String(err)),
});
throw err;
}
}
/**
@ -114,28 +137,41 @@ export class StorageClient {
}
}
const upload = new Upload({
client: this.client,
params: {
Bucket: this.bucket.name,
Key: key,
Body: body,
ContentType: options.contentType,
CacheControl: options.cacheControl,
Metadata: options.metadata,
...(options.public ? { ACL: 'public-read' as const } : {}),
},
queueSize: 4,
partSize: 10 * 1024 * 1024, // 10MB parts
});
try {
const upload = new Upload({
client: this.client,
params: {
Bucket: this.bucket.name,
Key: key,
Body: body,
ContentType: options.contentType,
CacheControl: options.cacheControl,
Metadata: options.metadata,
...(options.public ? { ACL: 'public-read' as const } : {}),
},
queueSize: 4,
partSize: 10 * 1024 * 1024, // 10MB parts
});
const result = await upload.done();
return {
key,
url: this.getPublicUrl(key),
etag: result.ETag,
};
const result = await upload.done();
const uploadResult: UploadResult = { key, url: this.getPublicUrl(key), etag: result.ETag };
const sizeBytes = !(body instanceof ReadableStream) ? body.byteLength : undefined;
this.hooks.emit('upload', {
bucket: this.bucket.name,
key,
sizeBytes,
contentType: options.contentType,
result: uploadResult,
});
return uploadResult;
} catch (err) {
this.hooks.emit('upload:error', {
bucket: this.bucket.name,
key,
error: err instanceof Error ? err : new Error(String(err)),
});
throw err;
}
}
/**
@ -153,6 +189,8 @@ export class StorageClient {
throw new Error(`File not found: ${key}`);
}
this.hooks.emit('download', { bucket: this.bucket.name, key });
// Convert stream to buffer
const chunks: Uint8Array[] = [];
const stream = response.Body as AsyncIterable<Uint8Array>;
@ -190,6 +228,7 @@ export class StorageClient {
});
await this.client.send(command);
this.hooks.emit('delete', { bucket: this.bucket.name, keys: [key] });
}
/**
@ -211,6 +250,7 @@ export class StorageClient {
});
await this.client.send(command);
}
this.hooks.emit('delete', { bucket: this.bucket.name, keys });
}
/**
@ -329,4 +369,108 @@ export class StorageClient {
// Expose the configured bucket name (useful for logging and metric labels).
getBucketName(): string {
return this.bucket.name;
}
// ── Presigned Multipart Upload (browser direct-upload) ──────────────
/**
 * Begin a multipart upload session for `key`.
 *
 * The returned uploadId is handed to the browser, which then uploads each
 * chunk directly via presigned part URLs (see getMultipartUploadUrls).
 *
 * @param key - Object key for the final assembled object
 * @param contentType - Optional MIME type recorded on the object
 * @throws Error when S3 responds without an UploadId
 */
async createMultipartUpload(key: string, contentType?: string): Promise<MultipartUploadInit> {
  const { UploadId: uploadId } = await this.client.send(
    new CreateMultipartUploadCommand({
      Bucket: this.bucket.name,
      Key: key,
      ContentType: contentType,
    })
  );
  if (!uploadId) {
    throw new Error('Failed to create multipart upload — no UploadId returned');
  }
  return { uploadId, key };
}
/**
 * Generate presigned URLs for each part of a multipart upload.
 * The browser PUTs each chunk to the corresponding URL.
 *
 * All part URLs are presigned concurrently: each signature is independent,
 * so awaiting them one-by-one only added latency for large part counts.
 * Result order still corresponds to part numbers 1..parts.
 *
 * @param key - Object key
 * @param uploadId - From createMultipartUpload()
 * @param parts - Number of parts to generate URLs for
 * @param expiresIn - URL expiration in seconds (default: 3600)
 * @returns Presigned PUT URLs; index i holds the URL for part number i + 1
 */
async getMultipartUploadUrls(
  key: string,
  uploadId: string,
  parts: number,
  expiresIn = 3600
): Promise<string[]> {
  return Promise.all(
    Array.from({ length: parts }, (_, index) =>
      getSignedUrl(
        this.presignClient,
        new UploadPartCommand({
          Bucket: this.bucket.name,
          Key: key,
          UploadId: uploadId,
          PartNumber: index + 1, // S3 part numbers are 1-based
        }),
        { expiresIn }
      )
    )
  );
}
/**
 * Complete a multipart upload after all parts have been uploaded.
 * The browser sends the ETag of each part (from the PUT response headers).
 *
 * Emits 'upload' on success and 'upload:error' on failure, mirroring
 * upload()/uploadStream() so hook observers (e.g. metrics) see multipart
 * uploads too.
 *
 * @param key - Object key
 * @param uploadId - From createMultipartUpload()
 * @param parts - Part numbers and ETags reported by the browser (any order)
 * @returns The assembled object's key, public URL, and final ETag
 */
async completeMultipartUpload(
  key: string,
  uploadId: string,
  parts: MultipartUploadPart[]
): Promise<UploadResult> {
  // S3 requires the part list in ascending PartNumber order; sort a copy so
  // callers may report parts in whatever order the PUTs finished.
  const orderedParts = [...parts].sort((a, b) => a.partNumber - b.partNumber);
  const command = new CompleteMultipartUploadCommand({
    Bucket: this.bucket.name,
    Key: key,
    UploadId: uploadId,
    MultipartUpload: {
      Parts: orderedParts.map((p) => ({
        PartNumber: p.partNumber,
        ETag: p.etag,
      })),
    },
  });
  try {
    const result = await this.client.send(command);
    const uploadResult: UploadResult = {
      key,
      url: this.getPublicUrl(key),
      etag: result.ETag,
    };
    this.hooks.emit('upload', {
      bucket: this.bucket.name,
      key,
      result: uploadResult,
    });
    return uploadResult;
  } catch (err) {
    // Previously failures bypassed the hooks entirely; emit 'upload:error'
    // for consistency with the other upload paths, then rethrow.
    this.hooks.emit('upload:error', {
      bucket: this.bucket.name,
      key,
      error: err instanceof Error ? err : new Error(String(err)),
    });
    throw err;
  }
}
/**
 * Abort an in-progress multipart upload so S3 can discard any parts already
 * uploaded (cleanup when the browser abandons the upload).
 *
 * @param key - Object key the upload was created for
 * @param uploadId - From createMultipartUpload()
 */
async abortMultipartUpload(key: string, uploadId: string): Promise<void> {
  await this.client.send(
    new AbortMultipartUploadCommand({
      Bucket: this.bucket.name,
      Key: key,
      UploadId: uploadId,
    })
  );
}
}

View file

@ -0,0 +1,83 @@
import { describe, it, expect, vi } from 'vitest';
import { StorageHooks } from './hooks';
// Behavioral tests for the StorageHooks emitter: subscription, fan-out to
// multiple listeners, unsubscription, listener error isolation, and teardown.
describe('StorageHooks', () => {
  // Sample payload reused across cases; listeners never mutate it.
  const uploadEvent = { bucket: 'test', key: 'file.png' };

  it('calls registered hook on emit', () => {
    const emitter = new StorageHooks();
    const listener = vi.fn();
    emitter.on('upload', listener);
    emitter.emit('upload', uploadEvent);
    expect(listener).toHaveBeenCalledWith(uploadEvent);
  });

  it('supports multiple hooks for same event', () => {
    const emitter = new StorageHooks();
    const first = vi.fn();
    const second = vi.fn();
    emitter.on('upload', first);
    emitter.on('upload', second);
    emitter.emit('upload', uploadEvent);
    expect(first).toHaveBeenCalled();
    expect(second).toHaveBeenCalled();
  });

  it('returns unsubscribe function', () => {
    const emitter = new StorageHooks();
    const listener = vi.fn();
    const unsubscribe = emitter.on('upload', listener);
    unsubscribe();
    emitter.emit('upload', uploadEvent);
    expect(listener).not.toHaveBeenCalled();
  });

  it('does not throw when emitting with no listeners', () => {
    const emitter = new StorageHooks();
    expect(() => emitter.emit('upload', uploadEvent)).not.toThrow();
  });

  it('swallows errors in hooks', () => {
    const emitter = new StorageHooks();
    const wellBehaved = vi.fn();
    emitter.on('upload', () => {
      throw new Error('hook error');
    });
    emitter.on('upload', wellBehaved);
    emitter.emit('upload', uploadEvent);
    // A throwing listener must not prevent later listeners from running.
    expect(wellBehaved).toHaveBeenCalled();
  });

  it('removeAll clears all listeners', () => {
    const emitter = new StorageHooks();
    const listener = vi.fn();
    emitter.on('upload', listener);
    emitter.on('delete', listener);
    emitter.removeAll();
    emitter.emit('upload', uploadEvent);
    emitter.emit('delete', { bucket: 'test', keys: ['a'] });
    expect(listener).not.toHaveBeenCalled();
  });

  it('emits different event types independently', () => {
    const emitter = new StorageHooks();
    const onUpload = vi.fn();
    const onDelete = vi.fn();
    emitter.on('upload', onUpload);
    emitter.on('delete', onDelete);
    emitter.emit('upload', uploadEvent);
    expect(onUpload).toHaveBeenCalled();
    expect(onDelete).not.toHaveBeenCalled();
  });
});

View file

@ -0,0 +1,91 @@
import type { UploadResult } from './types';
/**
 * Names of the lifecycle events StorageClient emits through StorageHooks.
 */
export type StorageEventType = 'upload' | 'upload:error' | 'delete' | 'delete:error' | 'download';
/**
 * Payload for upload events (emitted by upload(), uploadStream(), and
 * completeMultipartUpload()).
 */
export interface UploadEventPayload {
bucket: string;
key: string;
// Set when the body exposes byteLength; undefined for string and stream bodies.
sizeBytes?: number;
contentType?: string;
// The UploadResult returned to the caller, when available.
result?: UploadResult;
}
/**
 * Payload for delete events; `keys` holds every key removed by the operation
 * (single-key deletes emit a one-element array).
 */
export interface DeleteEventPayload {
bucket: string;
keys: string[];
}
/**
 * Payload for download events
 */
export interface DownloadEventPayload {
bucket: string;
key: string;
}
/**
 * Payload for error events ('upload:error', 'delete:error').
 * NOTE(review): no 'delete:error' emitter is visible in StorageClient in this
 * change — confirm it is wired up or deliberately reserved for future use.
 */
export interface ErrorEventPayload {
bucket: string;
key?: string;
error: Error;
}
/**
 * Maps each event name to its payload type (drives the typing of on()/emit()).
 */
export interface StorageEventMap {
upload: UploadEventPayload;
'upload:error': ErrorEventPayload;
delete: DeleteEventPayload;
'delete:error': ErrorEventPayload;
download: DownloadEventPayload;
}
// A listener for event T; its return value is ignored (fire-and-forget).
export type StorageHook<T extends StorageEventType> = (payload: StorageEventMap[T]) => void;
/**
 * Minimal typed event emitter for storage lifecycle hooks.
 *
 * Listener errors are swallowed — hooks are fire-and-forget and must never
 * affect the storage operation that emitted the event.
 */
export class StorageHooks {
  private readonly registry = new Map<StorageEventType, Set<StorageHook<StorageEventType>>>();

  /**
   * Register `hook` for `event`.
   * @returns An unsubscribe function that removes exactly this hook.
   */
  on<T extends StorageEventType>(event: T, hook: StorageHook<T>): () => void {
    let handlers = this.registry.get(event);
    if (handlers === undefined) {
      handlers = new Set();
      this.registry.set(event, handlers);
    }
    const typedHandlers = handlers as Set<StorageHook<T>>;
    typedHandlers.add(hook);
    return () => {
      typedHandlers.delete(hook);
    };
  }

  /**
   * Invoke every hook registered for `event` with `payload`.
   * Exceptions thrown by individual hooks are discarded so that one bad
   * listener cannot break the others or the emitting operation.
   */
  emit<T extends StorageEventType>(event: T, payload: StorageEventMap[T]): void {
    const subscribers = this.registry.get(event);
    if (!subscribers) return;
    for (const subscriber of subscribers) {
      try {
        subscriber(payload);
      } catch {
        // Fire-and-forget: listener failures are intentionally ignored.
      }
    }
  }

  /** Drop every registered listener for every event. */
  removeAll(): void {
    this.registry.clear();
  }
}

View file

@ -21,6 +21,22 @@ export {
createProjectDocStorage,
} from './factory';
// Hooks
export { StorageHooks } from './hooks';
export type {
StorageEventType,
StorageEventMap,
StorageHook,
UploadEventPayload,
DeleteEventPayload,
DownloadEventPayload,
ErrorEventPayload,
} from './hooks';
// Metrics
export { InMemoryMetrics, attachMetrics } from './metrics';
export type { StorageMetricsCollector } from './metrics';
// Utilities
export {
generateFileKey,
@ -42,6 +58,8 @@ export {
type BucketName,
type UploadOptions,
type PresignedUrlOptions,
type MultipartUploadInit,
type MultipartUploadPart,
type UploadResult,
type FileInfo,
} from './types';

View file

@ -0,0 +1,113 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { StorageHooks } from './hooks';
import { InMemoryMetrics, attachMetrics } from './metrics';
// Verifies the InMemoryMetrics collector in isolation: each increment/observe
// method mutates only its own counter, and reset() restores the zero state.
describe('InMemoryMetrics', () => {
let metrics: InMemoryMetrics;
beforeEach(() => {
// Fresh collector per test so counters never leak between cases.
metrics = new InMemoryMetrics();
});
it('tracks upload count', () => {
metrics.incrementUploads('test-bucket');
metrics.incrementUploads('test-bucket');
expect(metrics.counters.uploads).toBe(2);
});
it('tracks upload errors', () => {
metrics.incrementUploadErrors('test-bucket');
expect(metrics.counters.uploadErrors).toBe(1);
});
it('tracks deletes with count', () => {
// Batch deletes add the whole count at once, not one per call.
metrics.incrementDeletes('test-bucket', 5);
expect(metrics.counters.deletes).toBe(5);
});
it('tracks downloads', () => {
metrics.incrementDownloads('test-bucket');
expect(metrics.counters.downloads).toBe(1);
});
it('records upload sizes', () => {
metrics.observeUploadSize('test-bucket', 1024);
metrics.observeUploadSize('test-bucket', 2048);
// Sizes are recorded in observation order.
expect(metrics.sizes).toEqual([1024, 2048]);
});
it('resets all counters', () => {
metrics.incrementUploads('b');
metrics.incrementUploadErrors('b');
metrics.incrementDeletes('b', 3);
metrics.incrementDownloads('b');
metrics.observeUploadSize('b', 100);
metrics.reset();
expect(metrics.counters.uploads).toBe(0);
expect(metrics.counters.uploadErrors).toBe(0);
expect(metrics.counters.deletes).toBe(0);
expect(metrics.counters.downloads).toBe(0);
expect(metrics.sizes).toEqual([]);
});
});
// Verifies attachMetrics() end-to-end: events emitted on the hooks instance
// must be reflected in the attached collector.
describe('attachMetrics', () => {
let hooks: StorageHooks;
let metrics: InMemoryMetrics;
beforeEach(() => {
hooks = new StorageHooks();
metrics = new InMemoryMetrics();
attachMetrics(hooks, metrics);
});
it('tracks uploads via hooks', () => {
hooks.emit('upload', {
bucket: 'test',
key: 'file.png',
sizeBytes: 512,
contentType: 'image/png',
});
expect(metrics.counters.uploads).toBe(1);
expect(metrics.sizes).toEqual([512]);
});
it('tracks upload errors via hooks', () => {
hooks.emit('upload:error', {
bucket: 'test',
error: new Error('fail'),
});
expect(metrics.counters.uploadErrors).toBe(1);
});
it('tracks deletes via hooks', () => {
// Two keys in one delete event should add 2 to the counter.
hooks.emit('delete', {
bucket: 'test',
keys: ['a.png', 'b.png'],
});
expect(metrics.counters.deletes).toBe(2);
});
it('tracks downloads via hooks', () => {
hooks.emit('download', { bucket: 'test', key: 'file.png' });
expect(metrics.counters.downloads).toBe(1);
});
it('returns detach function', () => {
// NOTE(review): this only proves detach() doesn't throw; it does not assert
// that events stop reaching the collector after detaching — consider
// emitting after detach() and checking the counters stay unchanged.
const detach = attachMetrics(new StorageHooks(), metrics);
// No-op on hooks that were attached separately — just verify it doesn't throw
detach();
});
it('skips size tracking when sizeBytes is undefined', () => {
hooks.emit('upload', { bucket: 'test', key: 'file.png' });
expect(metrics.counters.uploads).toBe(1);
expect(metrics.sizes).toEqual([]);
});
});

View file

@ -0,0 +1,82 @@
import type { StorageHooks } from './hooks';
/**
 * Interface for a metrics collector decoupled from prom-client so shared-storage
 * stays dependency-free. NestJS backends wire this up with their MetricsService.
 */
export interface StorageMetricsCollector {
// Count one successful upload; contentType may be used as a metric label.
incrementUploads(bucket: string, contentType?: string): void;
// Count one failed upload.
incrementUploadErrors(bucket: string): void;
// Count deleted objects; `count` is the number of keys removed in one call.
incrementDeletes(bucket: string, count: number): void;
// Count one download.
incrementDownloads(bucket: string): void;
// Record the byte size of an uploaded object (e.g. into a histogram).
observeUploadSize(bucket: string, sizeBytes: number): void;
}
/**
 * Metrics collector backed by plain in-process counters.
 * Intended for tests and local development where no Prometheus registry
 * exists; state is inspected directly via `counters` and `sizes`.
 */
export class InMemoryMetrics implements StorageMetricsCollector {
  /** Monotonic per-operation counters; zeroed by reset(). */
  readonly counters = {
    uploads: 0,
    uploadErrors: 0,
    deletes: 0,
    downloads: 0,
  };

  /** Every observed upload size in bytes, in observation order. */
  readonly sizes: number[] = [];

  incrementUploads(_bucket: string, _contentType?: string): void {
    this.counters.uploads += 1;
  }

  incrementUploadErrors(_bucket: string): void {
    this.counters.uploadErrors += 1;
  }

  incrementDeletes(_bucket: string, count: number): void {
    this.counters.deletes += count;
  }

  incrementDownloads(_bucket: string): void {
    this.counters.downloads += 1;
  }

  observeUploadSize(_bucket: string, sizeBytes: number): void {
    this.sizes.push(sizeBytes);
  }

  /** Zero all counters and clear the recorded sizes. */
  reset(): void {
    Object.assign(this.counters, { uploads: 0, uploadErrors: 0, deletes: 0, downloads: 0 });
    this.sizes.length = 0;
  }
}
/**
 * Subscribe a StorageMetricsCollector to every storage lifecycle event on
 * the given hooks instance.
 *
 * @example
 * const hooks = new StorageHooks();
 * const collector = createPrometheusCollector(metricsService);
 * attachMetrics(hooks, collector);
 *
 * @returns A detach function that removes all four subscriptions.
 */
export function attachMetrics(hooks: StorageHooks, collector: StorageMetricsCollector): () => void {
  const detachUpload = hooks.on('upload', ({ bucket, contentType, sizeBytes }) => {
    collector.incrementUploads(bucket, contentType);
    // Size is optional on the payload (streams have no known length).
    if (sizeBytes != null) {
      collector.observeUploadSize(bucket, sizeBytes);
    }
  });
  const detachUploadError = hooks.on('upload:error', ({ bucket }) => {
    collector.incrementUploadErrors(bucket);
  });
  const detachDelete = hooks.on('delete', ({ bucket, keys }) => {
    collector.incrementDeletes(bucket, keys.length);
  });
  const detachDownload = hooks.on('download', ({ bucket }) => {
    collector.incrementDownloads(bucket);
  });

  return () => {
    detachUpload();
    detachUploadError();
    detachDelete();
    detachDownload();
  };
}

View file

@ -86,6 +86,26 @@ export interface FileInfo {
etag?: string;
}
/**
 * Result of initiating a multipart upload
 * (returned by StorageClient.createMultipartUpload).
 */
export interface MultipartUploadInit {
/** S3 upload ID for this multipart upload session */
uploadId: string;
/** Object key */
key: string;
}
/**
 * A completed part of a multipart upload, as reported back by the browser
 * after PUTting a chunk to its presigned URL.
 */
export interface MultipartUploadPart {
/** 1-based part number (matches the index order of the presigned URLs) */
partNumber: number;
/** ETag returned by S3 after uploading the part */
etag: string;
}
/**
* Predefined bucket names for each project
*/