mirror of
https://github.com/Memo-2023/mana-monorepo.git
synced 2026-05-26 23:17:42 +02:00
feat(geocoding): provider chain with Photon + Nominatim fallbacks
mana-geocoding now tries Pelias first, falls back to public Photon
(komoot.io) and finally to public Nominatim (OSM) when Pelias is
unhealthy or unreachable. The Places module's address lookup keeps
working even when the Pelias container is stopped — which it currently
is on the Mac mini, freeing 3 GB of RAM until Pelias gets migrated to
the GPU server.
Architecture:
ProviderChain ─ tries providers in priority order, stops on first
success. A clean empty-results answer is definitive
(don't burn through public-API budget on a query that
legitimately has no match). Only network errors / 5xx
/ 429 trigger fallthrough.
HealthCache ─ per-provider, 30s TTL. A failed health probe or a
failed search marks the provider unhealthy and skips
it for the rest of the cache window. Lazy refresh —
no background pinger.
RateLimiter ─ single-token FIFO queue, 1100ms gap by default.
Used to enforce Nominatim's 1 req/sec policy. Handles
abort during inter-task wait by releasing the busy
flag so later tasks aren't blocked.
Provider details:
pelias — primary, self-hosted DACH index, full OSM taxonomy in
`peliasCategories`, no rate limit
photon — public komoot endpoint, GeoJSON shape, raw `osm_key:
osm_value` mapped via lib/osm-category-map.ts. Faster
than Nominatim, no advertised rate limit but be polite.
nominatim — public OSM endpoint, strict 1 req/sec via the limiter,
custom User-Agent required (otherwise 403). Last
resort — fallback for when Photon is also down.
Response shape changes (additive only — existing callers keep
working):
- results[].provider: 'pelias' | 'photon' | 'nominatim'
- results[].peliasCategories: only present when Pelias served the
request (was already absent on Pelias-API patch failures)
- top-level provider: <name> + tried: <name[]> on success/error
- new endpoint: GET /health/providers — per-provider snapshot
Configuration via env (defaults shipped):
GEOCODING_PROVIDERS=pelias,photon,nominatim # order matters
PROVIDER_TIMEOUT_MS=5000
PROVIDER_HEALTH_CACHE_MS=30000
PHOTON_API_URL=https://photon.komoot.io
NOMINATIM_API_URL=https://nominatim.openstreetmap.org
NOMINATIM_USER_AGENT=mana-geocoding/1.0 (+https://mana.how; ...)
NOMINATIM_INTERVAL_MS=1100
Testing: 115 tests green (was 42). New coverage:
- osm-category-map.test.ts (47 cases over food/transit/shopping/
leisure/work/other priority resolution)
- rate-limiter.test.ts (FIFO, abort-during-wait, abort-during-sleep)
- chain.test.ts (failover, empty-results-stops, health-cache,
snapshot)
- photon-normalizer.test.ts and nominatim-normalizer.test.ts (lock
the wire-format mapping for both fallback providers)
Live smoke against public Photon verified — both /search and /reverse
return correctly normalized results with provider="photon" when Pelias
is unreachable.
This commit is contained in:
parent
ff823bff60
commit
f1e4a39644
17 changed files with 2120 additions and 184 deletions
|
|
@ -0,0 +1,155 @@
|
|||
/**
|
||||
* Unit tests for the raw-OSM-tag → PlaceCategory mapper.
|
||||
*
|
||||
* Covers the cases Photon and Nominatim emit for typical DACH queries.
|
||||
* The Pelias mapper has its own tests in category-map.test.ts; this file
|
||||
* tests *only* the raw-OSM-tag path used by the public-API fallbacks.
|
||||
*/
|
||||
|
||||
import { describe, expect, it } from 'bun:test';
|
||||
import { mapOsmTagToPlaceCategory } from '../osm-category-map';
|
||||
|
||||
describe('mapOsmTagToPlaceCategory', () => {
|
||||
describe('food (highest priority)', () => {
|
||||
it('amenity:restaurant → food', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'restaurant')).toBe('food');
|
||||
});
|
||||
it('amenity:cafe → food', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'cafe')).toBe('food');
|
||||
});
|
||||
it('amenity:bar → food (not leisure)', () => {
|
||||
// Bars sit at the food/leisure boundary. We pick food because the
|
||||
// Places UI groups bars next to restaurants visually.
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'bar')).toBe('food');
|
||||
});
|
||||
it('shop:bakery → food (not shopping)', () => {
|
||||
// Bakery is technically `shop` in OSM but functionally food. We
|
||||
// special-case the shop subtypes that are food.
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'bakery')).toBe('food');
|
||||
});
|
||||
it('shop:butcher → food', () => {
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'butcher')).toBe('food');
|
||||
});
|
||||
});
|
||||
|
||||
describe('transit', () => {
|
||||
it('public_transport:station → transit', () => {
|
||||
expect(mapOsmTagToPlaceCategory('public_transport', 'station')).toBe('transit');
|
||||
});
|
||||
it('public_transport (any value) → transit', () => {
|
||||
// Any value of public_transport falls under transit
|
||||
expect(mapOsmTagToPlaceCategory('public_transport', 'platform')).toBe('transit');
|
||||
expect(mapOsmTagToPlaceCategory('public_transport', 'stop_position')).toBe('transit');
|
||||
});
|
||||
it('railway:station → transit', () => {
|
||||
expect(mapOsmTagToPlaceCategory('railway', 'station')).toBe('transit');
|
||||
});
|
||||
it('railway:tram_stop → transit', () => {
|
||||
expect(mapOsmTagToPlaceCategory('railway', 'tram_stop')).toBe('transit');
|
||||
});
|
||||
it('highway:bus_stop → transit', () => {
|
||||
expect(mapOsmTagToPlaceCategory('highway', 'bus_stop')).toBe('transit');
|
||||
});
|
||||
it('aeroway:aerodrome → transit', () => {
|
||||
expect(mapOsmTagToPlaceCategory('aeroway', 'aerodrome')).toBe('transit');
|
||||
});
|
||||
it('amenity:car_rental → transit', () => {
|
||||
// Matches Pelias mapper's "car_rental → transit" decision
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'car_rental')).toBe('transit');
|
||||
});
|
||||
});
|
||||
|
||||
describe('shopping (after food, so bakery/butcher fall to food first)', () => {
|
||||
it('shop:supermarket → shopping', () => {
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'supermarket')).toBe('shopping');
|
||||
});
|
||||
it('shop:clothes → shopping', () => {
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'clothes')).toBe('shopping');
|
||||
});
|
||||
it('shop:electronics → shopping', () => {
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'electronics')).toBe('shopping');
|
||||
});
|
||||
it('amenity:marketplace → shopping', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'marketplace')).toBe('shopping');
|
||||
});
|
||||
});
|
||||
|
||||
describe('leisure', () => {
|
||||
it('leisure:park → leisure', () => {
|
||||
expect(mapOsmTagToPlaceCategory('leisure', 'park')).toBe('leisure');
|
||||
});
|
||||
it('tourism:attraction → leisure', () => {
|
||||
expect(mapOsmTagToPlaceCategory('tourism', 'attraction')).toBe('leisure');
|
||||
});
|
||||
it('amenity:cinema → leisure', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'cinema')).toBe('leisure');
|
||||
});
|
||||
it('amenity:theatre → leisure', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'theatre')).toBe('leisure');
|
||||
});
|
||||
it('amenity:nightclub → leisure', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'nightclub')).toBe('leisure');
|
||||
});
|
||||
it('sport:tennis → leisure', () => {
|
||||
expect(mapOsmTagToPlaceCategory('sport', 'tennis')).toBe('leisure');
|
||||
});
|
||||
});
|
||||
|
||||
describe('work', () => {
|
||||
it('amenity:school → work', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'school')).toBe('work');
|
||||
});
|
||||
it('amenity:university → work', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'university')).toBe('work');
|
||||
});
|
||||
it('amenity:bank → work', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'bank')).toBe('work');
|
||||
});
|
||||
it('amenity:townhall → work', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'townhall')).toBe('work');
|
||||
});
|
||||
it('office:* → work', () => {
|
||||
expect(mapOsmTagToPlaceCategory('office', 'company')).toBe('work');
|
||||
expect(mapOsmTagToPlaceCategory('office', 'lawyer')).toBe('work');
|
||||
});
|
||||
});
|
||||
|
||||
describe('other (health/religion/unknown)', () => {
|
||||
it('amenity:hospital → other', () => {
|
||||
// Health goes to other (matches Pelias mapper)
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'hospital')).toBe('other');
|
||||
});
|
||||
it('amenity:pharmacy → other', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'pharmacy')).toBe('other');
|
||||
});
|
||||
it('healthcare:doctor → other', () => {
|
||||
expect(mapOsmTagToPlaceCategory('healthcare', 'doctor')).toBe('other');
|
||||
});
|
||||
it('amenity:place_of_worship → other', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'place_of_worship')).toBe('other');
|
||||
});
|
||||
it('unknown class → other', () => {
|
||||
expect(mapOsmTagToPlaceCategory('weirdkey', 'weirdvalue')).toBe('other');
|
||||
});
|
||||
it('undefined inputs → other', () => {
|
||||
expect(mapOsmTagToPlaceCategory()).toBe('other');
|
||||
expect(mapOsmTagToPlaceCategory(undefined, undefined)).toBe('other');
|
||||
expect(mapOsmTagToPlaceCategory('amenity')).toBe('other'); // amenity without value
|
||||
});
|
||||
it('place:city → other (no street/road match)', () => {
|
||||
// Address-layer responses fall through to other
|
||||
expect(mapOsmTagToPlaceCategory('place', 'city')).toBe('other');
|
||||
});
|
||||
});
|
||||
|
||||
describe('priority — value-specific entries beat key-only entries', () => {
|
||||
it('shop:bakery is food, but shop:somethingElse is shopping', () => {
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'bakery')).toBe('food');
|
||||
expect(mapOsmTagToPlaceCategory('shop', 'supermarket')).toBe('shopping');
|
||||
});
|
||||
it('amenity:cinema is leisure, but amenity:marketplace is shopping', () => {
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'cinema')).toBe('leisure');
|
||||
expect(mapOsmTagToPlaceCategory('amenity', 'marketplace')).toBe('shopping');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
/**
|
||||
* Tests for the single-token rate limiter.
|
||||
*
|
||||
* The hot properties: FIFO ordering, inter-task gap honored, abort
|
||||
* removes from queue without blocking later tasks.
|
||||
*/
|
||||
|
||||
import { describe, expect, it } from 'bun:test';
|
||||
import { RateLimiter } from '../rate-limiter';
|
||||
|
||||
describe('RateLimiter', () => {
|
||||
it('runs a single task immediately', async () => {
|
||||
const lim = new RateLimiter(10);
|
||||
const start = Date.now();
|
||||
const result = await lim.run(async () => 42);
|
||||
const elapsed = Date.now() - start;
|
||||
expect(result).toBe(42);
|
||||
expect(elapsed).toBeLessThan(20); // No initial wait
|
||||
});
|
||||
|
||||
it('spaces successive tasks by intervalMs', async () => {
|
||||
const lim = new RateLimiter(50);
|
||||
const start = Date.now();
|
||||
await lim.run(async () => 1);
|
||||
await lim.run(async () => 2);
|
||||
const elapsed = Date.now() - start;
|
||||
// Second task waits ~50ms before starting. Allow a little jitter.
|
||||
expect(elapsed).toBeGreaterThanOrEqual(45);
|
||||
expect(elapsed).toBeLessThan(150);
|
||||
});
|
||||
|
||||
it('preserves FIFO order under concurrent calls', async () => {
|
||||
const lim = new RateLimiter(20);
|
||||
const order: number[] = [];
|
||||
const tasks = [1, 2, 3, 4].map((n) =>
|
||||
lim.run(async () => {
|
||||
order.push(n);
|
||||
return n;
|
||||
})
|
||||
);
|
||||
await Promise.all(tasks);
|
||||
expect(order).toEqual([1, 2, 3, 4]);
|
||||
});
|
||||
|
||||
it('reports pending count', async () => {
|
||||
const lim = new RateLimiter(50);
|
||||
// First task takes the slot — kick it off but don't await yet
|
||||
const t1 = lim.run(async () => {
|
||||
await new Promise((r) => setTimeout(r, 30));
|
||||
return 1;
|
||||
});
|
||||
// Schedule two more — they queue
|
||||
const t2 = lim.run(async () => 2);
|
||||
const t3 = lim.run(async () => 3);
|
||||
// Tiny delay so t1 has acquired the lock
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
expect(lim.pending).toBe(2);
|
||||
await Promise.all([t1, t2, t3]);
|
||||
expect(lim.pending).toBe(0);
|
||||
});
|
||||
|
||||
it('aborts a queued task without breaking later ones', async () => {
|
||||
const lim = new RateLimiter(40);
|
||||
const t1 = lim.run(async () => 'first');
|
||||
|
||||
const ctrl = new AbortController();
|
||||
const t2 = lim.run(async () => 'second', ctrl.signal);
|
||||
const t3 = lim.run(async () => 'third');
|
||||
|
||||
// Tiny delay to ensure t1 is running and t2/t3 are queued
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
ctrl.abort();
|
||||
|
||||
// t2 should reject with abort
|
||||
await expect(t2).rejects.toThrow(/aborted/);
|
||||
// t1 + t3 still resolve
|
||||
expect(await t1).toBe('first');
|
||||
expect(await t3).toBe('third');
|
||||
});
|
||||
|
||||
it('aborts during interval-wait without breaking later tasks', async () => {
|
||||
const lim = new RateLimiter(80);
|
||||
await lim.run(async () => 'warmup'); // sets nextSlotAt = now + 80
|
||||
|
||||
const ctrl = new AbortController();
|
||||
const t1 = lim.run(async () => 'next', ctrl.signal);
|
||||
// While t1 is sleeping in the interval-wait, abort it
|
||||
setTimeout(() => ctrl.abort(), 10);
|
||||
await expect(t1).rejects.toThrow(/aborted/);
|
||||
|
||||
// Verify the limiter is still functional
|
||||
const t2 = await lim.run(async () => 'after');
|
||||
expect(t2).toBe('after');
|
||||
});
|
||||
});
|
||||
116
services/mana-geocoding/src/lib/osm-category-map.ts
Normal file
116
services/mana-geocoding/src/lib/osm-category-map.ts
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
* Maps raw OSM `class:type` tags (Photon's `osm_key:osm_value`,
|
||||
* Nominatim's `class:type`) to our 7 PlaceCategories.
|
||||
*
|
||||
* Pelias has a curated multi-category taxonomy (`food`, `retail`,
|
||||
* `transport`, …) that we map via `category-map.ts`. Photon and Nominatim
|
||||
* return raw OSM tags instead — `amenity:restaurant`, `shop:supermarket`,
|
||||
* `public_transport:station`, etc. — so they need a different lookup.
|
||||
*
|
||||
* The list below is intentionally narrow: it only covers tags we actually
|
||||
* see in real Photon/Nominatim responses for DACH queries. Anything else
|
||||
* falls through to `other`, which matches the Pelias mapper's behavior for
|
||||
* unknown categories.
|
||||
*
|
||||
* If a query returns a tag we don't handle, that's the signal to add it
|
||||
* here — not to try to enumerate all 1000+ OSM types.
|
||||
*/
|
||||
|
||||
import type { PlaceCategory } from './category-map';
|
||||
|
||||
interface Tag {
|
||||
key: string;
|
||||
value?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Priority-ordered: first match wins. More-specific entries (with a
|
||||
* `value`) come before generic key-only entries. Matches Pelias's
|
||||
* "food beats retail" priority intent.
|
||||
*/
|
||||
const OSM_RULES: Array<{ match: Tag; category: PlaceCategory }> = [
|
||||
// ── Food (highest priority — restaurants are food, even when also
|
||||
// tagged amenity or shop) ───────────────────────────────────────
|
||||
{ match: { key: 'amenity', value: 'restaurant' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'cafe' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'fast_food' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'bar' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'pub' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'biergarten' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'food_court' }, category: 'food' },
|
||||
{ match: { key: 'amenity', value: 'ice_cream' }, category: 'food' },
|
||||
{ match: { key: 'shop', value: 'bakery' }, category: 'food' },
|
||||
{ match: { key: 'shop', value: 'butcher' }, category: 'food' },
|
||||
{ match: { key: 'shop', value: 'confectionery' }, category: 'food' },
|
||||
|
||||
// ── Transit ───────────────────────────────────────────────────────
|
||||
{ match: { key: 'public_transport' }, category: 'transit' },
|
||||
{ match: { key: 'railway', value: 'station' }, category: 'transit' },
|
||||
{ match: { key: 'railway', value: 'halt' }, category: 'transit' },
|
||||
{ match: { key: 'railway', value: 'tram_stop' }, category: 'transit' },
|
||||
{ match: { key: 'highway', value: 'bus_stop' }, category: 'transit' },
|
||||
{ match: { key: 'aeroway' }, category: 'transit' },
|
||||
{ match: { key: 'amenity', value: 'bus_station' }, category: 'transit' },
|
||||
{ match: { key: 'amenity', value: 'taxi' }, category: 'transit' },
|
||||
{ match: { key: 'amenity', value: 'ferry_terminal' }, category: 'transit' },
|
||||
{ match: { key: 'amenity', value: 'car_rental' }, category: 'transit' },
|
||||
{ match: { key: 'amenity', value: 'parking' }, category: 'transit' },
|
||||
|
||||
// ── Shopping (after food so bakery/butcher don't fall here) ──────
|
||||
{ match: { key: 'shop' }, category: 'shopping' },
|
||||
{ match: { key: 'amenity', value: 'marketplace' }, category: 'shopping' },
|
||||
|
||||
// ── Leisure / entertainment ──────────────────────────────────────
|
||||
{ match: { key: 'leisure' }, category: 'leisure' },
|
||||
{ match: { key: 'tourism' }, category: 'leisure' },
|
||||
{ match: { key: 'amenity', value: 'cinema' }, category: 'leisure' },
|
||||
{ match: { key: 'amenity', value: 'theatre' }, category: 'leisure' },
|
||||
{ match: { key: 'amenity', value: 'nightclub' }, category: 'leisure' },
|
||||
{ match: { key: 'amenity', value: 'arts_centre' }, category: 'leisure' },
|
||||
{ match: { key: 'sport' }, category: 'leisure' },
|
||||
|
||||
// ── Work-ish ─────────────────────────────────────────────────────
|
||||
{ match: { key: 'amenity', value: 'school' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'university' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'college' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'kindergarten' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'library' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'bank' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'post_office' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'courthouse' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'townhall' }, category: 'work' },
|
||||
{ match: { key: 'amenity', value: 'embassy' }, category: 'work' },
|
||||
{ match: { key: 'office' }, category: 'work' },
|
||||
|
||||
// ── Health / religion → other (matches Pelias mapper) ───────────
|
||||
{ match: { key: 'amenity', value: 'hospital' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'clinic' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'doctors' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'pharmacy' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'dentist' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'veterinary' }, category: 'other' },
|
||||
{ match: { key: 'healthcare' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'place_of_worship' }, category: 'other' },
|
||||
{ match: { key: 'amenity', value: 'grave_yard' }, category: 'other' },
|
||||
|
||||
// Address-layer responses (no class/type, just a road match) →
|
||||
// caller passes `place`/`highway` here, fall through to other
|
||||
];
|
||||
|
||||
/**
|
||||
* Map a single OSM `class:type` pair to a PlaceCategory.
|
||||
*
|
||||
* @param key Photon's `osm_key` or Nominatim's `class` (e.g. `amenity`)
|
||||
* @param value Photon's `osm_value` or Nominatim's `type` (e.g. `restaurant`)
|
||||
*/
|
||||
export function mapOsmTagToPlaceCategory(key?: string, value?: string): PlaceCategory {
|
||||
if (!key) return 'other';
|
||||
|
||||
for (const rule of OSM_RULES) {
|
||||
if (rule.match.key !== key) continue;
|
||||
if (rule.match.value && rule.match.value !== value) continue;
|
||||
return rule.category;
|
||||
}
|
||||
|
||||
return 'other';
|
||||
}
|
||||
96
services/mana-geocoding/src/lib/rate-limiter.ts
Normal file
96
services/mana-geocoding/src/lib/rate-limiter.ts
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Single-token rate limiter. Used for Nominatim's strict 1-req/sec policy.
|
||||
*
|
||||
* Why not p-queue / bottleneck: those are great packages but the surface
|
||||
* we need is tiny (one slot, fixed interval, FIFO) and we want to keep
|
||||
* the wrapper dependency-light. This is ~30 lines of code with a tight
|
||||
* test surface.
|
||||
*
|
||||
* Behavior:
|
||||
* - At most 1 task running at a time.
|
||||
* - Between successive task starts: at least `intervalMs` elapses.
|
||||
* - Tasks queue in FIFO order. No prioritization, no skipping.
|
||||
* - Caller can pass an `AbortSignal` to drop their slot if they no
|
||||
* longer want the answer (e.g. the wrapper's overall timeout fired).
|
||||
*/
|
||||
|
||||
export class RateLimiter {
|
||||
private queue: Array<() => void> = [];
|
||||
private nextSlotAt = 0;
|
||||
private busy = false;
|
||||
|
||||
constructor(private readonly intervalMs: number) {}
|
||||
|
||||
async run<T>(task: () => Promise<T>, signal?: AbortSignal): Promise<T> {
|
||||
await this.acquire(signal);
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
this.release();
|
||||
}
|
||||
}
|
||||
|
||||
private async acquire(signal?: AbortSignal): Promise<void> {
|
||||
// Wait for the previous task to release the slot. The lock is
|
||||
// implemented as a queue of resume-functions; release() pops one.
|
||||
// We need a stable reference to remove from the queue on abort —
|
||||
// a named closure works because we push and splice the same one.
|
||||
if (this.busy) {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const entry = () => {
|
||||
signal?.removeEventListener('abort', onAbort);
|
||||
resolve();
|
||||
};
|
||||
const onAbort = () => {
|
||||
const idx = this.queue.indexOf(entry);
|
||||
if (idx >= 0) this.queue.splice(idx, 1);
|
||||
reject(new Error('aborted'));
|
||||
};
|
||||
signal?.addEventListener('abort', onAbort, { once: true });
|
||||
this.queue.push(entry);
|
||||
});
|
||||
}
|
||||
this.busy = true;
|
||||
|
||||
// Honor the inter-task gap. Even if the previous task ran fast,
|
||||
// we space starts at least `intervalMs` apart.
|
||||
const wait = this.nextSlotAt - Date.now();
|
||||
if (wait > 0) {
|
||||
try {
|
||||
await sleep(wait, signal);
|
||||
} catch (e) {
|
||||
// Aborted during the inter-task wait. We've already claimed
|
||||
// the busy flag — release it so the next queued task can
|
||||
// proceed instead of deadlocking.
|
||||
this.release();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
this.nextSlotAt = Date.now() + this.intervalMs;
|
||||
}
|
||||
|
||||
private release(): void {
|
||||
const next = this.queue.shift();
|
||||
this.busy = !!next;
|
||||
if (next) next();
|
||||
}
|
||||
|
||||
get pending(): number {
|
||||
return this.queue.length;
|
||||
}
|
||||
}
|
||||
|
||||
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const t = setTimeout(resolve, ms);
|
||||
signal?.addEventListener(
|
||||
'abort',
|
||||
() => {
|
||||
clearTimeout(t);
|
||||
reject(new Error('aborted'));
|
||||
},
|
||||
{ once: true }
|
||||
);
|
||||
});
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue