From 4a3295d1d0dd0de9e3bf5d390f3f7c7b2d310269 Mon Sep 17 00:00:00 2001 From: Till-JS <101404291+Till-JS@users.noreply.github.com> Date: Thu, 29 Jan 2026 22:00:36 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(mana-crawler):=20add=20web=20c?= =?UTF-8?q?rawler=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NestJS-based web crawler service for structured content extraction. Features: - Depth-controlled crawling with URL pattern filtering - robots.txt compliance - HTML/PDF/Markdown content extraction - BullMQ job queue for async processing - Redis caching layer - Prometheus metrics --- services/mana-crawler/.env.example | 24 ++ services/mana-crawler/.gitignore | 20 + services/mana-crawler/CLAUDE.md | 297 +++++++++++++++ services/mana-crawler/Dockerfile | 55 +++ services/mana-crawler/drizzle.config.ts | 6 + services/mana-crawler/nest-cli.json | 8 + services/mana-crawler/package.json | 61 +++ services/mana-crawler/src/app.module.ts | 47 +++ .../mana-crawler/src/cache/cache.module.ts | 10 + .../mana-crawler/src/cache/cache.service.ts | 152 ++++++++ .../common/filters/http-exception.filter.ts | 51 +++ .../mana-crawler/src/config/configuration.ts | 49 +++ .../src/crawler/crawler.controller.ts | 74 ++++ .../src/crawler/crawler.module.ts | 13 + .../src/crawler/crawler.service.ts | 337 +++++++++++++++++ .../src/crawler/dto/crawl-response.dto.ts | 91 +++++ .../mana-crawler/src/crawler/dto/index.ts | 2 + .../src/crawler/dto/start-crawl.dto.ts | 96 +++++ services/mana-crawler/src/db/connection.ts | 33 ++ .../mana-crawler/src/db/database.module.ts | 24 ++ .../src/db/schema/crawl-jobs.schema.ts | 89 +++++ .../src/db/schema/crawl-results.schema.ts | 53 +++ services/mana-crawler/src/db/schema/index.ts | 2 + .../src/health/health.controller.ts | 48 +++ .../mana-crawler/src/health/health.module.ts | 9 + services/mana-crawler/src/main.ts | 43 +++ .../src/metrics/metrics.controller.ts | 13 + .../src/metrics/metrics.module.ts | 10 + .../src/metrics/metrics.service.ts | 93 +++++ .../mana-crawler/src/parser/parser.module.ts | 8 + .../mana-crawler/src/parser/parser.service.ts | 245 ++++++++++++ services/mana-crawler/src/queue/constants.ts | 1 + .../src/queue/processor.module.ts | 20 + .../src/queue/processors/crawl.processor.ts | 350 ++++++++++++++++++ .../mana-crawler/src/queue/queue.module.ts | 33 ++ .../mana-crawler/src/queue/queue.service.ts | 150 ++++++++ .../mana-crawler/src/robots/robots.module.ts | 10 + .../mana-crawler/src/robots/robots.service.ts | 143 +++++++ services/mana-crawler/tsconfig.json | 25 ++ 39 files changed, 2795 insertions(+) create mode 100644 services/mana-crawler/.env.example create mode 100644 services/mana-crawler/.gitignore create mode 100644 services/mana-crawler/CLAUDE.md create mode 100644 services/mana-crawler/Dockerfile create mode 100644 services/mana-crawler/drizzle.config.ts create mode 100644 services/mana-crawler/nest-cli.json create mode 100644 services/mana-crawler/package.json create mode 100644 services/mana-crawler/src/app.module.ts create mode 100644 services/mana-crawler/src/cache/cache.module.ts create mode 100644 services/mana-crawler/src/cache/cache.service.ts create mode 100644 services/mana-crawler/src/common/filters/http-exception.filter.ts create mode 100644 services/mana-crawler/src/config/configuration.ts create mode 100644 services/mana-crawler/src/crawler/crawler.controller.ts create mode 100644 services/mana-crawler/src/crawler/crawler.module.ts create mode 100644 
services/mana-crawler/src/crawler/crawler.service.ts create mode 100644 services/mana-crawler/src/crawler/dto/crawl-response.dto.ts create mode 100644 services/mana-crawler/src/crawler/dto/index.ts create mode 100644 services/mana-crawler/src/crawler/dto/start-crawl.dto.ts create mode 100644 services/mana-crawler/src/db/connection.ts create mode 100644 services/mana-crawler/src/db/database.module.ts create mode 100644 services/mana-crawler/src/db/schema/crawl-jobs.schema.ts create mode 100644 services/mana-crawler/src/db/schema/crawl-results.schema.ts create mode 100644 services/mana-crawler/src/db/schema/index.ts create mode 100644 services/mana-crawler/src/health/health.controller.ts create mode 100644 services/mana-crawler/src/health/health.module.ts create mode 100644 services/mana-crawler/src/main.ts create mode 100644 services/mana-crawler/src/metrics/metrics.controller.ts create mode 100644 services/mana-crawler/src/metrics/metrics.module.ts create mode 100644 services/mana-crawler/src/metrics/metrics.service.ts create mode 100644 services/mana-crawler/src/parser/parser.module.ts create mode 100644 services/mana-crawler/src/parser/parser.service.ts create mode 100644 services/mana-crawler/src/queue/constants.ts create mode 100644 services/mana-crawler/src/queue/processor.module.ts create mode 100644 services/mana-crawler/src/queue/processors/crawl.processor.ts create mode 100644 services/mana-crawler/src/queue/queue.module.ts create mode 100644 services/mana-crawler/src/queue/queue.service.ts create mode 100644 services/mana-crawler/src/robots/robots.module.ts create mode 100644 services/mana-crawler/src/robots/robots.service.ts create mode 100644 services/mana-crawler/tsconfig.json diff --git a/services/mana-crawler/.env.example b/services/mana-crawler/.env.example new file mode 100644 index 000000000..04458b8a0 --- /dev/null +++ b/services/mana-crawler/.env.example @@ -0,0 +1,24 @@ +# Server +PORT=3023 +NODE_ENV=development + +# Database +DATABASE_URL=postgresql://manacore:devpassword@localhost:5432/manacore + +# Redis (Queue) +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= + +# Crawling +CRAWLER_USER_AGENT=ManaCoreCrawler/1.0 (+https://manacore.io/bot) +CRAWLER_DEFAULT_RATE_LIMIT=2 +CRAWLER_DEFAULT_MAX_DEPTH=3 +CRAWLER_DEFAULT_MAX_PAGES=100 +CRAWLER_TIMEOUT=30000 + +# External Services (optional - for single-page extraction fallback) +MANA_SEARCH_URL=http://localhost:3021 + +# CORS +CORS_ORIGINS=http://localhost:3000,http://localhost:5173,http://localhost:8081 diff --git a/services/mana-crawler/.gitignore b/services/mana-crawler/.gitignore new file mode 100644 index 000000000..33af69cc1 --- /dev/null +++ b/services/mana-crawler/.gitignore @@ -0,0 +1,20 @@ +# Dependencies +node_modules + +# Build +dist + +# Environment +.env +.env.local + +# IDE +.idea +.vscode + +# Debug +*.log +npm-debug.log* + +# Test +coverage diff --git a/services/mana-crawler/CLAUDE.md b/services/mana-crawler/CLAUDE.md new file mode 100644 index 000000000..1a89999d0 --- /dev/null +++ b/services/mana-crawler/CLAUDE.md @@ -0,0 +1,297 @@ +# Mana Crawler Service + +Web crawler microservice for systematic website crawling and content extraction. 
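+
+For a quick programmatic check, here is a minimal TypeScript client sketch. It is illustrative only: the helper name and the 2-second polling interval are arbitrary choices, while the endpoint shapes follow the API documented below (assumes a local instance on the default port 3023, Node 18+ for global `fetch`):
+
+```typescript
+// Minimal client sketch: start a crawl job and poll until it finishes.
+// `crawlAndWait` is a hypothetical helper; endpoints match the API table below.
+const BASE = 'http://localhost:3023/api/v1';
+
+async function crawlAndWait(startUrl: string) {
+  // Kick off a crawl job with a small depth/page budget.
+  const res = await fetch(`${BASE}/crawl`, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify({ startUrl, config: { maxDepth: 2, maxPages: 50 } }),
+  });
+  const { jobId } = await res.json();
+
+  // Poll the job status until it reaches a terminal state.
+  for (;;) {
+    const job = await (await fetch(`${BASE}/crawl/${jobId}`)).json();
+    if (['completed', 'failed', 'cancelled'].includes(job.status)) return job;
+    await new Promise((resolve) => setTimeout(resolve, 2000));
+  }
+}
+```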
+
+## Overview
+
+- **Port**: 3023
+- **Technology**: NestJS + BullMQ + Cheerio + PostgreSQL + Redis
+- **Purpose**: Crawl websites, extract structured content, and process pages asynchronously through a job queue
+
+## Architecture
+
+```
+┌─────────────────────────────────────────────────────────────┐
+│                  mana-crawler (Port 3023)                   │
+│                                                             │
+│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
+│  │  Crawl API  │  │    Queue    │  │   Parser    │          │
+│  │ Controller  │──│   Service   │──│   Service   │          │
+│  └─────────────┘  │  (BullMQ)   │  │  (Cheerio)  │          │
+│                   └─────────────┘  └─────────────┘          │
+│                          │                │                 │
+│                    ┌─────┴────────────────┴─────┐           │
+│                    │      Storage Service       │           │
+│                    │    (PostgreSQL + Redis)    │           │
+│                    └────────────────────────────┘           │
+└─────────────────────────────────────────────────────────────┘
+```
+
+## Quick Start
+
+### Development
+
+```bash
+# 1. Start Redis and PostgreSQL (from monorepo root)
+pnpm docker:up
+
+# 2. Install dependencies
+pnpm install
+
+# 3. Push database schema
+pnpm db:push
+
+# 4. Start in development mode
+pnpm dev
+```
+
+### Production
+
+```bash
+pnpm build
+pnpm start
+```
+
+## API Endpoints
+
+### Crawl Jobs
+
+| Method | Endpoint | Description |
+|--------|----------|-------------|
+| POST | `/api/v1/crawl` | Start a new crawl job |
+| GET | `/api/v1/crawl/:jobId` | Get job status |
+| GET | `/api/v1/crawl/:jobId/results` | Get crawl results (paginated) |
+| DELETE | `/api/v1/crawl/:jobId` | Cancel a crawl job |
+| POST | `/api/v1/crawl/:jobId/pause` | Pause a running job |
+| POST | `/api/v1/crawl/:jobId/resume` | Resume a paused job |
+
+### Instant Extract
+
+| Method | Endpoint | Description |
+|--------|----------|-------------|
+| POST | `/api/v1/extract` | Extract single page (proxy to mana-search) |
+
+### System
+
+| Method | Endpoint | Description |
+|--------|----------|-------------|
+| GET | `/health` | Health check |
+| GET | `/metrics` | Prometheus metrics |
+| GET | `/queue/dashboard` | Bull Board dashboard |
+
+## Usage Examples
+
+### Start a Crawl Job
+
+```bash
+curl -X POST http://localhost:3023/api/v1/crawl \
+  -H "Content-Type: application/json" \
+  -d '{
+    "startUrl": "https://docs.example.com",
+    "config": {
+      "maxDepth": 3,
+      "maxPages": 500,
+      "respectRobots": true,
+      "rateLimit": 2,
+      "includePatterns": ["/docs/*"],
+      "excludePatterns": ["/api/*", "*.pdf"],
+      "selectors": {
+        "content": "article.main-content",
+        "title": "h1.page-title"
+      },
+      "output": {
+        "format": "markdown",
+        "includeScreenshots": false
+      }
+    }
+  }'
+
+# Response:
+# {
+#   "jobId": "uuid",
+#   "status": "pending",
+#   "estimatedPages": 500,
+#   "queuePosition": 3
+# }
+```
+
+### Check Job Status
+
+```bash
+curl http://localhost:3023/api/v1/crawl/{jobId}
+
+# Response:
+# {
+#   "jobId": "uuid",
+#   "status": "running",
+#   "progress": {
+#     "discovered": 245,
+#     "crawled": 127,
+#     "failed": 3,
+#     "queued": 115
+#   },
+#   "startedAt": "2024-01-29T12:00:00Z",
+#   "averagePageTime": 450
+# }
+```
+
+### Get Results
+
+```bash
+curl "http://localhost:3023/api/v1/crawl/{jobId}/results?page=1&limit=50"
+
+# Response:
+# {
+#   "results": [...],
+#   "pagination": {
+#     "page": 1,
+#     "limit": 50,
+#     "total": 127
+#   }
+# }
+```
+
+## Environment Variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `PORT` | 3023 | API port |
+| `DATABASE_URL` | - | PostgreSQL connection URL |
+| `REDIS_HOST` | localhost | Redis host |
+| `REDIS_PORT` | 6379 | Redis port |
+| `CRAWLER_USER_AGENT` | ManaCoreCrawler/1.0 | Crawler user agent |
+| `CRAWLER_DEFAULT_RATE_LIMIT` | 2 | Default requests/second |
|
`CRAWLER_DEFAULT_MAX_DEPTH` | 3 | Default max crawl depth | +| `CRAWLER_DEFAULT_MAX_PAGES` | 100 | Default max pages per job | +| `CRAWLER_TIMEOUT` | 30000 | Request timeout (ms) | +| `MANA_SEARCH_URL` | http://localhost:3021 | mana-search URL (for extract fallback) | + +## Development Commands + +```bash +# Install dependencies +pnpm install + +# Start development server +pnpm dev + +# Build for production +pnpm build + +# Start production server +pnpm start + +# Type checking +pnpm type-check + +# Linting +pnpm lint + +# Database commands +pnpm db:push # Push schema to database +pnpm db:generate # Generate migrations +pnpm db:migrate # Run migrations +pnpm db:studio # Open Drizzle Studio +``` + +## Database Schema + +The crawler uses its own schema (`crawler`) in the shared ManaCore database: + +- `crawler.crawl_jobs` - Crawl job configuration and status +- `crawler.crawl_results` - Individual page results + +## Queue System + +Uses BullMQ with Redis for job processing: + +- **Queue Name**: `crawl` +- **Concurrency**: Configurable (default: 5) +- **Retry**: 3 attempts with exponential backoff +- **Dashboard**: Available at `/queue/dashboard` + +## Robots.txt Compliance + +The crawler respects robots.txt by default: +- Checks robots.txt before crawling each domain +- Caches robots.txt rules in Redis (24h TTL) +- Can be disabled per-job with `respectRobots: false` + +## Rate Limiting + +Built-in rate limiting to be a good citizen: +- Per-domain rate limiting +- Configurable delay between requests +- Default: 2 requests/second/domain + +## Project Structure + +``` +services/mana-crawler/ +├── src/ +│ ├── main.ts # Application entry point +│ ├── app.module.ts # Root module +│ ├── config/ +│ │ └── configuration.ts # App configuration +│ ├── db/ +│ │ ├── schema/ # Drizzle schemas +│ │ ├── database.module.ts # Database provider +│ │ └── connection.ts # DB connection +│ ├── crawler/ # Crawl job management +│ │ ├── crawler.controller.ts +│ │ ├── crawler.service.ts +│ │ └── dto/ +│ ├── queue/ # BullMQ queue processing +│ │ ├── queue.module.ts +│ │ └── processors/ +│ ├── parser/ # HTML parsing (Cheerio) +│ ├── robots/ # robots.txt handling +│ ├── cache/ # Redis caching +│ ├── metrics/ # Prometheus metrics +│ └── health/ # Health check +├── drizzle.config.ts +├── package.json +├── tsconfig.json +└── Dockerfile +``` + +## Integration with Other Services + +### mana-search +The crawler can use mana-search for single-page extraction as a fallback: +```typescript +POST http://mana-search:3021/api/v1/extract +``` + +### mana-api-gateway +The crawler can be exposed via the API gateway for monetization: +``` +POST /v1/crawler/start → 5 Credits/Job + 1 Credit/100 pages +GET /v1/crawler/:id → 0 Credits +``` + +## Troubleshooting + +### Redis connection issues + +```bash +# Check Redis +docker exec mana-redis redis-cli ping + +# Check queue status +curl http://localhost:3023/queue/dashboard +``` + +### Jobs stuck in pending + +Check that: +1. Redis is running +2. The queue processor is active +3. No rate limit issues + +### High memory usage + +The crawler loads pages into memory for parsing. 
For large crawls:
+- Reduce `maxPages` per job
+- Split the work into several smaller jobs and raise queue concurrency instead
+- Monitor with `/metrics`
diff --git a/services/mana-crawler/Dockerfile b/services/mana-crawler/Dockerfile
new file mode 100644
index 000000000..b275dcdeb
--- /dev/null
+++ b/services/mana-crawler/Dockerfile
@@ -0,0 +1,55 @@
+# Build stage
+FROM node:20-alpine AS builder
+
+RUN corepack enable && corepack prepare pnpm@9.15.0 --activate
+
+WORKDIR /app
+
+# Copy workspace files
+COPY pnpm-workspace.yaml ./
+COPY pnpm-lock.yaml ./
+COPY package.json ./
+
+# Copy service files
+COPY services/mana-crawler/package.json ./services/mana-crawler/
+
+# Copy shared packages
+COPY packages/shared-drizzle-config/package.json ./packages/shared-drizzle-config/
+
+# Install dependencies
+RUN pnpm install --frozen-lockfile
+
+# Copy source code
+COPY services/mana-crawler ./services/mana-crawler
+COPY packages/shared-drizzle-config ./packages/shared-drizzle-config
+
+# Build
+WORKDIR /app/services/mana-crawler
+RUN pnpm build
+
+# Production stage
+FROM node:20-alpine AS runner
+
+RUN corepack enable && corepack prepare pnpm@9.15.0 --activate
+
+WORKDIR /app
+
+# Copy package files
+COPY --from=builder /app/pnpm-workspace.yaml ./
+COPY --from=builder /app/pnpm-lock.yaml ./
+COPY --from=builder /app/package.json ./
+COPY --from=builder /app/services/mana-crawler/package.json ./services/mana-crawler/
+COPY --from=builder /app/packages/shared-drizzle-config/package.json ./packages/shared-drizzle-config/
+
+# Install production dependencies only
+RUN pnpm install --frozen-lockfile --prod
+
+# Copy built files
+COPY --from=builder /app/services/mana-crawler/dist ./services/mana-crawler/dist
+COPY --from=builder /app/packages/shared-drizzle-config/dist ./packages/shared-drizzle-config/dist
+
+WORKDIR /app/services/mana-crawler
+
+EXPOSE 3023
+
+CMD ["node", "dist/main"]
diff --git a/services/mana-crawler/drizzle.config.ts b/services/mana-crawler/drizzle.config.ts
new file mode 100644
index 000000000..5741a38eb
--- /dev/null
+++ b/services/mana-crawler/drizzle.config.ts
@@ -0,0 +1,6 @@
+import { createDrizzleConfig } from '@manacore/shared-drizzle-config';
+
+export default createDrizzleConfig({
+  dbName: 'manacore',
+  schemaFilter: ['crawler'],
+});
diff --git a/services/mana-crawler/nest-cli.json b/services/mana-crawler/nest-cli.json
new file mode 100644
index 000000000..95538fb90
--- /dev/null
+++ b/services/mana-crawler/nest-cli.json
@@ -0,0 +1,8 @@
+{
+  "$schema": "https://json.schemastore.org/nest-cli",
+  "collection": "@nestjs/schematics",
+  "sourceRoot": "src",
+  "compilerOptions": {
+    "deleteOutDir": true
+  }
+}
diff --git a/services/mana-crawler/package.json b/services/mana-crawler/package.json
new file mode 100644
index 000000000..f3b2491b1
--- /dev/null
+++ b/services/mana-crawler/package.json
@@ -0,0 +1,61 @@
+{
+  "name": "@manacore/mana-crawler",
+  "version": "1.0.0",
+  "description": "Web crawler microservice for systematic website crawling and content extraction",
+  "private": true,
+  "license": "UNLICENSED",
+  "scripts": {
+    "build": "nest build",
+    "dev": "nest start --watch",
+    "start": "node dist/main",
+    "start:dev": "nest start --watch",
+    "start:debug": "nest start --debug --watch",
+    "start:prod": "node dist/main",
+    "lint": "eslint \"{src,test}/**/*.ts\" --fix",
+    "type-check": "tsc --noEmit",
+    "test": "jest",
+    "test:watch": "jest --watch",
+    "test:cov": "jest --coverage",
+    "db:push": "drizzle-kit push",
+    "db:generate": "drizzle-kit generate",
+    "db:migrate": "drizzle-kit migrate",
+    "db:studio":
"drizzle-kit studio" + }, + "dependencies": { + "@bull-board/api": "^6.6.0", + "@bull-board/express": "^6.6.0", + "@bull-board/nestjs": "^6.6.0", + "@nestjs/bullmq": "^10.2.3", + "@nestjs/common": "^10.4.17", + "@nestjs/config": "^3.3.0", + "@nestjs/core": "^10.4.17", + "@nestjs/platform-express": "^10.4.17", + "bullmq": "^5.34.8", + "cheerio": "^1.0.0", + "class-transformer": "^0.5.1", + "class-validator": "^0.14.1", + "drizzle-orm": "^0.38.4", + "ioredis": "^5.4.2", + "postgres": "^3.4.5", + "prom-client": "^15.1.3", + "reflect-metadata": "^0.2.2", + "robots-parser": "^3.0.1", + "rxjs": "^7.8.1", + "turndown": "^7.2.0" + }, + "devDependencies": { + "@manacore/shared-drizzle-config": "workspace:*", + "@nestjs/cli": "^10.4.9", + "@nestjs/schematics": "^10.2.3", + "@nestjs/testing": "^10.4.17", + "@types/express": "^5.0.0", + "@types/jest": "^29.5.14", + "@types/node": "^22.10.5", + "@types/turndown": "^5.0.5", + "drizzle-kit": "^0.30.4", + "jest": "^29.7.0", + "ts-jest": "^29.2.5", + "ts-node": "^10.9.2", + "typescript": "^5.7.2" + } +} diff --git a/services/mana-crawler/src/app.module.ts b/services/mana-crawler/src/app.module.ts new file mode 100644 index 000000000..4ae3f1944 --- /dev/null +++ b/services/mana-crawler/src/app.module.ts @@ -0,0 +1,47 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { BullModule } from '@nestjs/bullmq'; +import { BullBoardModule } from '@bull-board/nestjs'; +import { ExpressAdapter } from '@bull-board/express'; +import configuration from './config/configuration'; +import { DatabaseModule } from './db/database.module'; +import { HealthModule } from './health/health.module'; +import { MetricsModule } from './metrics/metrics.module'; +import { CacheModule } from './cache/cache.module'; +import { CrawlerModule } from './crawler/crawler.module'; +import { QueueModule } from './queue/queue.module'; +import { ProcessorModule } from './queue/processor.module'; +import { ParserModule } from './parser/parser.module'; +import { RobotsModule } from './robots/robots.module'; + +@Module({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + load: [configuration], + }), + BullModule.forRootAsync({ + useFactory: () => ({ + connection: { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379', 10), + password: process.env.REDIS_PASSWORD || undefined, + }, + }), + }), + BullBoardModule.forRoot({ + route: '/queue/dashboard', + adapter: ExpressAdapter, + }), + DatabaseModule, + HealthModule, + MetricsModule, + CacheModule, + RobotsModule, + ParserModule, + QueueModule, + ProcessorModule, + CrawlerModule, + ], +}) +export class AppModule {} diff --git a/services/mana-crawler/src/cache/cache.module.ts b/services/mana-crawler/src/cache/cache.module.ts new file mode 100644 index 000000000..618591395 --- /dev/null +++ b/services/mana-crawler/src/cache/cache.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { CacheService } from './cache.service'; +import { MetricsModule } from '../metrics/metrics.module'; + +@Module({ + imports: [MetricsModule], + providers: [CacheService], + exports: [CacheService], +}) +export class CacheModule {} diff --git a/services/mana-crawler/src/cache/cache.service.ts b/services/mana-crawler/src/cache/cache.service.ts new file mode 100644 index 000000000..2e4e60ef7 --- /dev/null +++ b/services/mana-crawler/src/cache/cache.service.ts @@ -0,0 +1,152 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; 
+import { ConfigService } from '@nestjs/config';
+import Redis from 'ioredis';
+import { MetricsService } from '../metrics/metrics.service';
+
+@Injectable()
+export class CacheService implements OnModuleInit, OnModuleDestroy {
+  private readonly logger = new Logger(CacheService.name);
+  private client: Redis | null = null;
+  private readonly keyPrefix: string;
+
+  private stats = {
+    hits: 0,
+    misses: 0,
+  };
+
+  constructor(
+    private readonly configService: ConfigService,
+    private readonly metricsService: MetricsService,
+  ) {
+    this.keyPrefix = this.configService.get('redis.keyPrefix', 'mana-crawler:');
+  }
+
+  async onModuleInit() {
+    const host = this.configService.get('redis.host', 'localhost');
+    const port = this.configService.get('redis.port', 6379);
+    const password = this.configService.get('redis.password');
+
+    try {
+      this.client = new Redis({
+        host,
+        port,
+        password,
+        retryStrategy: (times) => {
+          if (times > 3) {
+            this.logger.warn('Redis connection failed, running without cache');
+            return null;
+          }
+          return Math.min(times * 200, 2000);
+        },
+        maxRetriesPerRequest: 1,
+      });
+
+      this.client.on('error', (err) => {
+        this.logger.error(`Redis error: ${err.message}`);
+      });
+
+      this.client.on('connect', () => {
+        this.logger.log(`Connected to Redis at ${host}:${port}`);
+      });
+
+      await this.client.ping();
+    } catch (error) {
+      this.logger.warn(`Could not connect to Redis: ${error}. Running without cache.`);
+      this.client = null;
+    }
+  }
+
+  async onModuleDestroy() {
+    if (this.client) {
+      await this.client.quit();
+    }
+  }
+
+  private buildKey(key: string): string {
+    return `${this.keyPrefix}${key}`;
+  }
+
+  async get<T = unknown>(key: string): Promise<T | null> {
+    if (!this.client) return null;
+
+    try {
+      const data = await this.client.get(this.buildKey(key));
+      if (data) {
+        this.stats.hits++;
+        this.metricsService.recordCacheHit();
+        return JSON.parse(data) as T;
+      }
+      this.stats.misses++;
+      this.metricsService.recordCacheMiss();
+      return null;
+    } catch (error) {
+      this.logger.error(`Cache get error: ${error}`);
+      return null;
+    }
+  }
+
+  async set(key: string, value: unknown, ttlSeconds: number): Promise<void> {
+    if (!this.client) return;
+
+    try {
+      await this.client.setex(this.buildKey(key), ttlSeconds, JSON.stringify(value));
+    } catch (error) {
+      this.logger.error(`Cache set error: ${error}`);
+    }
+  }
+
+  async delete(key: string): Promise<void> {
+    if (!this.client) return;
+
+    try {
+      await this.client.del(this.buildKey(key));
+    } catch (error) {
+      this.logger.error(`Cache delete error: ${error}`);
+    }
+  }
+
+  async clear(pattern?: string): Promise<number> {
+    if (!this.client) return 0;
+
+    try {
+      const searchPattern = pattern
+        ? `${this.keyPrefix}${pattern}`
+        : `${this.keyPrefix}*`;
+      const keys = await this.client.keys(searchPattern);
+      if (keys.length > 0) {
+        await this.client.del(...keys);
+      }
+      return keys.length;
+    } catch (error) {
+      this.logger.error(`Cache clear error: ${error}`);
+      return 0;
+    }
+  }
+
+  getStats() {
+    const total = this.stats.hits + this.stats.misses;
+    return {
+      hits: this.stats.hits,
+      misses: this.stats.misses,
+      hitRate: total > 0 ? this.stats.hits / total : 0,
+    };
+  }
+
+  async healthCheck(): Promise<{ status: string; latency: number }> {
+    if (!this.client) {
+      return { status: 'disabled', latency: 0 };
+    }
+
+    const start = Date.now();
+    try {
+      await this.client.ping();
+      return { status: 'ok', latency: Date.now() - start };
+    } catch {
+      return { status: 'error', latency: Date.now() - start };
+    }
+  }
+
+  isConnected(): boolean {
+    return this.client !== null && this.client.status === 'ready';
+  }
+}
diff --git a/services/mana-crawler/src/common/filters/http-exception.filter.ts b/services/mana-crawler/src/common/filters/http-exception.filter.ts
new file mode 100644
index 000000000..b86fb7885
--- /dev/null
+++ b/services/mana-crawler/src/common/filters/http-exception.filter.ts
@@ -0,0 +1,51 @@
+import {
+  ExceptionFilter,
+  Catch,
+  ArgumentsHost,
+  HttpException,
+  HttpStatus,
+  Logger,
+} from '@nestjs/common';
+import { Response } from 'express';
+
+@Catch()
+export class HttpExceptionFilter implements ExceptionFilter {
+  private readonly logger = new Logger(HttpExceptionFilter.name);
+
+  catch(exception: unknown, host: ArgumentsHost) {
+    const ctx = host.switchToHttp();
+    const response = ctx.getResponse<Response>();
+
+    let status: number;
+    let message: string;
+    let error: string;
+
+    if (exception instanceof HttpException) {
+      status = exception.getStatus();
+      const exceptionResponse = exception.getResponse();
+      message =
+        typeof exceptionResponse === 'string'
+          ? exceptionResponse
+          : (exceptionResponse as { message?: string }).message ||
+            exception.message;
+      error = exception.name;
+    } else if (exception instanceof Error) {
+      status = HttpStatus.INTERNAL_SERVER_ERROR;
+      message = exception.message;
+      error = 'Internal Server Error';
+      this.logger.error(`Unhandled exception: ${exception.message}`, exception.stack);
+    } else {
+      status = HttpStatus.INTERNAL_SERVER_ERROR;
+      message = 'An unexpected error occurred';
+      error = 'Internal Server Error';
+      this.logger.error(`Unknown exception type: ${exception}`);
+    }
+
+    response.status(status).json({
+      statusCode: status,
+      error,
+      message,
+      timestamp: new Date().toISOString(),
+    });
+  }
+}
diff --git a/services/mana-crawler/src/config/configuration.ts b/services/mana-crawler/src/config/configuration.ts
new file mode 100644
index 000000000..76c7957c1
--- /dev/null
+++ b/services/mana-crawler/src/config/configuration.ts
@@ -0,0 +1,49 @@
+export default () => ({
+  port: parseInt(process.env.PORT || '3023', 10),
+  nodeEnv: process.env.NODE_ENV || 'development',
+
+  cors: {
+    origins: process.env.CORS_ORIGINS?.split(',') || [
+      'http://localhost:3000',
+      'http://localhost:5173',
+      'http://localhost:8081',
+    ],
+  },
+
+  database: {
+    url:
+      process.env.DATABASE_URL ||
+      'postgresql://manacore:devpassword@localhost:5432/manacore',
+  },
+
+  redis: {
+    host: process.env.REDIS_HOST || 'localhost',
+    port: parseInt(process.env.REDIS_PORT || '6379', 10),
+    password: process.env.REDIS_PASSWORD,
+    keyPrefix: 'mana-crawler:',
+  },
+
+  crawler: {
+    userAgent:
+      process.env.CRAWLER_USER_AGENT ||
+      'ManaCoreCrawler/1.0 (+https://manacore.io/bot)',
+    defaultRateLimit: parseFloat(process.env.CRAWLER_DEFAULT_RATE_LIMIT || '2'),
+    defaultMaxDepth: parseInt(process.env.CRAWLER_DEFAULT_MAX_DEPTH || '3', 10),
+    defaultMaxPages: parseInt(process.env.CRAWLER_DEFAULT_MAX_PAGES || '100', 10),
+    timeout: parseInt(process.env.CRAWLER_TIMEOUT || '30000', 10),
+  },
+
+  queue: {
+    concurrency: parseInt(process.env.QUEUE_CONCURRENCY || '5', 10),
+    maxRetries: parseInt(process.env.QUEUE_MAX_RETRIES || '3', 10),
+  },
+
+  cache: {
+    robotsTtl: parseInt(process.env.CACHE_ROBOTS_TTL || '86400', 10), // 24 hours
+    resultsTtl: parseInt(process.env.CACHE_RESULTS_TTL || '3600', 10), // 1 hour
+  },
+
+  externalServices: {
+    manaSearchUrl: process.env.MANA_SEARCH_URL || 'http://localhost:3021',
+  },
+});
diff --git a/services/mana-crawler/src/crawler/crawler.controller.ts b/services/mana-crawler/src/crawler/crawler.controller.ts
new file mode 100644
index 000000000..454845146
--- /dev/null
+++ b/services/mana-crawler/src/crawler/crawler.controller.ts
@@ -0,0 +1,74 @@
+import {
+  Controller,
+  Post,
+  Get,
+  Delete,
+  Body,
+  Param,
+  Query,
+  ParseUUIDPipe,
+  ParseIntPipe,
+  DefaultValuePipe,
+  HttpCode,
+  HttpStatus,
+} from '@nestjs/common';
+import { CrawlerService } from './crawler.service';
+import { StartCrawlDto } from './dto/start-crawl.dto';
+import { CrawlJobResponse, CrawlResultResponse, PaginatedResults } from './dto/crawl-response.dto';
+
+@Controller('crawl')
+export class CrawlerController {
+  constructor(private readonly crawlerService: CrawlerService) {}
+
+  @Post()
+  async startCrawl(@Body() dto: StartCrawlDto): Promise<CrawlJobResponse> {
+    return this.crawlerService.startCrawl(dto);
+  }
+
+  @Get()
+  async listJobs(
+    @Query('page', new DefaultValuePipe(1), ParseIntPipe) page: number,
+    @Query('limit', new DefaultValuePipe(20), ParseIntPipe) limit: number,
+    @Query('status') status?: string,
+  ): Promise<PaginatedResults<CrawlJobResponse>> {
+    return this.crawlerService.listJobs(page, limit, status);
+  }
+
+  @Get(':jobId')
+  async getJob(
+    @Param('jobId', ParseUUIDPipe) jobId: string,
+  ): Promise<CrawlJobResponse> {
+    return this.crawlerService.getJob(jobId);
+  }
+
+  @Get(':jobId/results')
+  async getJobResults(
+    @Param('jobId', ParseUUIDPipe) jobId: string,
+    @Query('page', new DefaultValuePipe(1), ParseIntPipe) page: number,
+    @Query('limit', new DefaultValuePipe(50), ParseIntPipe) limit: number,
+  ): Promise<PaginatedResults<CrawlResultResponse>> {
+    return this.crawlerService.getJobResults(jobId, page, limit);
+  }
+
+  @Delete(':jobId')
+  @HttpCode(HttpStatus.NO_CONTENT)
+  async cancelJob(
+    @Param('jobId', ParseUUIDPipe) jobId: string,
+  ): Promise<void> {
+    await this.crawlerService.cancelJob(jobId);
+  }
+
+  @Post(':jobId/pause')
+  async pauseJob(
+    @Param('jobId', ParseUUIDPipe) jobId: string,
+  ): Promise<CrawlJobResponse> {
+    return this.crawlerService.pauseJob(jobId);
+  }
+
+  @Post(':jobId/resume')
+  async resumeJob(
+    @Param('jobId', ParseUUIDPipe) jobId: string,
+  ): Promise<CrawlJobResponse> {
+    return this.crawlerService.resumeJob(jobId);
+  }
+}
diff --git a/services/mana-crawler/src/crawler/crawler.module.ts b/services/mana-crawler/src/crawler/crawler.module.ts
new file mode 100644
index 000000000..7fc3eb44a
--- /dev/null
+++ b/services/mana-crawler/src/crawler/crawler.module.ts
@@ -0,0 +1,13 @@
+import { Module } from '@nestjs/common';
+import { CrawlerController } from './crawler.controller';
+import { CrawlerService } from './crawler.service';
+import { QueueModule } from '../queue/queue.module';
+import { MetricsModule } from '../metrics/metrics.module';
+
+@Module({
+  imports: [QueueModule, MetricsModule],
+  controllers: [CrawlerController],
+  providers: [CrawlerService],
+  exports: [CrawlerService],
+})
+export class CrawlerModule {}
diff --git a/services/mana-crawler/src/crawler/crawler.service.ts b/services/mana-crawler/src/crawler/crawler.service.ts
new file mode 100644
index 000000000..b03adc9c3
--- /dev/null
+++ b/services/mana-crawler/src/crawler/crawler.service.ts
@@ -0,0 +1,337 @@
+import { Injectable, Logger, Inject, NotFoundException, BadRequestException } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { and, eq, desc, count } from 'drizzle-orm';
+import { DATABASE_CONNECTION } from '../db/database.module';
+import { crawlJobs, crawlResults, type NewCrawlJob, type CrawlJob, type CrawlResult } from '../db/schema';
+import { QueueService } from '../queue/queue.service';
+import { MetricsService } from '../metrics/metrics.service';
+import { StartCrawlDto } from './dto/start-crawl.dto';
+import {
+  CrawlJobResponse,
+  CrawlResultResponse,
+  PaginatedResults,
+  toCrawlJobResponse,
+  toCrawlResultResponse,
+} from './dto/crawl-response.dto';
+
+@Injectable()
+export class CrawlerService {
+  private readonly logger = new Logger(CrawlerService.name);
+  private readonly defaultMaxDepth: number;
+  private readonly defaultMaxPages: number;
+  private readonly defaultRateLimit: number;
+
+  constructor(
+    private readonly configService: ConfigService,
+    private readonly queueService: QueueService,
+    private readonly metricsService: MetricsService,
+    @Inject(DATABASE_CONNECTION) private readonly db: any,
+  ) {
+    this.defaultMaxDepth = this.configService.get('crawler.defaultMaxDepth', 3);
+    this.defaultMaxPages = this.configService.get('crawler.defaultMaxPages', 100);
+    this.defaultRateLimit = this.configService.get('crawler.defaultRateLimit', 2);
+  }
+
+  async startCrawl(dto: StartCrawlDto, userId?: string, apiKeyId?: string): Promise<CrawlJobResponse> {
+    const startUrl = new URL(dto.startUrl);
+    const domain = startUrl.hostname;
+
+    const config = dto.config || {};
+
+    const newJob: NewCrawlJob = {
+      startUrl: dto.startUrl,
+      domain,
+      maxDepth: config.maxDepth ?? this.defaultMaxDepth,
+      maxPages: config.maxPages ?? this.defaultMaxPages,
+      rateLimit: config.rateLimit ?? this.defaultRateLimit,
+      respectRobots: config.respectRobots ?? true,
+      includePatterns: config.includePatterns,
+      excludePatterns: config.excludePatterns,
+      selectors: config.selectors,
+      output: config.output,
+      webhookUrl: dto.webhookUrl,
+      userId,
+      apiKeyId,
+      status: 'pending',
+      progress: {
+        discovered: 0,
+        crawled: 0,
+        failed: 0,
+        queued: 0,
+      },
+    };
+
+    // Insert job into database
+    const [createdJob] = await this.db
+      .insert(crawlJobs)
+      .values(newJob)
+      .returning();
+
+    this.logger.log(`Created crawl job ${createdJob.id} for ${domain}`);
+
+    // Add to queue
+    try {
+      await this.queueService.addCrawlJob(createdJob);
+
+      // Update status to running
+      const [updatedJob] = await this.db
+        .update(crawlJobs)
+        .set({
+          status: 'running',
+          startedAt: new Date(),
+          updatedAt: new Date(),
+        })
+        .where(eq(crawlJobs.id, createdJob.id))
+        .returning();
+
+      this.metricsService.setActiveJobs('running', 1);
+
+      return toCrawlJobResponse(updatedJob);
+    } catch (error) {
+      // Update status to failed
+      await this.db
+        .update(crawlJobs)
+        .set({
+          status: 'failed',
+          error: error instanceof Error ? error.message : 'Failed to queue job',
+          updatedAt: new Date(),
+        })
+        .where(eq(crawlJobs.id, createdJob.id));
+
+      throw error;
+    }
+  }
+
+  async getJob(jobId: string): Promise<CrawlJobResponse> {
+    const [job] = await this.db
+      .select()
+      .from(crawlJobs)
+      .where(eq(crawlJobs.id, jobId))
+      .limit(1);
+
+    if (!job) {
+      throw new NotFoundException(`Crawl job ${jobId} not found`);
+    }
+
+    return toCrawlJobResponse(job);
+  }
+
+  async getJobResults(
+    jobId: string,
+    page = 1,
+    limit = 50,
+  ): Promise<PaginatedResults<CrawlResultResponse>> {
+    // Verify job exists
+    const [job] = await this.db
+      .select()
+      .from(crawlJobs)
+      .where(eq(crawlJobs.id, jobId))
+      .limit(1);
+
+    if (!job) {
+      throw new NotFoundException(`Crawl job ${jobId} not found`);
+    }
+
+    // Get total count
+    const [{ total }] = await this.db
+      .select({ total: count() })
+      .from(crawlResults)
+      .where(eq(crawlResults.jobId, jobId));
+
+    // Get paginated results
+    const offset = (page - 1) * limit;
+    const results = await this.db
+      .select()
+      .from(crawlResults)
+      .where(eq(crawlResults.jobId, jobId))
+      .orderBy(desc(crawlResults.createdAt))
+      .limit(limit)
+      .offset(offset);
+
+    return {
+      results: results.map(toCrawlResultResponse),
+      pagination: {
+        page,
+        limit,
+        total: Number(total),
+        totalPages: Math.ceil(Number(total) / limit),
+      },
+    };
+  }
+
+  async cancelJob(jobId: string): Promise<CrawlJobResponse> {
+    const [job] = await this.db
+      .select()
+      .from(crawlJobs)
+      .where(eq(crawlJobs.id, jobId))
+      .limit(1);
+
+    if (!job) {
+      throw new NotFoundException(`Crawl job ${jobId} not found`);
+    }
+
+    if (['completed', 'failed', 'cancelled'].includes(job.status)) {
+      throw new BadRequestException(`Cannot cancel job with status: ${job.status}`);
+    }
+
+    // Cancel queue jobs
+    await this.queueService.cancelJob(jobId);
+
+    // Update status
+    const [updatedJob] = await this.db
+      .update(crawlJobs)
+      .set({
+        status: 'cancelled',
+        completedAt: new Date(),
+        updatedAt: new Date(),
+      })
+      .where(eq(crawlJobs.id, jobId))
+      .returning();
+
+    this.logger.log(`Cancelled crawl job ${jobId}`);
+
+    return toCrawlJobResponse(updatedJob);
+  }
+
+  async pauseJob(jobId: string): Promise<CrawlJobResponse> {
+    const [job] = await this.db
+      .select()
+      .from(crawlJobs)
+      .where(eq(crawlJobs.id, jobId))
+      .limit(1);
+
+    if (!job) {
+      throw new NotFoundException(`Crawl job ${jobId} not found`);
+    }
+
+    if (job.status !== 'running') {
+      throw new BadRequestException(`Cannot pause job with status: ${job.status}`);
+    }
+
+    // Pause queue jobs
+    await this.queueService.pauseJob(jobId);
+
+    // Update status
+    const [updatedJob] = await this.db
+      .update(crawlJobs)
+      .set({
+        status: 'paused',
+        updatedAt: new Date(),
+      })
+      .where(eq(crawlJobs.id, jobId))
+      .returning();
+
+    this.logger.log(`Paused crawl job ${jobId}`);
+
+    return toCrawlJobResponse(updatedJob);
+  }
+
+  async resumeJob(jobId: string): Promise<CrawlJobResponse> {
+    const [job] = await this.db
+      .select()
+      .from(crawlJobs)
+      .where(eq(crawlJobs.id, jobId))
+      .limit(1);
+
+    if (!job) {
+      throw new NotFoundException(`Crawl job ${jobId} not found`);
+    }
+
+    if (job.status !== 'paused') {
+      throw new BadRequestException(`Cannot resume job with status: ${job.status}`);
+    }
+
+    // Re-add to queue
+    await this.queueService.addCrawlJob(job);
+
+    // Update status
+    const [updatedJob] = await this.db
+      .update(crawlJobs)
+      .set({
+        status: 'running',
+        updatedAt: new Date(),
+      })
+      .where(eq(crawlJobs.id, jobId))
+      .returning();
+
+    this.logger.log(`Resumed crawl job ${jobId}`);
+
+    return toCrawlJobResponse(updatedJob);
+  }
+
+  async listJobs(
+    page = 1,
+    limit = 20,
+    status?: string,
+    userId?: string,
+  ): Promise<PaginatedResults<CrawlJobResponse>> {
+    let query = this.db.select().from(crawlJobs);
+
+    // Build conditions (combined with and(); calling .where() repeatedly
+    // would overwrite earlier conditions instead of narrowing them)
+    const conditions = [];
+    if (status) {
+      conditions.push(eq(crawlJobs.status, status));
+    }
+    if (userId) {
+      conditions.push(eq(crawlJobs.userId, userId));
+    }
+    const whereClause = conditions.length > 0 ? and(...conditions) : undefined;
+
+    if (whereClause) {
+      query = query.where(whereClause);
+    }
+
+    // Get total count
+    let countQuery = this.db.select({ total: count() }).from(crawlJobs);
+    if (whereClause) {
+      countQuery = countQuery.where(whereClause);
+    }
+    const [{ total }] = await countQuery;
+
+    // Get paginated results
+    const offset = (page - 1) * limit;
+    const jobs = await query
+      .orderBy(desc(crawlJobs.createdAt))
+      .limit(limit)
+      .offset(offset);
+
+    return {
+      results: jobs.map(toCrawlJobResponse),
+      pagination: {
+        page,
+        limit,
+        total: Number(total),
+        totalPages: Math.ceil(Number(total) / limit),
+      },
+    };
+  }
+
+  async deleteJob(jobId: string): Promise<void> {
+    const [job] = await this.db
+      .select()
+      .from(crawlJobs)
+      .where(eq(crawlJobs.id, jobId))
+      .limit(1);
+
+    if (!job) {
+      throw new NotFoundException(`Crawl job ${jobId} not found`);
+    }
+
+    // Cancel if running
+    if (['running', 'pending', 'paused'].includes(job.status)) {
+      await this.queueService.cancelJob(jobId);
+    }
+
+    // Delete job (results will be cascade deleted)
+    await this.db
+      .delete(crawlJobs)
+      .where(eq(crawlJobs.id, jobId));
+
+    this.logger.log(`Deleted crawl job ${jobId}`);
+  }
+}
diff --git a/services/mana-crawler/src/crawler/dto/crawl-response.dto.ts b/services/mana-crawler/src/crawler/dto/crawl-response.dto.ts
new file mode 100644
index 000000000..7f5be7f9b
--- /dev/null
+++ b/services/mana-crawler/src/crawler/dto/crawl-response.dto.ts
@@ -0,0 +1,91 @@
+import { CrawlJob, CrawlResult, CrawlProgress } from '../../db/schema';
+
+export interface CrawlJobResponse {
+  jobId: string;
+  status: string;
+  startUrl: string;
+  domain: string;
+  config: {
+    maxDepth: number;
+    maxPages: number;
+    rateLimit: number;
+    respectRobots: boolean;
+    includePatterns?: string[];
+    excludePatterns?: string[];
+  };
+  progress: CrawlProgress;
+  startedAt?: string;
+  completedAt?: string;
+  createdAt: string;
+  error?: string;
+}
+
+export interface CrawlResultResponse {
+  id: string;
+  url: string;
+  parentUrl?: string;
+  depth: number;
+  title?: string;
+  content?: string;
+  markdown?: string;
+  links?: string[];
+  metadata?: Record<string, unknown>;
+  statusCode?: number;
+  error?: string;
+  fetchDurationMs?: number;
+  parseDurationMs?: number;
+  contentLength?: number;
+  createdAt: string;
+}
+
+export interface PaginatedResults<T> {
+  results: T[];
+  pagination: {
+    page: number;
+    limit: number;
+    total: number;
+    totalPages: number;
+  };
+}
+
+export function toCrawlJobResponse(job: CrawlJob): CrawlJobResponse {
+  return {
+    jobId: job.id,
+    status: job.status,
+    startUrl: job.startUrl,
+    domain: job.domain,
+    config: {
+      maxDepth: job.maxDepth,
+      maxPages: job.maxPages,
+      rateLimit: job.rateLimit,
+      respectRobots: job.respectRobots,
+      includePatterns: job.includePatterns ?? undefined,
+      excludePatterns: job.excludePatterns ?? undefined,
+    },
+    progress: job.progress || { discovered: 0, crawled: 0, failed: 0, queued: 0 },
+    startedAt: job.startedAt?.toISOString(),
+    completedAt: job.completedAt?.toISOString(),
+    createdAt: job.createdAt.toISOString(),
+    error: job.error ?? undefined,
+  };
+}
+
+export function toCrawlResultResponse(result: CrawlResult): CrawlResultResponse {
+  return {
+    id: result.id,
+    url: result.url,
+    parentUrl: result.parentUrl ?? undefined,
+    depth: result.depth,
+    title: result.title ?? undefined,
+    content: result.content ?? undefined,
+    markdown: result.markdown ?? undefined,
+    links: result.links ?? undefined,
+    metadata: result.metadata ?? undefined,
+    statusCode: result.statusCode ?? undefined,
+    error: result.error ?? undefined,
+    fetchDurationMs: result.fetchDurationMs ?? undefined,
+    parseDurationMs: result.parseDurationMs ?? undefined,
+    contentLength: result.contentLength ?? undefined,
+    createdAt: result.createdAt.toISOString(),
+  };
+}
diff --git a/services/mana-crawler/src/crawler/dto/index.ts b/services/mana-crawler/src/crawler/dto/index.ts
new file mode 100644
index 000000000..1c80e3659
--- /dev/null
+++ b/services/mana-crawler/src/crawler/dto/index.ts
@@ -0,0 +1,2 @@
+export * from './start-crawl.dto';
+export * from './crawl-response.dto';
diff --git a/services/mana-crawler/src/crawler/dto/start-crawl.dto.ts b/services/mana-crawler/src/crawler/dto/start-crawl.dto.ts
new file mode 100644
index 000000000..2b2b9bbe2
--- /dev/null
+++ b/services/mana-crawler/src/crawler/dto/start-crawl.dto.ts
@@ -0,0 +1,96 @@
+import {
+  IsString,
+  IsUrl,
+  IsOptional,
+  IsInt,
+  IsBoolean,
+  IsArray,
+  Min,
+  Max,
+  ValidateNested,
+  IsObject,
+  IsEnum,
+} from 'class-validator';
+import { Type } from 'class-transformer';
+
+export class CrawlSelectorsDto {
+  @IsOptional()
+  @IsString()
+  title?: string;
+
+  @IsOptional()
+  @IsString()
+  content?: string;
+
+  @IsOptional()
+  @IsString()
+  links?: string;
+
+  @IsOptional()
+  @IsObject()
+  custom?: Record<string, string>;
+}
+
+export class CrawlOutputDto {
+  @IsOptional()
+  @IsEnum(['text', 'html', 'markdown'])
+  format?: 'text' | 'html' | 'markdown';
+}
+
+export class CrawlConfigDto {
+  @IsOptional()
+  @IsInt()
+  @Min(1)
+  @Max(10)
+  maxDepth?: number;
+
+  @IsOptional()
+  @IsInt()
+  @Min(1)
+  @Max(10000)
+  maxPages?: number;
+
+  @IsOptional()
+  @IsBoolean()
+  respectRobots?: boolean;
+
+  @IsOptional()
+  @IsInt()
+  @Min(1)
+  @Max(100)
+  rateLimit?: number;
+
+  @IsOptional()
+  @IsArray()
+  @IsString({ each: true })
+  includePatterns?: string[];
+
+  @IsOptional()
+  @IsArray()
+  @IsString({ each: true })
+  excludePatterns?: string[];
+
+  @IsOptional()
+  @ValidateNested()
+  @Type(() => CrawlSelectorsDto)
+  selectors?: CrawlSelectorsDto;
+
+  @IsOptional()
+  @ValidateNested()
+  @Type(() => CrawlOutputDto)
+  output?: CrawlOutputDto;
+}
+
+export class StartCrawlDto {
+  @IsUrl()
+  startUrl: string;
+
+  @IsOptional()
+  @ValidateNested()
+  @Type(() => CrawlConfigDto)
+  config?: CrawlConfigDto;
+
+  @IsOptional()
+  @IsUrl()
+  webhookUrl?: string;
+}
diff --git a/services/mana-crawler/src/db/connection.ts b/services/mana-crawler/src/db/connection.ts
new file mode 100644
index 000000000..e84b0fa08
--- /dev/null
+++ b/services/mana-crawler/src/db/connection.ts
@@ -0,0 +1,33 @@
+import { drizzle } from 'drizzle-orm/postgres-js';
+import postgres from 'postgres';
+import * as schema from './schema';
+
+let connection: ReturnType<typeof postgres> | null = null;
+let db: ReturnType<typeof drizzle> | null = null;
+
+export function getConnection(databaseUrl: string) {
+  if (!connection) {
+    connection = postgres(databaseUrl, {
+      max: 10,
+      idle_timeout: 20,
+      connect_timeout: 10,
+    });
+  }
+  return connection;
+}
+
+export function getDb(databaseUrl: string) {
+  if (!db) {
+    const conn = getConnection(databaseUrl);
+    db = drizzle(conn, { schema });
+  }
+  return db;
+}
+
+export async function closeConnection() {
+  if (connection) {
+    await connection.end();
+    connection = null;
+    db = null;
+  }
+}
diff --git a/services/mana-crawler/src/db/database.module.ts b/services/mana-crawler/src/db/database.module.ts
new file mode 100644
index 000000000..ffc4b6b06
--- /dev/null
+++ b/services/mana-crawler/src/db/database.module.ts
@@ -0,0 +1,24 @@
+import { Global, Module } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { getDb } from './connection';
+
+export const DATABASE_CONNECTION = 'DATABASE_CONNECTION';
+
+@Global()
+@Module({
+  providers: [
+    {
+      provide: DATABASE_CONNECTION,
+      useFactory: (configService: ConfigService) => {
+        const databaseUrl = configService.get('database.url');
+        if (!databaseUrl) {
+          throw new Error('DATABASE_URL is not configured');
+        }
+        return getDb(databaseUrl);
+      },
+      inject: [ConfigService],
+    },
+  ],
+  exports: [DATABASE_CONNECTION],
+})
+export class DatabaseModule {}
diff --git a/services/mana-crawler/src/db/schema/crawl-jobs.schema.ts b/services/mana-crawler/src/db/schema/crawl-jobs.schema.ts
new file mode 100644
index 000000000..61fc47e21
--- /dev/null
+++ b/services/mana-crawler/src/db/schema/crawl-jobs.schema.ts
@@ -0,0 +1,89 @@
+import {
+  pgSchema,
+  uuid,
+  text,
+  integer,
+  boolean,
+  timestamp,
+  jsonb,
+  index,
+} from 'drizzle-orm/pg-core';
+
+export const crawlerSchema = pgSchema('crawler');
+
+export interface CrawlSelectors {
+  title?: string;
+  content?: string;
+  links?: string;
+  custom?: Record<string, string>;
+}
+
+export interface CrawlProgress {
+  discovered: number;
+  crawled: number;
+  failed: number;
+  queued: number;
+}
+
+export interface CrawlOutput {
+  format?: 'text' | 'html' | 'markdown';
+}
+
+export const crawlJobs = crawlerSchema.table(
+  'crawl_jobs',
+  {
+    id: uuid('id').defaultRandom().primaryKey(),
+
+    // Job config
+    startUrl: text('start_url').notNull(),
+    domain: text('domain').notNull(),
+    maxDepth: integer('max_depth').notNull().default(3),
+    maxPages: integer('max_pages').notNull().default(100),
+    rateLimit: integer('rate_limit').notNull().default(2), // requests/second
+
+    // URL patterns
+    includePatterns: jsonb('include_patterns').$type<string[]>(),
+    excludePatterns: jsonb('exclude_patterns').$type<string[]>(),
+
+    // Selectors for extraction
+    selectors: jsonb('selectors').$type<CrawlSelectors>(),
+
+    // Output options
+    output: jsonb('output').$type<CrawlOutput>(),
+
+    // Robots.txt
+    respectRobots: boolean('respect_robots').notNull().default(true),
+
+    // Status
+    status: text('status').notNull().default('pending'), // pending, running, paused, completed, failed, cancelled
+    progress: jsonb('progress').$type<CrawlProgress>().default({
+      discovered: 0,
+      crawled: 0,
+      failed: 0,
+      queued: 0,
+    }),
+    error: text('error'),
+
+    // Metadata
+    userId: text('user_id'),
+    apiKeyId: uuid('api_key_id'),
+    webhookUrl: text('webhook_url'),
+
+    // Bull queue job ID
+    bullJobId: text('bull_job_id'),
+
+    // Timestamps
+    startedAt: timestamp('started_at', { withTimezone: true }),
+    completedAt: timestamp('completed_at', { withTimezone: true }),
+    createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
+    updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
+  },
+  (table) => ({
+    statusIdx: index('crawl_jobs_status_idx').on(table.status),
+    userIdIdx: index('crawl_jobs_user_id_idx').on(table.userId),
+    domainIdx: index('crawl_jobs_domain_idx').on(table.domain),
+  }),
+);
+
+export type CrawlJob = typeof crawlJobs.$inferSelect;
+export type NewCrawlJob = typeof crawlJobs.$inferInsert;
diff --git a/services/mana-crawler/src/db/schema/crawl-results.schema.ts b/services/mana-crawler/src/db/schema/crawl-results.schema.ts
new file mode 100644
index 000000000..f6d966f12
--- /dev/null
+++ b/services/mana-crawler/src/db/schema/crawl-results.schema.ts
@@ -0,0 +1,53 @@
+import {
+  uuid,
+  text,
+  integer,
+  timestamp,
+  jsonb,
+  index,
+  uniqueIndex,
+} from 'drizzle-orm/pg-core';
+import { crawlerSchema, crawlJobs } from './crawl-jobs.schema';
+
+export const crawlResults = crawlerSchema.table(
+  'crawl_results',
+  {
+    id: uuid('id').defaultRandom().primaryKey(),
+    jobId: uuid('job_id')
+      .references(() => crawlJobs.id, { onDelete: 'cascade' })
+      .notNull(),
+
+    // Page data
+    url: text('url').notNull(),
+    parentUrl: text('parent_url'),
+    depth: integer('depth').notNull(),
+
+    // Extracted content
+    title: text('title'),
+    content: text('content'),
+    markdown: text('markdown'),
+    html: text('html'),
+    metadata: jsonb('metadata').$type<Record<string, unknown>>(),
+
+    // Links found
+    links: jsonb('links').$type<string[]>(),
+
+    // Status
+    statusCode: integer('status_code'),
+    error: text('error'),
+
+    // Performance
+    fetchDurationMs: integer('fetch_duration_ms'),
+    parseDurationMs: integer('parse_duration_ms'),
+    contentLength: integer('content_length'),
+
+    createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
+  },
+  (table) => ({
+    jobIdIdx: index('crawl_results_job_id_idx').on(table.jobId),
+    urlIdx: index('crawl_results_url_idx').on(table.url),
+    jobUrlUnique: uniqueIndex('crawl_results_job_url_idx').on(table.jobId, table.url),
+  }),
+);
+
+export type CrawlResult = typeof crawlResults.$inferSelect;
+export type NewCrawlResult = typeof crawlResults.$inferInsert;
diff --git a/services/mana-crawler/src/db/schema/index.ts b/services/mana-crawler/src/db/schema/index.ts
new file mode 100644
index 000000000..3ea2611b3
--- /dev/null
+++ b/services/mana-crawler/src/db/schema/index.ts
@@ -0,0 +1,2 @@
+export * from './crawl-jobs.schema';
+export * from './crawl-results.schema';
diff --git a/services/mana-crawler/src/health/health.controller.ts b/services/mana-crawler/src/health/health.controller.ts
new file mode 100644
index 000000000..c762edc96
--- /dev/null
+++ b/services/mana-crawler/src/health/health.controller.ts
@@ -0,0 +1,48 @@
+import { Controller, Get, Inject } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { CacheService } from '../cache/cache.service';
+import { DATABASE_CONNECTION } from '../db/database.module';
+import { sql } from 'drizzle-orm';
+
+@Controller()
+export class HealthController {
+  constructor(
+    private readonly configService: ConfigService,
+    private readonly cacheService: CacheService,
+    @Inject(DATABASE_CONNECTION) private readonly db: any,
+  ) {}
+
+  @Get('/health')
+  async health() {
+    // Check Redis
+    const redisStatus = await this.cacheService.healthCheck();
+
+    // Check Database
+    let dbStatus = { status: 'unknown', latency: 0 };
+    try {
+      const start = Date.now();
+      await this.db.execute(sql`SELECT 1`);
+      dbStatus = { status: 'ok', latency: Date.now() - start };
+    } catch {
+      dbStatus = { status: 'error', latency: 0 };
+    }
+
+    const overallStatus =
+      redisStatus.status === 'ok' && dbStatus.status === 'ok'
+        ? 'ok'
+        : redisStatus.status === 'error' && dbStatus.status === 'error'
+          ? 'error'
+          : 'degraded';
+
+    return {
+      status: overallStatus,
+      service: 'mana-crawler',
+      version: '1.0.0',
+      timestamp: new Date().toISOString(),
+      components: {
+        redis: redisStatus,
+        database: dbStatus,
+      },
+    };
+  }
+}
diff --git a/services/mana-crawler/src/health/health.module.ts b/services/mana-crawler/src/health/health.module.ts
new file mode 100644
index 000000000..2fd33ce4a
--- /dev/null
+++ b/services/mana-crawler/src/health/health.module.ts
@@ -0,0 +1,9 @@
+import { Module } from '@nestjs/common';
+import { HealthController } from './health.controller';
+import { CacheModule } from '../cache/cache.module';
+
+@Module({
+  imports: [CacheModule],
+  controllers: [HealthController],
+})
+export class HealthModule {}
diff --git a/services/mana-crawler/src/main.ts b/services/mana-crawler/src/main.ts
new file mode 100644
index 000000000..139eb99c0
--- /dev/null
+++ b/services/mana-crawler/src/main.ts
@@ -0,0 +1,43 @@
+import { NestFactory } from '@nestjs/core';
+import { ValidationPipe, Logger } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { AppModule } from './app.module';
+import { HttpExceptionFilter } from './common/filters/http-exception.filter';
+
+async function bootstrap() {
+  const logger = new Logger('Bootstrap');
+
+  const app = await NestFactory.create(AppModule);
+
+  const configService = app.get(ConfigService);
+  const port = configService.get('port', 3023);
+
+  // Global prefix (keep /health and /metrics unprefixed, as documented)
+  app.setGlobalPrefix('api/v1', { exclude: ['/health', '/metrics'] });
+
+  // CORS
+  app.enableCors({
+    origin: configService.get('cors.origins', ['http://localhost:*']),
+    credentials: true,
+  });
+
+  // Global pipes
+  app.useGlobalPipes(
+    new ValidationPipe({
+      whitelist: true,
+      transform: true,
+      forbidNonWhitelisted: true,
+    }),
+  );
+
+  // Global filters
+  app.useGlobalFilters(new HttpExceptionFilter());
+
+  await app.listen(port);
+  logger.log(`Mana Crawler Service running on port ${port}`);
+  logger.log(`Health check: http://localhost:${port}/health`);
+  logger.log(`Metrics: http://localhost:${port}/metrics`);
+  logger.log(`Queue Dashboard: http://localhost:${port}/queue/dashboard`);
+}
+
+bootstrap();
diff --git a/services/mana-crawler/src/metrics/metrics.controller.ts b/services/mana-crawler/src/metrics/metrics.controller.ts
new file mode 100644
index 000000000..5679d2eab
--- /dev/null
+++ b/services/mana-crawler/src/metrics/metrics.controller.ts
@@ -0,0 +1,13 @@
+import { Controller, Get, Header } from '@nestjs/common';
+import { MetricsService } from './metrics.service';
+
+@Controller()
+export class MetricsController {
+  constructor(private readonly metricsService: MetricsService) {}
+
+  @Get('/metrics')
+  @Header('Content-Type', 'text/plain')
+  async getMetrics(): Promise<string> {
+    return this.metricsService.getMetrics();
+  }
+}
diff --git a/services/mana-crawler/src/metrics/metrics.module.ts b/services/mana-crawler/src/metrics/metrics.module.ts
new file mode 100644
index 000000000..5c3b5c90f
--- /dev/null
+++ b/services/mana-crawler/src/metrics/metrics.module.ts
@@ -0,0 +1,10 @@
+import { Module } from '@nestjs/common';
+import { MetricsController } from './metrics.controller';
+import { MetricsService } from './metrics.service';
+
+@Module({
+  controllers: [MetricsController],
+  providers: [MetricsService],
+  exports: [MetricsService],
+})
+export class MetricsModule {}
diff --git a/services/mana-crawler/src/metrics/metrics.service.ts b/services/mana-crawler/src/metrics/metrics.service.ts
new file mode 100644
index 000000000..16e6c20a0
--- /dev/null
+++ b/services/mana-crawler/src/metrics/metrics.service.ts
@@ -0,0 +1,93 @@
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import * as client from 'prom-client';
+
+@Injectable()
+export class MetricsService implements OnModuleInit {
+  private requestCounter: client.Counter;
+  private requestDuration: client.Histogram;
+  private cacheHitCounter: client.Counter;
+  private cacheMissCounter: client.Counter;
+  private crawlJobsGauge: client.Gauge;
+  private pagesProcessedCounter: client.Counter;
+  private crawlErrorsCounter: client.Counter;
+
+  onModuleInit() {
+    // Clear default metrics and register new ones
+    client.register.clear();
+    client.collectDefaultMetrics({ prefix: 'mana_crawler_' });
+
+    this.requestCounter = new client.Counter({
+      name: 'mana_crawler_requests_total',
+      help: 'Total number of requests',
+      labelNames: ['method', 'endpoint', 'status'],
+    });
+
+    this.requestDuration = new client.Histogram({
+      name: 'mana_crawler_request_duration_ms',
+      help: 'Request duration in milliseconds',
+      labelNames: ['method', 'endpoint'],
+      buckets: [10, 50, 100, 200, 500, 1000, 2000, 5000],
+    });
+
+    this.cacheHitCounter = new client.Counter({
+      name: 'mana_crawler_cache_hits_total',
+      help: 'Total number of cache hits',
+    });
+
+    this.cacheMissCounter = new client.Counter({
+      name: 'mana_crawler_cache_misses_total',
+      help: 'Total number of cache misses',
+    });
+
+    this.crawlJobsGauge = new client.Gauge({
+      name: 'mana_crawler_jobs_active',
+      help: 'Number of active crawl jobs',
+      labelNames: ['status'],
+    });
+
+    this.pagesProcessedCounter = new client.Counter({
+      name: 'mana_crawler_pages_processed_total',
+      help: 'Total number of pages processed',
+      labelNames: ['status'],
+    });
+
+    this.crawlErrorsCounter = new client.Counter({
+      name: 'mana_crawler_errors_total',
+      help: 'Total number of crawl errors',
+      labelNames: ['type'],
+    });
+  }
+
+  recordRequest(endpoint: string, status: number, durationMs: number, method = 'GET') {
+    this.requestCounter.inc({ method, endpoint, status: String(status) });
+    this.requestDuration.observe({ method, endpoint }, durationMs);
+  }
+
+  recordCacheHit() {
+    this.cacheHitCounter.inc();
+  }
+
+  recordCacheMiss() {
+    this.cacheMissCounter.inc();
+  }
+
+  setActiveJobs(status: string, count: number) {
+    this.crawlJobsGauge.set({ status }, count);
+  }
+
+  recordPageProcessed(status: 'success' | 'error') {
+    this.pagesProcessedCounter.inc({ status });
+  }
+
+  recordCrawlError(type: string) {
+    this.crawlErrorsCounter.inc({ type });
+  }
+
+  async getMetrics(): Promise<string> {
+    return client.register.metrics();
+  }
+
+  getContentType(): string {
+    return client.register.contentType;
+  }
+}
diff --git a/services/mana-crawler/src/parser/parser.module.ts b/services/mana-crawler/src/parser/parser.module.ts
new file mode 100644
index 000000000..cbcb7c2c0
--- /dev/null
+++ b/services/mana-crawler/src/parser/parser.module.ts
@@ -0,0 +1,8 @@
+import { Module } from '@nestjs/common';
+import { ParserService } from './parser.service';
+
+@Module({
+  providers: [ParserService],
+  exports: [ParserService],
+})
+export class ParserModule {}
diff --git a/services/mana-crawler/src/parser/parser.service.ts b/services/mana-crawler/src/parser/parser.service.ts
new file mode 100644
index 000000000..6451d794d
--- /dev/null
+++ b/services/mana-crawler/src/parser/parser.service.ts
@@ -0,0 +1,245 @@
+import { Injectable, Logger } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import * as cheerio from 'cheerio';
+import TurndownService from 'turndown';
+import { CrawlSelectors } from '../db/schema';
+
+export interface ParsedPage {
+  title: string;
+  content: string;
+  markdown?: string;
+  html?: string;
+  links: string[];
+  metadata: Record<string, unknown>;
+}
+
+export interface ParseOptions {
+  selectors?: CrawlSelectors;
+  includeMarkdown?: boolean;
+  includeHtml?: boolean;
+  baseUrl: string;
+}
+
+@Injectable()
+export class ParserService {
+  private readonly logger = new Logger(ParserService.name);
+  private readonly turndown: TurndownService;
+
+  constructor(private readonly configService: ConfigService) {
+    this.turndown = new TurndownService({
+      headingStyle: 'atx',
+      codeBlockStyle: 'fenced',
+      bulletListMarker: '-',
+    });
+
+    // Custom rules for better Markdown output
+    this.turndown.addRule('codeBlocks', {
+      filter: ['pre'],
+      replacement: (content: string) => `\n\`\`\`\n${content}\n\`\`\`\n`,
+    });
+
+    this.turndown.addRule('inlineCode', {
+      filter: ['code'],
+      replacement: (content: string) => `\`${content}\``,
+    });
+
+    // Remove script and style elements
+    this.turndown.remove(['script', 'style', 'noscript']);
+  }
+
+  parse(html: string, options: ParseOptions): ParsedPage {
+    const $ = cheerio.load(html);
+    const { selectors, includeMarkdown, includeHtml, baseUrl } = options;
+
+    // Remove unwanted elements
+    $('script, style, noscript, iframe, svg').remove();
+
+    // Extract title
+    const title = this.extractTitle($, selectors?.title);
+
+    // Extract main content
+    const contentHtml = this.extractContent($, selectors?.content);
+    const content = this.cleanText(contentHtml);
+
+    // Extract links
+    const links = this.extractLinks($, baseUrl, selectors?.links);
+
+    // Extract metadata
+    const metadata = this.extractMetadata($);
+
+    // Extract custom selectors
+    if (selectors?.custom) {
+      for (const [key, selector] of Object.entries(selectors.custom)) {
+        try {
+          metadata[key] = $(selector).text().trim() || $(selector).attr('content');
+        } catch {
+          this.logger.warn(`Failed to extract custom selector: ${key}`);
+        }
+      }
+    }
+
+    const result: ParsedPage = {
+      title,
+      content,
+      links,
+      metadata,
+    };
+
+    if (includeMarkdown && contentHtml) {
+      result.markdown = this.turndown.turndown(contentHtml);
+    }
+
+    if (includeHtml) {
+      result.html = contentHtml;
+    }
+
+    return result;
+  }
+
+  private extractTitle($: cheerio.CheerioAPI, selector?: string): string {
+    if (selector) {
+      const customTitle = $(selector).text().trim();
+      if (customTitle) return customTitle;
+    }
+
+    // Try common title patterns
+    const h1 = $('h1').first().text().trim();
+    if (h1) return h1;
+
+    const title = $('title').text().trim();
+    if (title) return title;
+
+    const ogTitle = $('meta[property="og:title"]').attr('content');
+    if (ogTitle) return ogTitle;
+
+    return '';
+  }
+
+  private extractContent($: cheerio.CheerioAPI, selector?: string): string {
+    if (selector) {
+      const customContent = $(selector).html();
+      if (customContent) return customContent;
+    }
+
+    // Try common content patterns
+    const contentSelectors = [
+      'article',
+      'main',
+      '[role="main"]',
+      '.main-content',
+      '.content',
+      '.post-content',
+      '.article-content',
+      '.entry-content',
+      '#content',
+      '#main',
+    ];
+
+    for (const sel of contentSelectors) {
+      const content = $(sel).html();
+      if (content && content.length > 100) {
+        return content;
+      }
+    }
+
+    // Fallback to body
+    return $('body').html() || '';
+  }
+
+  private extractLinks(
+    $: cheerio.CheerioAPI,
+    baseUrl: string,
+    selector?: string,
+  ): string[] {
+    const links = new Set<string>();
+    const baseUrlObj = new URL(baseUrl);
+
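+    // Example: with baseUrl https://docs.example.com/guide/intro, href="../api"
+    // resolves to https://docs.example.com/api and is kept; links to other
+    // origins are dropped below.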
+    const linkSelector = selector || 'a[href]';
+
+    $(linkSelector).each((_, element) => {
+      const href = $(element).attr('href');
+      if (!href) return;
+
+      try {
+        // Skip non-http links
+        if (
+          href.startsWith('javascript:') ||
+          href.startsWith('mailto:') ||
+          href.startsWith('tel:') ||
+          href.startsWith('#')
+        ) {
+          return;
+        }
+
+        // Resolve relative URLs
+        const absoluteUrl = new URL(href, baseUrl);
+
+        // Only include same-origin links (or all if needed)
+        if (absoluteUrl.origin === baseUrlObj.origin) {
+          // Remove hash and normalize
+          absoluteUrl.hash = '';
+          links.add(absoluteUrl.href);
+        }
+      } catch {
+        // Invalid URL, skip
+      }
+    });
+
+    return Array.from(links);
+  }
+
+  private extractMetadata($: cheerio.CheerioAPI): Record<string, unknown> {
+    const metadata: Record<string, unknown> = {};
+
+    // OpenGraph metadata
+    $('meta[property^="og:"]').each((_, element) => {
+      const property = $(element).attr('property')?.replace('og:', '');
+      const content = $(element).attr('content');
+      if (property && content) {
+        metadata[`og_${property}`] = content;
+      }
+    });
+
+    // Standard meta tags
+    const description = $('meta[name="description"]').attr('content');
+    if (description) metadata.description = description;
+
+    const keywords = $('meta[name="keywords"]').attr('content');
+    if (keywords) metadata.keywords = keywords;
+
+    const author = $('meta[name="author"]').attr('content');
+    if (author) metadata.author = author;
+
+    const canonical = $('link[rel="canonical"]').attr('href');
+    if (canonical) metadata.canonical = canonical;
+
+    // Schema.org JSON-LD
+    $('script[type="application/ld+json"]').each((_, element) => {
+      try {
+        const json = JSON.parse($(element).html() || '');
+        if (!metadata.jsonLd) {
+          metadata.jsonLd = [];
+        }
+        (metadata.jsonLd as unknown[]).push(json);
+      } catch {
+        // Invalid JSON, skip
+      }
+    });
+
+    return metadata;
+  }
+
+  private cleanText(html: string): string {
+    return html
+      .replace(/<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>/gi, '')
+      .replace(/<style\b[^<]*(?:(?!<\/style>)<[^<]*)*<\/style>/gi, '')
+      .replace(/<[^>]+>/g, ' ')
+      .replace(/&nbsp;/g, ' ')
+      .replace(/&lt;/g, '<')
+      .replace(/&gt;/g, '>')
+      .replace(/&quot;/g, '"')
+      // Decode &amp; last so double-encoded entities are not over-decoded
+      .replace(/&amp;/g, '&')
+      .replace(/\s+/g, ' ')
+      .trim();
+  }
+}
diff --git a/services/mana-crawler/src/queue/constants.ts b/services/mana-crawler/src/queue/constants.ts
new file mode 100644
index 000000000..0c85a9cd6
--- /dev/null
+++ b/services/mana-crawler/src/queue/constants.ts
@@ -0,0 +1 @@
+export const CRAWL_QUEUE = 'crawl';
diff --git a/services/mana-crawler/src/queue/processor.module.ts b/services/mana-crawler/src/queue/processor.module.ts
new file mode 100644
index 000000000..0e321c10f
--- /dev/null
+++ b/services/mana-crawler/src/queue/processor.module.ts
@@ -0,0 +1,20 @@
+import { Module, forwardRef } from '@nestjs/common';
+import { CrawlProcessor } from './processors/crawl.processor';
+import { ParserModule } from '../parser/parser.module';
+import { RobotsModule } from '../robots/robots.module';
+import { CacheModule } from '../cache/cache.module';
+import { MetricsModule } from '../metrics/metrics.module';
+import { QueueModule } from './queue.module';
+
+@Module({
+  imports: [
+    forwardRef(() => QueueModule),
+    ParserModule,
+    RobotsModule,
+    CacheModule,
+    MetricsModule,
+  ],
+  providers: [CrawlProcessor],
+})
+export class ProcessorModule {}
diff --git a/services/mana-crawler/src/queue/processors/crawl.processor.ts b/services/mana-crawler/src/queue/processors/crawl.processor.ts
new file mode 100644
index 000000000..d354d23f5
--- /dev/null
+++ b/services/mana-crawler/src/queue/processors/crawl.processor.ts
@@ -0,0 +1,350 @@
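+// Worker for the 'crawl' queue: fetches a page, parses it, persists the
+// result, and enqueues discovered same-origin links until maxDepth/maxPages.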
+import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq';
+import { Logger, Inject } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { createHash } from 'crypto';
+import { Job, Queue } from 'bullmq';
+import { eq, sql } from 'drizzle-orm';
+import { CRAWL_QUEUE } from '../constants';
+import { CrawlPageJob } from '../queue.service';
+import { ParserService } from '../../parser/parser.service';
+import { RobotsService } from '../../robots/robots.service';
+import { CacheService } from '../../cache/cache.service';
+import { MetricsService } from '../../metrics/metrics.service';
+import { DATABASE_CONNECTION } from '../../db/database.module';
+import { crawlJobs, crawlResults, type NewCrawlResult } from '../../db/schema';
+
+@Processor(CRAWL_QUEUE, {
+  concurrency: 5,
+})
+export class CrawlProcessor extends WorkerHost {
+  private readonly logger = new Logger(CrawlProcessor.name);
+  private readonly userAgent: string;
+  private readonly timeout: number;
+  private readonly processedUrls = new Map<string, Set<string>>();
+
+  constructor(
+    private readonly configService: ConfigService,
+    @InjectQueue(CRAWL_QUEUE) private readonly crawlQueue: Queue,
+    private readonly parserService: ParserService,
+    private readonly robotsService: RobotsService,
+    private readonly cacheService: CacheService,
+    private readonly metricsService: MetricsService,
+    @Inject(DATABASE_CONNECTION) private readonly db: any,
+  ) {
+    super();
+    this.userAgent = this.configService.get(
+      'crawler.userAgent',
+      'ManaCoreCrawler/1.0',
+    );
+    this.timeout = this.configService.get('crawler.timeout', 30000);
+  }
+
+  async process(job: Job<CrawlPageJob>): Promise<void> {
+    const { jobId, url, parentUrl, depth, config } = job.data;
+    const startTime = Date.now();
+
+    this.logger.debug(`Processing ${url} (depth: ${depth}, job: ${jobId})`);
+
+    try {
+      // Check if job is still active
+      const [crawlJob] = await this.db
+        .select()
+        .from(crawlJobs)
+        .where(eq(crawlJobs.id, jobId))
+        .limit(1);
+
+      if (!crawlJob || ['cancelled', 'paused', 'completed', 'failed'].includes(crawlJob.status)) {
+        this.logger.debug(`Job ${jobId} is no longer active, skipping`);
+        return;
+      }
+
+      // Initialize URL tracking for this job
+      if (!this.processedUrls.has(jobId)) {
+        this.processedUrls.set(jobId, new Set());
+      }
+      const processed = this.processedUrls.get(jobId)!;
+
+      // Check if URL already processed
+      if (processed.has(url)) {
+        this.logger.debug(`URL already processed: ${url}`);
+        return;
+      }
+      processed.add(url);
+
+      // Check max pages limit
+      if (crawlJob.progress.crawled >= config.maxPages) {
+        this.logger.debug(`Max pages reached for job ${jobId}`);
+        await this.completeJob(jobId);
+        return;
+      }
+
+      // Check robots.txt
+      if (config.respectRobots) {
+        const robotsCheck = await this.robotsService.checkUrlWithRobots(url);
+        if (!robotsCheck.allowed) {
+          this.logger.debug(`URL blocked by robots.txt: ${url}`);
+          await this.updateProgress(jobId, { failed: 1 });
+          return;
+        }
+      }
+
+      // Check URL patterns
+      if (!this.matchesPatterns(url, config.includePatterns, config.excludePatterns)) {
+        this.logger.debug(`URL doesn't match patterns: ${url}`);
+        return;
+      }
+
+      // Fetch the page
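+      // Node's built-in fetch plus AbortSignal.timeout enforce the configured
+      // request timeout (requires Node 18+).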
+      const fetchStart = Date.now();
+      const response = await fetch(url, {
+        headers: {
+          'User-Agent': this.userAgent,
+          Accept: 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
+          'Accept-Language': 'en-US,en;q=0.5',
+        },
+        signal: AbortSignal.timeout(this.timeout),
+      });
+      const fetchDuration = Date.now() - fetchStart;
+
+      if (!response.ok) {
+        await this.saveResult(jobId, {
+          url,
+          parentUrl,
+          depth,
+          statusCode: response.status,
+          error: `HTTP ${response.status}`,
+          fetchDurationMs: fetchDuration,
+        });
+        await this.updateProgress(jobId, { crawled: 1, failed: 1 });
+        this.metricsService.recordPageProcessed('error');
+        return;
+      }
+
+      const contentType = response.headers.get('content-type') || '';
+      if (!contentType.includes('text/html')) {
+        this.logger.debug(`Skipping non-HTML content: ${url}`);
+        return;
+      }
+
+      const html = await response.text();
+      const contentLength = html.length;
+
+      // Parse the page
+      const parseStart = Date.now();
+      const parsed = this.parserService.parse(html, {
+        selectors: config.selectors,
+        includeMarkdown: config.output?.format === 'markdown',
+        includeHtml: config.output?.format === 'html',
+        baseUrl: url,
+      });
+      const parseDuration = Date.now() - parseStart;
+
+      // Save result
+      await this.saveResult(jobId, {
+        url,
+        parentUrl,
+        depth,
+        title: parsed.title,
+        content: parsed.content,
+        markdown: parsed.markdown,
+        html: parsed.html,
+        links: parsed.links,
+        metadata: parsed.metadata,
+        statusCode: response.status,
+        fetchDurationMs: fetchDuration,
+        parseDurationMs: parseDuration,
+        contentLength,
+      });
+
+      await this.updateProgress(jobId, {
+        crawled: 1,
+        discovered: parsed.links.length,
+      });
+      this.metricsService.recordPageProcessed('success');
+
+      // Queue discovered links
+      if (depth < config.maxDepth && crawlJob.progress.crawled < config.maxPages) {
+        for (const link of parsed.links) {
+          if (!processed.has(link)) {
+            try {
+              // Digest-based job IDs dedupe URLs; a raw base64 prefix would
+              // collide for URLs that share their first bytes.
+              const urlHash = createHash('sha256').update(link).digest('base64url').slice(0, 32);
+              await this.crawlQueue.add(
+                'crawl-page',
+                {
+                  jobId,
+                  url: link,
+                  parentUrl: url,
+                  depth: depth + 1,
+                  config,
+                } as CrawlPageJob,
+                {
+                  jobId: `${jobId}-${urlHash}`,
+                  delay: Math.floor(1000 / config.rateLimit),
+                },
+              );
+            } catch {
+              // Job might already exist, ignore
+            }
+          }
+        }
+      }
+
+      this.logger.debug(
+        `Processed ${url} in ${Date.now() - startTime}ms (${parsed.links.length} links found)`,
+      );
+    } catch (error) {
+      this.logger.error(`Error processing ${url}: ${error}`);
+      this.metricsService.recordCrawlError('fetch_error');
+
+      await this.saveResult(jobId, {
+        url,
+        parentUrl,
+        depth,
+        error: error instanceof Error ? error.message : 'Unknown error',
+        fetchDurationMs: Date.now() - startTime,
+      });
+
+      await this.updateProgress(jobId, { crawled: 1, failed: 1 });
+      throw error; // Let BullMQ handle retries
+    }
+  }
+
+  @OnWorkerEvent('completed')
+  async onCompleted(job: Job<CrawlPageJob>) {
+    // Check if this was the last job for this crawl
+    const { jobId } = job.data;
+    const counts = await this.crawlQueue.getJobCounts(
+      'waiting',
+      'active',
+      'delayed',
+    );
+
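+    // These counts are queue-wide rather than per crawl, so an empty queue is
+    // only a hint; the status check below keeps completion idempotent.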
+    if ((counts.waiting ?? 0) === 0 && (counts.active ?? 0) === 0 && (counts.delayed ?? 0) === 0) {
+      // Check if there are jobs for this specific crawl
+      const [crawlJob] = await this.db
+        .select()
+        .from(crawlJobs)
+        .where(eq(crawlJobs.id, jobId))
+        .limit(1);
+
+      if (crawlJob && crawlJob.status === 'running') {
+        await this.completeJob(jobId);
+      }
+    }
+  }
+
+  @OnWorkerEvent('failed')
+  onFailed(job: Job<CrawlPageJob>, error: Error) {
+    this.logger.error(`Job ${job.id} failed: ${error.message}`);
+    this.metricsService.recordCrawlError('job_failed');
+  }
+
+  private matchesPatterns(
+    url: string,
+    includePatterns?: string[],
+    excludePatterns?: string[],
+  ): boolean {
+    // Check exclude patterns first
+    if (excludePatterns?.length) {
+      for (const pattern of excludePatterns) {
+        if (this.matchPattern(url, pattern)) {
+          return false;
+        }
+      }
+    }
+
+    // If no include patterns, allow all
+    if (!includePatterns?.length) {
+      return true;
+    }
+
+    // Check include patterns
+    for (const pattern of includePatterns) {
+      if (this.matchPattern(url, pattern)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private matchPattern(url: string, pattern: string): boolean {
+    // Simple glob pattern matching: escape regex metacharacters, then turn
+    // '*' into '.*' and '?' into '.' (e.g. '*/docs/*' matches any /docs/ URL)
+    const regex = pattern
+      .replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
+      .replace(/\\\*/g, '.*')
+      .replace(/\\\?/g, '.');
+    return new RegExp(regex).test(url);
+  }
+
+  private async saveResult(
+    jobId: string,
+    result: Omit<NewCrawlResult, 'jobId'>,
+  ): Promise<void> {
+    try {
+      await this.db.insert(crawlResults).values({
+        jobId,
+        ...result,
+      });
+    } catch (error) {
+      this.logger.error(`Failed to save result for ${result.url}: ${error}`);
+    }
+  }
+
+  private async updateProgress(
+    jobId: string,
+    delta: { discovered?: number; crawled?: number; failed?: number; queued?: number },
+  ): Promise<void> {
+    // Nothing to do when every delta is zero or absent
+    if (!delta.discovered && !delta.crawled && !delta.failed && !delta.queued) {
+      return;
+    }
+
+    try {
+      await this.db
+        .update(crawlJobs)
+        .set({
+          progress: sql`jsonb_build_object(
+            'discovered', COALESCE((progress->>'discovered')::int, 0) + ${delta.discovered || 0},
+            'crawled', COALESCE((progress->>'crawled')::int, 0) + ${delta.crawled || 0},
+            'failed', COALESCE((progress->>'failed')::int, 0) + ${delta.failed || 0},
+            'queued', COALESCE((progress->>'queued')::int, 0) + ${delta.queued || 0}
+          )`,
+          updatedAt: new Date(),
+        })
+        .where(eq(crawlJobs.id, jobId));
+    } catch (error) {
+      this.logger.error(`Failed to update progress for job ${jobId}: ${error}`);
+    }
+  }
+
+  private async completeJob(jobId: string): Promise<void> {
+    try {
+      await this.db
+        .update(crawlJobs)
+        .set({
+          status: 'completed',
+          completedAt: new Date(),
+          updatedAt: new Date(),
+        })
+        .where(eq(crawlJobs.id, jobId));
+
+      // Clean up URL tracking
+      this.processedUrls.delete(jobId);
+
+      this.logger.log(`Crawl job ${jobId} completed`);
+    } catch (error) {
+      this.logger.error(`Failed to complete job ${jobId}: ${error}`);
+    }
+  }
+}
diff --git a/services/mana-crawler/src/queue/queue.module.ts b/services/mana-crawler/src/queue/queue.module.ts
new file mode 100644
index 000000000..2ed9eee42
--- /dev/null
+++ b/services/mana-crawler/src/queue/queue.module.ts
@@ -0,0 +1,33 @@
+import { Module } from '@nestjs/common';
+import { BullModule } from '@nestjs/bullmq';
+import { BullBoardModule } from '@bull-board/nestjs';
+import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
+import { QueueService } from './queue.service';
+import { CRAWL_QUEUE } from './constants';
+
+// Re-export for convenience
+export { CRAWL_QUEUE } from './constants';
+
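+// Page jobs retry up to 3 times with exponential backoff starting at 1s; only
+// the last 100 completed and 1000 failed jobs are kept in Redis.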
+@Module({
+  imports: [
+    BullModule.registerQueue({
+      name: CRAWL_QUEUE,
+      defaultJobOptions: {
+        attempts: 3,
+        backoff: {
+          type: 'exponential',
+          delay: 1000,
+        },
+        removeOnComplete: 100,
+        removeOnFail: 1000,
+      },
+    }),
+    BullBoardModule.forFeature({
+      name: CRAWL_QUEUE,
+      adapter: BullMQAdapter,
+    }),
+  ],
+  providers: [QueueService],
+  exports: [QueueService, BullModule],
+})
+export class QueueModule {}
diff --git a/services/mana-crawler/src/queue/queue.service.ts b/services/mana-crawler/src/queue/queue.service.ts
new file mode 100644
index 000000000..4def2d1c2
--- /dev/null
+++ b/services/mana-crawler/src/queue/queue.service.ts
@@ -0,0 +1,150 @@
+import { Injectable, Logger } from '@nestjs/common';
+import { InjectQueue } from '@nestjs/bullmq';
+import { createHash } from 'crypto';
+import { Queue, Job } from 'bullmq';
+import { CRAWL_QUEUE } from './constants';
+import { CrawlJob } from '../db/schema';
+
+export interface CrawlPageJob {
+  jobId: string;
+  url: string;
+  parentUrl?: string;
+  depth: number;
+  config: {
+    maxDepth: number;
+    maxPages: number;
+    rateLimit: number;
+    respectRobots: boolean;
+    includePatterns?: string[];
+    excludePatterns?: string[];
+    selectors?: {
+      title?: string;
+      content?: string;
+      links?: string;
+      custom?: Record<string, string>;
+    };
+    output?: {
+      format?: 'text' | 'html' | 'markdown';
+    };
+  };
+}
+
+@Injectable()
+export class QueueService {
+  private readonly logger = new Logger(QueueService.name);
+
+  constructor(@InjectQueue(CRAWL_QUEUE) private readonly crawlQueue: Queue) {}
+
+  async addCrawlJob(crawlJob: CrawlJob): Promise<Job<CrawlPageJob>> {
+    const jobData: CrawlPageJob = {
+      jobId: crawlJob.id,
+      url: crawlJob.startUrl,
+      depth: 0,
+      config: {
+        maxDepth: crawlJob.maxDepth,
+        maxPages: crawlJob.maxPages,
+        rateLimit: crawlJob.rateLimit,
+        respectRobots: crawlJob.respectRobots,
+        includePatterns: crawlJob.includePatterns ?? undefined,
+        excludePatterns: crawlJob.excludePatterns ?? undefined,
+        selectors: crawlJob.selectors ?? undefined,
+        output: crawlJob.output ?? undefined,
+      },
+    };
+
+    const job = await this.crawlQueue.add('crawl-page', jobData, {
+      jobId: `${crawlJob.id}-start`,
+    });
+
+    this.logger.log(`Added crawl job ${crawlJob.id} to queue`);
+    return job;
+  }
+
+  async addPageToQueue(
+    jobId: string,
+    url: string,
+    parentUrl: string,
+    depth: number,
+    config: CrawlPageJob['config'],
+  ): Promise<Job<CrawlPageJob>> {
+    const jobData: CrawlPageJob = {
+      jobId,
+      url,
+      parentUrl,
+      depth,
+      config,
+    };
+
+    // Use a URL digest as job ID to prevent duplicates (mirrors the processor)
+    const urlHash = createHash('sha256').update(url).digest('base64url').slice(0, 32);
+    const job = await this.crawlQueue.add('crawl-page', jobData, {
+      jobId: `${jobId}-${urlHash}`,
+      delay: Math.floor(1000 / config.rateLimit), // Rate limiting delay
+    });
+
+    return job;
+  }
+
+  async getJobCounts(): Promise<{
+    waiting: number;
+    active: number;
+    completed: number;
+    failed: number;
+    delayed: number;
+  }> {
+    const counts = await this.crawlQueue.getJobCounts(
+      'waiting',
+      'active',
+      'completed',
+      'failed',
+      'delayed',
+    );
+    return {
+      waiting: counts.waiting ?? 0,
+      active: counts.active ?? 0,
+      completed: counts.completed ?? 0,
+      failed: counts.failed ?? 0,
+      delayed: counts.delayed ?? 0,
+    };
+  }
+
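+  // Pausing removes this crawl's waiting/delayed page jobs; pages already
+  // being processed finish their current fetch.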
+  async pauseJob(jobId: string): Promise<void> {
+    // Get all jobs for this crawl job
+    const jobs = await this.crawlQueue.getJobs(['waiting', 'delayed']);
+    for (const job of jobs) {
+      if (job.data.jobId === jobId) {
+        await job.remove();
+      }
+    }
+    this.logger.log(`Paused crawl job ${jobId}`);
+  }
+
+  async cancelJob(jobId: string): Promise<void> {
+    // Remove all jobs for this crawl job
+    const jobs = await this.crawlQueue.getJobs([
+      'waiting',
+      'delayed',
+      'active',
+    ]);
+    for (const job of jobs) {
+      if (job.data.jobId === jobId) {
+        // Active jobs hold a worker lock and cannot be removed; ignore those
+        await job.remove().catch(() => undefined);
+      }
+    }
+    this.logger.log(`Cancelled crawl job ${jobId}`);
+  }
+
+  async getQueueStats(): Promise<{
+    name: string;
+    counts: Record<string, number>;
+    isPaused: boolean;
+  }> {
+    const counts = await this.getJobCounts();
+    const isPaused = await this.crawlQueue.isPaused();
+
+    return {
+      name: CRAWL_QUEUE,
+      counts,
+      isPaused,
+    };
+  }
+}
diff --git a/services/mana-crawler/src/robots/robots.module.ts b/services/mana-crawler/src/robots/robots.module.ts
new file mode 100644
index 000000000..d466223c1
--- /dev/null
+++ b/services/mana-crawler/src/robots/robots.module.ts
@@ -0,0 +1,10 @@
+import { Module } from '@nestjs/common';
+import { RobotsService } from './robots.service';
+import { CacheModule } from '../cache/cache.module';
+
+@Module({
+  imports: [CacheModule],
+  providers: [RobotsService],
+  exports: [RobotsService],
+})
+export class RobotsModule {}
diff --git a/services/mana-crawler/src/robots/robots.service.ts b/services/mana-crawler/src/robots/robots.service.ts
new file mode 100644
index 000000000..392096eb9
--- /dev/null
+++ b/services/mana-crawler/src/robots/robots.service.ts
@@ -0,0 +1,143 @@
+import { Injectable, Logger } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import robotsParser from 'robots-parser';
+import { CacheService } from '../cache/cache.service';
+
+interface RobotsData {
+  allowed: boolean;
+  crawlDelay?: number;
+  sitemaps: string[];
+}
+
+@Injectable()
+export class RobotsService {
+  private readonly logger = new Logger(RobotsService.name);
+  private readonly userAgent: string;
+  private readonly cacheTtl: number;
+
+  constructor(
+    private readonly configService: ConfigService,
+    private readonly cacheService: CacheService,
+  ) {
+    this.userAgent = this.configService.get(
+      'crawler.userAgent',
+      'ManaCoreCrawler/1.0',
+    );
+    this.cacheTtl = this.configService.get('cache.robotsTtl', 86400);
+  }
+
+  async isAllowed(url: string): Promise<boolean> {
+    try {
+      const urlObj = new URL(url);
+      const robotsUrl = `${urlObj.protocol}//${urlObj.host}/robots.txt`;
+      const cacheKey = `robots:${urlObj.host}`;
+
+      // Check cache first
+      const cached = await this.cacheService.get<RobotsData>(cacheKey);
+      if (cached !== null) {
+        return this.checkUrl(cached, url);
+      }
+
+      // Fetch robots.txt
+      const robotsData = await this.fetchRobots(robotsUrl, urlObj.host);
+
+      // Cache the result
+      await this.cacheService.set(cacheKey, robotsData, this.cacheTtl);
+
+      return this.checkUrl(robotsData, url);
+    } catch (error) {
+      this.logger.warn(`Error checking robots.txt for ${url}: ${error}`);
+      // If we can't check, allow by default
+      return true;
+    }
+  }
+
+  async getCrawlDelay(domain: string): Promise<number | undefined> {
+    const cacheKey = `robots:${domain}`;
+    const cached = await this.cacheService.get<RobotsData>(cacheKey);
+    return cached?.crawlDelay;
+  }
+
+  async getSitemaps(domain: string): Promise<string[]> {
+    const cacheKey = `robots:${domain}`;
+    const cached = await this.cacheService.get<RobotsData>(cacheKey);
+    return cached?.sitemaps || [];
+  }
+
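+  // The cached RobotsData above only carries crawl-delay and sitemap info;
+  // per-URL allow/deny decisions are made by checkUrlWithRobots below.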
+  private async fetchRobots(robotsUrl: string, host: string): Promise<RobotsData> {
+    try {
+      const response = await fetch(robotsUrl, {
+        headers: {
+          'User-Agent': this.userAgent,
+        },
+        signal: AbortSignal.timeout(5000),
+      });
+
+      if (!response.ok) {
+        // No robots.txt or error - allow all
+        this.logger.debug(`No robots.txt found for ${host} (${response.status})`);
+        return { allowed: true, sitemaps: [] };
+      }
+
+      const robotsTxt = await response.text();
+      const robots = robotsParser(robotsUrl, robotsTxt);
+
+      // Get crawl delay
+      const crawlDelay = robots.getCrawlDelay(this.userAgent);
+
+      // Get sitemaps
+      const sitemaps = robots.getSitemaps();
+
+      return {
+        allowed: true, // Will be checked per-URL
+        crawlDelay: crawlDelay ? Number(crawlDelay) : undefined,
+        sitemaps,
+      };
+    } catch (error) {
+      this.logger.warn(`Failed to fetch robots.txt from ${robotsUrl}: ${error}`);
+      return { allowed: true, sitemaps: [] };
+    }
+  }
+
+  private checkUrl(robotsData: RobotsData, url: string): boolean {
+    // For now, we're caching a simplified version
+    // In production, you might want to cache the full robots.txt
+    // and parse it each time for more accurate checking
+    return robotsData.allowed;
+  }
+
+  async checkUrlWithRobots(url: string): Promise<{
+    allowed: boolean;
+    crawlDelay?: number;
+  }> {
+    try {
+      const urlObj = new URL(url);
+      const robotsUrl = `${urlObj.protocol}//${urlObj.host}/robots.txt`;
+
+      const response = await fetch(robotsUrl, {
+        headers: {
+          'User-Agent': this.userAgent,
+        },
+        signal: AbortSignal.timeout(5000),
+      });
+
+      if (!response.ok) {
+        return { allowed: true };
+      }
+
+      const robotsTxt = await response.text();
+      const robots = robotsParser(robotsUrl, robotsTxt);
+
+      const allowed = robots.isAllowed(url, this.userAgent) ?? true;
+      const crawlDelay = robots.getCrawlDelay(this.userAgent);
+
+      return {
+        allowed,
+        crawlDelay: crawlDelay ? Number(crawlDelay) : undefined,
+      };
+    } catch (error) {
+      this.logger.warn(`Error checking robots.txt for ${url}: ${error}`);
+      return { allowed: true };
+    }
+  }
+}
diff --git a/services/mana-crawler/tsconfig.json b/services/mana-crawler/tsconfig.json
new file mode 100644
index 000000000..f02c2417e
--- /dev/null
+++ b/services/mana-crawler/tsconfig.json
@@ -0,0 +1,25 @@
+{
+  "compilerOptions": {
+    "module": "commonjs",
+    "declaration": true,
+    "removeComments": true,
+    "emitDecoratorMetadata": true,
+    "experimentalDecorators": true,
+    "allowSyntheticDefaultImports": true,
+    "target": "ES2022",
+    "sourceMap": true,
+    "outDir": "./dist",
+    "baseUrl": "./",
+    "incremental": true,
+    "skipLibCheck": true,
+    "strictNullChecks": true,
+    "noImplicitAny": true,
+    "strictBindCallApply": true,
+    "forceConsistentCasingInFileNames": true,
+    "noFallthroughCasesInSwitch": true,
+    "esModuleInterop": true,
+    "resolveJsonModule": true
+  },
+  "include": ["src/**/*"],
+  "exclude": ["node_modules", "dist"]
+}