diff --git a/packages/shared-storage/src/client.spec.ts b/packages/shared-storage/src/client.spec.ts index f9934a130..c5e2601ac 100644 --- a/packages/shared-storage/src/client.spec.ts +++ b/packages/shared-storage/src/client.spec.ts @@ -31,6 +31,18 @@ vi.mock('@aws-sdk/client-s3', () => ({ HeadObjectCommand: vi.fn(function (this: any, input: any) { Object.assign(this, input); }), + CreateMultipartUploadCommand: vi.fn(function (this: any, input: any) { + Object.assign(this, input); + }), + UploadPartCommand: vi.fn(function (this: any, input: any) { + Object.assign(this, input); + }), + CompleteMultipartUploadCommand: vi.fn(function (this: any, input: any) { + Object.assign(this, input); + }), + AbortMultipartUploadCommand: vi.fn(function (this: any, input: any) { + Object.assign(this, input); + }), })); vi.mock('@aws-sdk/lib-storage', () => ({ @@ -345,4 +357,121 @@ describe('StorageClient', () => { expect(url).toBe('https://signed.url/file'); }); }); + + describe('hooks', () => { + it('emits upload event on successful upload', async () => { + mockSend.mockResolvedValue({ ETag: '"abc"' }); + const handler = vi.fn(); + storage.hooks.on('upload', handler); + + await storage.upload('file.png', Buffer.from('data'), { contentType: 'image/png' }); + + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + bucket: 'test-bucket', + key: 'file.png', + contentType: 'image/png', + sizeBytes: 4, + }) + ); + }); + + it('emits upload:error on failed upload', async () => { + mockSend.mockRejectedValue(new Error('S3 down')); + const handler = vi.fn(); + storage.hooks.on('upload:error', handler); + + await expect(storage.upload('file.png', Buffer.from('x'))).rejects.toThrow('S3 down'); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + bucket: 'test-bucket', + key: 'file.png', + }) + ); + }); + + it('emits delete event', async () => { + mockSend.mockResolvedValue({}); + const handler = vi.fn(); + storage.hooks.on('delete', handler); + + await storage.delete('file.png'); + + expect(handler).toHaveBeenCalledWith({ + bucket: 'test-bucket', + keys: ['file.png'], + }); + }); + + it('emits download event', async () => { + mockSend.mockResolvedValue({ + Body: (async function* () { + yield new Uint8Array([1]); + })(), + }); + const handler = vi.fn(); + storage.hooks.on('download', handler); + + await storage.download('file.bin'); + + expect(handler).toHaveBeenCalledWith({ bucket: 'test-bucket', key: 'file.bin' }); + }); + }); + + describe('presigned multipart upload', () => { + it('createMultipartUpload returns upload ID', async () => { + mockSend.mockResolvedValue({ UploadId: 'mp-123' }); + + const result = await storage.createMultipartUpload('big.zip', 'application/zip'); + + expect(result).toEqual({ uploadId: 'mp-123', key: 'big.zip' }); + }); + + it('createMultipartUpload throws when no UploadId', async () => { + mockSend.mockResolvedValue({}); + + await expect(storage.createMultipartUpload('big.zip')).rejects.toThrow( + 'no UploadId returned' + ); + }); + + it('getMultipartUploadUrls returns URLs for each part', async () => { + const urls = await storage.getMultipartUploadUrls('big.zip', 'mp-123', 3); + + expect(urls).toHaveLength(3); + expect(urls[0]).toBe('https://signed.url/file'); + }); + + it('completeMultipartUpload finishes upload and emits hook', async () => { + mockSend.mockResolvedValue({ ETag: '"final"' }); + const handler = vi.fn(); + storage.hooks.on('upload', handler); + + const result = await storage.completeMultipartUpload('big.zip', 'mp-123', [ + { partNumber: 1, etag: '"part1"' }, + { partNumber: 2, etag: '"part2"' }, + ]); + + expect(result.key).toBe('big.zip'); + expect(result.etag).toBe('"final"'); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ bucket: 'test-bucket', key: 'big.zip' }) + ); + }); + + it('abortMultipartUpload sends abort command', async () => { + mockSend.mockResolvedValue({}); + const { AbortMultipartUploadCommand } = await import('@aws-sdk/client-s3'); + + await storage.abortMultipartUpload('big.zip', 'mp-123'); + + expect(AbortMultipartUploadCommand).toHaveBeenCalledWith( + expect.objectContaining({ + Bucket: 'test-bucket', + Key: 'big.zip', + UploadId: 'mp-123', + }) + ); + }); + }); }); diff --git a/packages/shared-storage/src/client.ts b/packages/shared-storage/src/client.ts index 39ae98513..6ac86113e 100644 --- a/packages/shared-storage/src/client.ts +++ b/packages/shared-storage/src/client.ts @@ -6,15 +6,22 @@ import { DeleteObjectsCommand, ListObjectsV2Command, HeadObjectCommand, + CreateMultipartUploadCommand, + UploadPartCommand, + CompleteMultipartUploadCommand, + AbortMultipartUploadCommand, type PutObjectCommandInput, } from '@aws-sdk/client-s3'; import { Upload } from '@aws-sdk/lib-storage'; import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; +import { StorageHooks } from './hooks'; import type { StorageConfig, BucketConfig, UploadOptions, PresignedUrlOptions, + MultipartUploadInit, + MultipartUploadPart, UploadResult, FileInfo, } from './types'; @@ -26,8 +33,10 @@ export class StorageClient { private client: S3Client; private presignClient: S3Client; private bucket: BucketConfig; + readonly hooks: StorageHooks; constructor(config: StorageConfig, bucket: BucketConfig) { + this.hooks = new StorageHooks(); // Main client for internal operations (upload, download, delete, etc.) this.client = new S3Client({ endpoint: config.endpoint, @@ -86,13 +95,27 @@ export class StorageClient { } const command = new PutObjectCommand(input); - const result = await this.client.send(command); - - return { - key, - url: this.getPublicUrl(key), - etag: result.ETag, - }; + try { + const result = await this.client.send(command); + const uploadResult: UploadResult = { key, url: this.getPublicUrl(key), etag: result.ETag }; + const sizeBytes = + typeof body !== 'string' && !(body instanceof ReadableStream) ? body.byteLength : undefined; + this.hooks.emit('upload', { + bucket: this.bucket.name, + key, + sizeBytes, + contentType: options.contentType, + result: uploadResult, + }); + return uploadResult; + } catch (err) { + this.hooks.emit('upload:error', { + bucket: this.bucket.name, + key, + error: err instanceof Error ? err : new Error(String(err)), + }); + throw err; + } } /** @@ -114,28 +137,41 @@ export class StorageClient { } } - const upload = new Upload({ - client: this.client, - params: { - Bucket: this.bucket.name, - Key: key, - Body: body, - ContentType: options.contentType, - CacheControl: options.cacheControl, - Metadata: options.metadata, - ...(options.public ? { ACL: 'public-read' as const } : {}), - }, - queueSize: 4, - partSize: 10 * 1024 * 1024, // 10MB parts - }); + try { + const upload = new Upload({ + client: this.client, + params: { + Bucket: this.bucket.name, + Key: key, + Body: body, + ContentType: options.contentType, + CacheControl: options.cacheControl, + Metadata: options.metadata, + ...(options.public ? { ACL: 'public-read' as const } : {}), + }, + queueSize: 4, + partSize: 10 * 1024 * 1024, // 10MB parts + }); - const result = await upload.done(); - - return { - key, - url: this.getPublicUrl(key), - etag: result.ETag, - }; + const result = await upload.done(); + const uploadResult: UploadResult = { key, url: this.getPublicUrl(key), etag: result.ETag }; + const sizeBytes = !(body instanceof ReadableStream) ? body.byteLength : undefined; + this.hooks.emit('upload', { + bucket: this.bucket.name, + key, + sizeBytes, + contentType: options.contentType, + result: uploadResult, + }); + return uploadResult; + } catch (err) { + this.hooks.emit('upload:error', { + bucket: this.bucket.name, + key, + error: err instanceof Error ? err : new Error(String(err)), + }); + throw err; + } } /** @@ -153,6 +189,8 @@ export class StorageClient { throw new Error(`File not found: ${key}`); } + this.hooks.emit('download', { bucket: this.bucket.name, key }); + // Convert stream to buffer const chunks: Uint8Array[] = []; const stream = response.Body as AsyncIterable; @@ -190,6 +228,7 @@ export class StorageClient { }); await this.client.send(command); + this.hooks.emit('delete', { bucket: this.bucket.name, keys: [key] }); } /** @@ -211,6 +250,7 @@ export class StorageClient { }); await this.client.send(command); } + this.hooks.emit('delete', { bucket: this.bucket.name, keys }); } /** @@ -329,4 +369,108 @@ export class StorageClient { getBucketName(): string { return this.bucket.name; } + + // ── Presigned Multipart Upload (browser direct-upload) ────────────── + + /** + * Initiate a multipart upload and return the upload ID. + * The browser uses this ID to upload parts directly via presigned URLs. + */ + async createMultipartUpload(key: string, contentType?: string): Promise { + const command = new CreateMultipartUploadCommand({ + Bucket: this.bucket.name, + Key: key, + ContentType: contentType, + }); + + const response = await this.client.send(command); + if (!response.UploadId) { + throw new Error('Failed to create multipart upload — no UploadId returned'); + } + + return { uploadId: response.UploadId, key }; + } + + /** + * Generate presigned URLs for each part of a multipart upload. + * The browser PUTs each chunk to the corresponding URL. + * + * @param key - Object key + * @param uploadId - From createMultipartUpload() + * @param parts - Number of parts to generate URLs for + * @param expiresIn - URL expiration in seconds (default: 3600) + */ + async getMultipartUploadUrls( + key: string, + uploadId: string, + parts: number, + expiresIn = 3600 + ): Promise { + const urls: string[] = []; + + for (let partNumber = 1; partNumber <= parts; partNumber++) { + const command = new UploadPartCommand({ + Bucket: this.bucket.name, + Key: key, + UploadId: uploadId, + PartNumber: partNumber, + }); + + const url = await getSignedUrl(this.presignClient, command, { expiresIn }); + urls.push(url); + } + + return urls; + } + + /** + * Complete a multipart upload after all parts have been uploaded. + * The browser sends the ETag of each part (from the PUT response headers). + */ + async completeMultipartUpload( + key: string, + uploadId: string, + parts: MultipartUploadPart[] + ): Promise { + const command = new CompleteMultipartUploadCommand({ + Bucket: this.bucket.name, + Key: key, + UploadId: uploadId, + MultipartUpload: { + Parts: parts.map((p) => ({ + PartNumber: p.partNumber, + ETag: p.etag, + })), + }, + }); + + const result = await this.client.send(command); + + const uploadResult: UploadResult = { + key, + url: this.getPublicUrl(key), + etag: result.ETag, + }; + + this.hooks.emit('upload', { + bucket: this.bucket.name, + key, + result: uploadResult, + }); + + return uploadResult; + } + + /** + * Abort a multipart upload (cleanup if the browser abandons the upload). + */ + async abortMultipartUpload(key: string, uploadId: string): Promise { + const command = new AbortMultipartUploadCommand({ + Bucket: this.bucket.name, + Key: key, + UploadId: uploadId, + }); + + await this.client.send(command); + } } diff --git a/packages/shared-storage/src/hooks.spec.ts b/packages/shared-storage/src/hooks.spec.ts new file mode 100644 index 000000000..09115f8c1 --- /dev/null +++ b/packages/shared-storage/src/hooks.spec.ts @@ -0,0 +1,83 @@ +import { describe, it, expect, vi } from 'vitest'; +import { StorageHooks } from './hooks'; + +describe('StorageHooks', () => { + it('calls registered hook on emit', () => { + const hooks = new StorageHooks(); + const handler = vi.fn(); + + hooks.on('upload', handler); + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + + expect(handler).toHaveBeenCalledWith({ bucket: 'test', key: 'file.png' }); + }); + + it('supports multiple hooks for same event', () => { + const hooks = new StorageHooks(); + const h1 = vi.fn(); + const h2 = vi.fn(); + + hooks.on('upload', h1); + hooks.on('upload', h2); + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + + expect(h1).toHaveBeenCalled(); + expect(h2).toHaveBeenCalled(); + }); + + it('returns unsubscribe function', () => { + const hooks = new StorageHooks(); + const handler = vi.fn(); + + const unsub = hooks.on('upload', handler); + unsub(); + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('does not throw when emitting with no listeners', () => { + const hooks = new StorageHooks(); + expect(() => hooks.emit('upload', { bucket: 'test', key: 'file.png' })).not.toThrow(); + }); + + it('swallows errors in hooks', () => { + const hooks = new StorageHooks(); + const good = vi.fn(); + + hooks.on('upload', () => { + throw new Error('hook error'); + }); + hooks.on('upload', good); + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + + expect(good).toHaveBeenCalled(); + }); + + it('removeAll clears all listeners', () => { + const hooks = new StorageHooks(); + const handler = vi.fn(); + + hooks.on('upload', handler); + hooks.on('delete', handler); + hooks.removeAll(); + + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + hooks.emit('delete', { bucket: 'test', keys: ['a'] }); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('emits different event types independently', () => { + const hooks = new StorageHooks(); + const uploadHandler = vi.fn(); + const deleteHandler = vi.fn(); + + hooks.on('upload', uploadHandler); + hooks.on('delete', deleteHandler); + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + + expect(uploadHandler).toHaveBeenCalled(); + expect(deleteHandler).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/shared-storage/src/hooks.ts b/packages/shared-storage/src/hooks.ts new file mode 100644 index 000000000..f2fcf6d2f --- /dev/null +++ b/packages/shared-storage/src/hooks.ts @@ -0,0 +1,91 @@ +import type { UploadResult } from './types'; + +/** + * Storage event types + */ +export type StorageEventType = 'upload' | 'upload:error' | 'delete' | 'delete:error' | 'download'; + +/** + * Payload for upload events + */ +export interface UploadEventPayload { + bucket: string; + key: string; + sizeBytes?: number; + contentType?: string; + result?: UploadResult; +} + +/** + * Payload for delete events + */ +export interface DeleteEventPayload { + bucket: string; + keys: string[]; +} + +/** + * Payload for download events + */ +export interface DownloadEventPayload { + bucket: string; + key: string; +} + +/** + * Payload for error events + */ +export interface ErrorEventPayload { + bucket: string; + key?: string; + error: Error; +} + +/** + * Event payload map + */ +export interface StorageEventMap { + upload: UploadEventPayload; + 'upload:error': ErrorEventPayload; + delete: DeleteEventPayload; + 'delete:error': ErrorEventPayload; + download: DownloadEventPayload; +} + +export type StorageHook = (payload: StorageEventMap[T]) => void; + +/** + * Simple event emitter for storage lifecycle hooks. + * Hooks are fire-and-forget — errors in hooks do not affect storage operations. + */ +export class StorageHooks { + private listeners = new Map>>(); + + on(event: T, hook: StorageHook): () => void { + if (!this.listeners.has(event)) { + this.listeners.set(event, new Set()); + } + const set = this.listeners.get(event) as Set>; + set.add(hook); + + // Return unsubscribe function + return () => set.delete(hook); + } + + emit(event: T, payload: StorageEventMap[T]): void { + const hooks = this.listeners.get(event); + if (!hooks) return; + + for (const hook of hooks) { + try { + hook(payload); + } catch { + // Hooks are fire-and-forget — swallow errors + } + } + } + + removeAll(): void { + this.listeners.clear(); + } +} diff --git a/packages/shared-storage/src/index.ts b/packages/shared-storage/src/index.ts index ee1bf2d12..a110e9c0a 100644 --- a/packages/shared-storage/src/index.ts +++ b/packages/shared-storage/src/index.ts @@ -21,6 +21,22 @@ export { createProjectDocStorage, } from './factory'; +// Hooks +export { StorageHooks } from './hooks'; +export type { + StorageEventType, + StorageEventMap, + StorageHook, + UploadEventPayload, + DeleteEventPayload, + DownloadEventPayload, + ErrorEventPayload, +} from './hooks'; + +// Metrics +export { InMemoryMetrics, attachMetrics } from './metrics'; +export type { StorageMetricsCollector } from './metrics'; + // Utilities export { generateFileKey, @@ -42,6 +58,8 @@ export { type BucketName, type UploadOptions, type PresignedUrlOptions, + type MultipartUploadInit, + type MultipartUploadPart, type UploadResult, type FileInfo, } from './types'; diff --git a/packages/shared-storage/src/metrics.spec.ts b/packages/shared-storage/src/metrics.spec.ts new file mode 100644 index 000000000..8d3d02dfa --- /dev/null +++ b/packages/shared-storage/src/metrics.spec.ts @@ -0,0 +1,113 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { StorageHooks } from './hooks'; +import { InMemoryMetrics, attachMetrics } from './metrics'; + +describe('InMemoryMetrics', () => { + let metrics: InMemoryMetrics; + + beforeEach(() => { + metrics = new InMemoryMetrics(); + }); + + it('tracks upload count', () => { + metrics.incrementUploads('test-bucket'); + metrics.incrementUploads('test-bucket'); + expect(metrics.counters.uploads).toBe(2); + }); + + it('tracks upload errors', () => { + metrics.incrementUploadErrors('test-bucket'); + expect(metrics.counters.uploadErrors).toBe(1); + }); + + it('tracks deletes with count', () => { + metrics.incrementDeletes('test-bucket', 5); + expect(metrics.counters.deletes).toBe(5); + }); + + it('tracks downloads', () => { + metrics.incrementDownloads('test-bucket'); + expect(metrics.counters.downloads).toBe(1); + }); + + it('records upload sizes', () => { + metrics.observeUploadSize('test-bucket', 1024); + metrics.observeUploadSize('test-bucket', 2048); + expect(metrics.sizes).toEqual([1024, 2048]); + }); + + it('resets all counters', () => { + metrics.incrementUploads('b'); + metrics.incrementUploadErrors('b'); + metrics.incrementDeletes('b', 3); + metrics.incrementDownloads('b'); + metrics.observeUploadSize('b', 100); + metrics.reset(); + + expect(metrics.counters.uploads).toBe(0); + expect(metrics.counters.uploadErrors).toBe(0); + expect(metrics.counters.deletes).toBe(0); + expect(metrics.counters.downloads).toBe(0); + expect(metrics.sizes).toEqual([]); + }); +}); + +describe('attachMetrics', () => { + let hooks: StorageHooks; + let metrics: InMemoryMetrics; + + beforeEach(() => { + hooks = new StorageHooks(); + metrics = new InMemoryMetrics(); + attachMetrics(hooks, metrics); + }); + + it('tracks uploads via hooks', () => { + hooks.emit('upload', { + bucket: 'test', + key: 'file.png', + sizeBytes: 512, + contentType: 'image/png', + }); + + expect(metrics.counters.uploads).toBe(1); + expect(metrics.sizes).toEqual([512]); + }); + + it('tracks upload errors via hooks', () => { + hooks.emit('upload:error', { + bucket: 'test', + error: new Error('fail'), + }); + + expect(metrics.counters.uploadErrors).toBe(1); + }); + + it('tracks deletes via hooks', () => { + hooks.emit('delete', { + bucket: 'test', + keys: ['a.png', 'b.png'], + }); + + expect(metrics.counters.deletes).toBe(2); + }); + + it('tracks downloads via hooks', () => { + hooks.emit('download', { bucket: 'test', key: 'file.png' }); + + expect(metrics.counters.downloads).toBe(1); + }); + + it('returns detach function', () => { + const detach = attachMetrics(new StorageHooks(), metrics); + // No-op on hooks that were attached separately — just verify it doesn't throw + detach(); + }); + + it('skips size tracking when sizeBytes is undefined', () => { + hooks.emit('upload', { bucket: 'test', key: 'file.png' }); + + expect(metrics.counters.uploads).toBe(1); + expect(metrics.sizes).toEqual([]); + }); +}); diff --git a/packages/shared-storage/src/metrics.ts b/packages/shared-storage/src/metrics.ts new file mode 100644 index 000000000..5844f656b --- /dev/null +++ b/packages/shared-storage/src/metrics.ts @@ -0,0 +1,82 @@ +import type { StorageHooks } from './hooks'; + +/** + * Interface for a metrics collector — decoupled from prom-client so shared-storage + * stays dependency-free. NestJS backends wire this up with their MetricsService. + */ +export interface StorageMetricsCollector { + incrementUploads(bucket: string, contentType?: string): void; + incrementUploadErrors(bucket: string): void; + incrementDeletes(bucket: string, count: number): void; + incrementDownloads(bucket: string): void; + observeUploadSize(bucket: string, sizeBytes: number): void; +} + +/** + * In-memory metrics collector for environments without Prometheus. + * Useful for testing and local development. + */ +export class InMemoryMetrics implements StorageMetricsCollector { + readonly counters = { + uploads: 0, + uploadErrors: 0, + deletes: 0, + downloads: 0, + }; + readonly sizes: number[] = []; + + incrementUploads(_bucket: string, _contentType?: string): void { + this.counters.uploads++; + } + incrementUploadErrors(_bucket: string): void { + this.counters.uploadErrors++; + } + incrementDeletes(_bucket: string, count: number): void { + this.counters.deletes += count; + } + incrementDownloads(_bucket: string): void { + this.counters.downloads++; + } + observeUploadSize(_bucket: string, sizeBytes: number): void { + this.sizes.push(sizeBytes); + } + + reset(): void { + this.counters.uploads = 0; + this.counters.uploadErrors = 0; + this.counters.deletes = 0; + this.counters.downloads = 0; + this.sizes.length = 0; + } +} + +/** + * Wires a StorageMetricsCollector to StorageHooks. + * Call this once after creating the hooks + collector to auto-track metrics. + * + * @example + * const hooks = new StorageHooks(); + * const collector = createPrometheusCollector(metricsService); + * attachMetrics(hooks, collector); + */ +export function attachMetrics(hooks: StorageHooks, collector: StorageMetricsCollector): () => void { + const unsubs = [ + hooks.on('upload', (payload) => { + collector.incrementUploads(payload.bucket, payload.contentType); + if (payload.sizeBytes != null) { + collector.observeUploadSize(payload.bucket, payload.sizeBytes); + } + }), + hooks.on('upload:error', (payload) => { + collector.incrementUploadErrors(payload.bucket); + }), + hooks.on('delete', (payload) => { + collector.incrementDeletes(payload.bucket, payload.keys.length); + }), + hooks.on('download', (payload) => { + collector.incrementDownloads(payload.bucket); + }), + ]; + + return () => unsubs.forEach((unsub) => unsub()); +} diff --git a/packages/shared-storage/src/types.ts b/packages/shared-storage/src/types.ts index 9c049f8fe..9bc48a346 100644 --- a/packages/shared-storage/src/types.ts +++ b/packages/shared-storage/src/types.ts @@ -86,6 +86,26 @@ export interface FileInfo { etag?: string; } +/** + * Multipart upload initialization result + */ +export interface MultipartUploadInit { + /** S3 upload ID for this multipart upload session */ + uploadId: string; + /** Object key */ + key: string; +} + +/** + * A completed part of a multipart upload + */ +export interface MultipartUploadPart { + /** 1-based part number */ + partNumber: number; + /** ETag returned by S3 after uploading the part */ + etag: string; +} + /** * Predefined bucket names for each project */