- Añade ImportRunner en memoria con concurrencia configurable
- Tests TDD para enqueue, concurrencia y comportamiento tras stop
- Actualiza /api/import/scan para encolar jobs y registrar errores
- Ajusta tsconfig.json para incluir tests en comprobaciones de tipo
120 lines
3.3 KiB
TypeScript
120 lines
3.3 KiB
TypeScript
import { describe, it, expect } from 'vitest';
|
|
import { ImportRunner } from '../../src/jobs/importRunner';
|
|
|
|
describe('jobs/importRunner', () => {
|
|
// Once the runner has been stopped, enqueue() is expected to hand back a
// promise that rejects instead of accepting the new task.
it('enqueue rechaza después de stop', async () => {
  const stoppedRunner = new ImportRunner(1);
  stoppedRunner.start();
  stoppedRunner.stop();

  const trivialTask = () => 'x';
  const pending = stoppedRunner.enqueue(trivialTask);

  // Any error is accepted here; only the fact of the rejection is checked.
  await expect(pending).rejects.toThrow();
});
|
|
|
|
// A task that is still waiting in the queue when stop() is called must be
// rejected with an "ImportRunner stopped" error, while the task that was
// enqueued first is still expected to resolve with its own result.
it('rechaza tareas en cola tras stop', async () => {
  // NOTE(review): start() is intentionally not called, exactly as in the
  // original test; presumably enqueue() alone begins processing — confirm
  // against the ImportRunner implementation.
  const runner = new ImportRunner(1);

  // The first task is expected to start right away and hold the single slot
  // for ~50 ms.
  const runningTask = runner.enqueue(async () => {
    await new Promise((resolve) => setTimeout(resolve, 50));
    return 'ok1';
  });

  // The second task is expected to remain queued behind the first one.
  const queuedTask = runner.enqueue(async () => 'ok2');

  // Stop the runner immediately, before the first task has finished.
  runner.stop();

  // Register BOTH expectations before awaiting either of them. If the queued
  // task is rejected as soon as stop() runs, awaiting the 50 ms task first
  // would leave that rejection without a handler in the meantime, which Node
  // reports as an unhandled rejection (and the test runner may surface as a
  // run-level error) even though the assertion itself would pass later.
  await Promise.all([
    expect(runningTask).resolves.toBe('ok1'),
    expect(queuedTask).rejects.toThrow(/ImportRunner stopped/),
  ]);

  const status = runner.getStatus();
  expect(status.completed).toBeGreaterThanOrEqual(1);
});
|
|
|
|
// A task whose promise rejects must still be reflected in the `completed`
// counter reported by getStatus(), and the rejection must reach the caller.
it('completed incrementa en rechazo', async () => {
  const importRunner = new ImportRunner(1);
  importRunner.start();

  const failingTask = async () => {
    throw new Error('boom');
  };
  const outcome = importRunner.enqueue(failingTask);

  // The caller sees the task's own error.
  await expect(outcome).rejects.toThrow('boom');

  const { completed } = importRunner.getStatus();
  expect(completed).toBeGreaterThanOrEqual(1);

  importRunner.stop();
});
|
|
|
|
// enqueue() resolves with whatever the task returns; once that single task
// has settled, the status shows one completed job and nothing running or
// queued, together with the configured concurrency.
it('enqueue resuelve con el resultado de la tarea', async () => {
  const importRunner = new ImportRunner(2);
  importRunner.start();

  const okTask = async () => 'ok';
  const value = await importRunner.enqueue(okTask);
  expect(value).toBe('ok');

  const { completed, running, queued, concurrency } = importRunner.getStatus();
  expect(completed).toBe(1);
  expect(running).toBe(0);
  expect(queued).toBe(0);
  expect(concurrency).toBe(2);

  importRunner.stop();
});
|
|
|
|
// Five 80 ms tasks are enqueued on a runner limited to two slots. Each task
// records how many tasks were in flight when it started; the highest value
// recorded must never exceed the configured limit.
it('respeta la concurrencia configurada', async () => {
  const limit = 2;
  const importRunner = new ImportRunner(limit);
  importRunner.start();

  let inFlight = 0;
  const samples: number[] = [];

  // Builds a task that samples the in-flight count on entry and releases its
  // slot after `ms` milliseconds.
  const trackedTask = (ms: number) => async () => {
    inFlight += 1;
    samples.push(inFlight);
    await new Promise((resolve) => setTimeout(resolve, ms));
    inFlight -= 1;
    return 'done';
  };

  const pending = Array.from({ length: 5 }, () => importRunner.enqueue(trackedTask(80)));
  await Promise.all(pending);

  const peak = Math.max(...samples);
  expect(peak).toBeLessThanOrEqual(limit);

  importRunner.stop();
});
|
|
|
|
// getStatus() exposes queued, running, completed and concurrency. The status
// is sampled once while three 60 ms tasks are pending and again after all of
// them have settled.
it('getStatus reporta queued, running, completed y concurrency', async () => {
  const limit = 2;
  const importRunner = new ImportRunner(limit);
  importRunner.start();

  // Builds a task that resolves with `value` after 60 ms.
  const delayed = (value: string) => () =>
    new Promise((resolve) => setTimeout(() => resolve(value), 60));

  const pending = ['a', 'b', 'c'].map((value) => importRunner.enqueue(delayed(value)));

  // Yield to the event loop once so the runner gets a chance to start tasks.
  await new Promise((resolve) => setImmediate(resolve));

  const during = importRunner.getStatus();
  expect(during.concurrency).toBe(limit);
  expect(during.running).toBeLessThanOrEqual(limit);
  expect(during.queued).toBeGreaterThanOrEqual(0);

  await Promise.all(pending);

  const after = importRunner.getStatus();
  expect(after.queued).toBe(0);
  expect(after.running).toBe(0);
  expect(after.completed).toBe(3);

  importRunner.stop();
});
|
|
});
|