feat: stream hashing and archive-entry import support

- Añade `computeHashesFromStream` para hashing desde streams
- Adapta `importDirectory` para procesar entradas internas usando `streamArchiveEntry`
- Añade tests unitarios para hashing por stream e import de entradas de archive
This commit is contained in:
2026-02-09 19:36:18 +01:00
parent 97a7f74685
commit 7ca465fb73
6 changed files with 183 additions and 3 deletions

View File

@@ -72,4 +72,37 @@ export async function computeHashes(filePath: string): Promise<{
});
}
/**
 * Compute size, MD5, SHA-1 and CRC32 digests from a readable stream
 * without buffering the whole payload in memory.
 *
 * Chunks are consumed via 'data' events and are expected to be Buffers
 * (binary stream). Resolves when the stream ends; rejects on the first
 * 'error' event.
 *
 * @param rs readable stream to hash (fully consumed by this call)
 * @returns total byte count plus lowercase hex digests (crc32 is 8 chars)
 */
export async function computeHashesFromStream(rs: NodeJS.ReadableStream): Promise<{
  size: number;
  md5: string;
  sha1: string;
  crc32: string;
}> {
  return new Promise((resolve, reject) => {
    const md5 = createHash('md5');
    const sha1 = createHash('sha1');
    let size = 0;
    // CRC-32 runs from the bit-inverted seed and is finalized by a second inversion.
    let crc = 0xffffffff >>> 0;

    const onData = (chunk: Buffer) => {
      md5.update(chunk);
      sha1.update(chunk);
      size += chunk.length;
      crc = updateCrc(crc, chunk);
    };
    // Detach every listener as soon as the promise settles so unusual
    // streams (e.g. emitting after 'end') cannot leak handlers or settle twice.
    const cleanup = () => {
      rs.removeListener('data', onData);
      rs.removeListener('error', onError);
      rs.removeListener('end', onEnd);
    };
    const onError = (err: unknown) => {
      cleanup();
      // Always reject with an Error instance, even for non-Error emissions.
      reject(err instanceof Error ? err : new Error(String(err)));
    };
    const onEnd = () => {
      cleanup();
      const final = (crc ^ 0xffffffff) >>> 0;
      resolve({
        size,
        md5: md5.digest('hex'),
        sha1: sha1.digest('hex'),
        // Number.prototype.toString(16) already yields lowercase hex.
        crc32: final.toString(16).padStart(8, '0'),
      });
    };

    rs.on('data', onData);
    rs.once('error', onError);
    rs.once('end', onEnd);
  });
}
export default computeHashes;

View File

@@ -12,7 +12,8 @@
import path from 'path';
import { promises as fsPromises } from 'fs';
import { scanDirectory } from './fsScanner';
import { computeHashes } from './checksumService';
import { computeHashes, computeHashesFromStream } from './checksumService';
import { streamArchiveEntry } from './archiveReader';
import prisma from '../plugins/prisma';
/**
@@ -66,7 +67,22 @@ export async function importDirectory(
processed++;
try {
const hashes = await computeHashes(file.path);
let hashes: { size: number; md5: string; sha1: string; crc32: string };
if (file.isArchiveEntry) {
const stream = await streamArchiveEntry(file.containerPath, file.entryPath, logger);
if (!stream) {
logger.warn?.(
{ file },
'importDirectory: no se pudo extraer entrada del archive, saltando'
);
continue;
}
hashes = await computeHashesFromStream(stream as any);
} else {
hashes = await computeHashes(file.path);
}
const checksum = hashes.md5;
const size = hashes.size;
@@ -100,7 +116,10 @@ export async function importDirectory(
upserted++;
}
} catch (err) {
logger.warn?.({ err, file }, 'importDirectory: error procesando fichero, se continúa con el siguiente');
logger.warn?.(
{ err, file },
'importDirectory: error procesando fichero, se continúa con el siguiente'
);
continue;
}
}

View File

@@ -0,0 +1,24 @@
import { describe, it, expect } from 'vitest';
import { Readable } from 'stream';
import fs from 'fs/promises';
import path from 'path';

import { computeHashes, computeHashesFromStream } from '../../src/services/checksumService';

describe('services/checksumService (stream)', () => {
  it('computeHashesFromStream produces same result as computeHashes(file)', async () => {
    const data = Buffer.from('quasar-stream-test');
    // Write the fixture to a real temp file so both code paths hash identical bytes.
    const tmpDir = await fs.mkdtemp(path.join(process.cwd(), 'tmp-checksum-'));
    try {
      const tmpFile = path.join(tmpDir, 'test.bin');
      await fs.writeFile(tmpFile, data);

      const expected = await computeHashes(tmpFile);
      // Readable implements NodeJS.ReadableStream — no cast needed.
      const actual = await computeHashesFromStream(Readable.from([data]));

      expect(actual).toEqual(expected);
    } finally {
      // Clean up even when the assertion fails, so failed runs do not
      // leave tmp-checksum-* directories behind in the working directory.
      await fs.rm(tmpDir, { recursive: true, force: true });
    }
  });
});

View File

@@ -0,0 +1,69 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { Mock } from 'vitest';
import { Readable } from 'stream';

// Module mocks must be registered before the module under test is imported.
vi.mock('../../src/services/fsScanner', () => ({ scanDirectory: vi.fn() }));
vi.mock('../../src/services/archiveReader', () => ({ streamArchiveEntry: vi.fn() }));
vi.mock('../../src/plugins/prisma', () => ({
  default: {
    game: { findUnique: vi.fn(), create: vi.fn() },
    romFile: { upsert: vi.fn() },
  },
}));

import importDirectory from '../../src/services/importService';
import { scanDirectory } from '../../src/services/fsScanner';
import { streamArchiveEntry } from '../../src/services/archiveReader';
import prisma from '../../src/plugins/prisma';
import { createHash } from 'crypto';

// Typed mock aliases: `vi.Mock` is not a value-level export, so cast once
// here via `import type { Mock } from 'vitest'` instead of repeating
// `as unknown as vi.Mock` at every call site.
const scanDirectoryMock = scanDirectory as unknown as Mock;
const streamArchiveEntryMock = streamArchiveEntry as unknown as Mock;
const gameFindUniqueMock = prisma.game.findUnique as unknown as Mock;
const gameCreateMock = prisma.game.create as unknown as Mock;
const romFileUpsertMock = prisma.romFile.upsert as unknown as Mock;

beforeEach(() => {
  vi.restoreAllMocks();
});

describe('services/importService (archive entries)', () => {
  it('procesa una entrada interna usando streamArchiveEntry y hace upsert', async () => {
    const files = [
      {
        path: '/roms/collection.zip::inner/rom1.bin',
        containerPath: '/roms/collection.zip',
        entryPath: 'inner/rom1.bin',
        filename: 'rom1.bin',
        name: 'inner/rom1.bin',
        size: 123,
        format: 'bin',
        isArchiveEntry: true,
      },
    ];
    const data = Buffer.from('import-archive-test');

    scanDirectoryMock.mockResolvedValue(files);
    streamArchiveEntryMock.mockResolvedValue(Readable.from([data]));
    gameFindUniqueMock.mockResolvedValue(null);
    gameCreateMock.mockResolvedValue({
      id: 77,
      title: 'ROM1',
      slug: 'rom1',
    });
    romFileUpsertMock.mockResolvedValue({ id: 1 });

    // The checksum stored by importDirectory should be the MD5 of the entry bytes.
    const md5 = createHash('md5').update(data).digest('hex');

    const summary = await importDirectory({ dir: '/roms', persist: true });

    expect(streamArchiveEntryMock).toHaveBeenCalledTimes(1);
    expect(streamArchiveEntryMock.mock.calls[0][0]).toBe('/roms/collection.zip');
    expect(streamArchiveEntryMock.mock.calls[0][1]).toBe('inner/rom1.bin');

    expect(romFileUpsertMock).toHaveBeenCalledTimes(1);
    const upsertArgs = romFileUpsertMock.mock.calls[0][0];
    expect(upsertArgs.where).toEqual({ checksum: md5 });
    expect(upsertArgs.create.filename).toBe('rom1.bin');
    expect(upsertArgs.create.path).toBe('/roms/collection.zip::inner/rom1.bin');

    expect(summary).toEqual({ processed: 1, createdCount: 1, upserted: 1 });
  });
});

View File

@@ -0,0 +1 @@
quasar-stream-test

View File

@@ -0,0 +1,34 @@
## Phase 3 Complete: Hashing por stream y soporte en importService
TL;DR: Implementado `computeHashesFromStream` y adaptado `importDirectory` para procesar entradas internas de archivos usando `archiveReader.streamArchiveEntry`. Añadidos tests unitarios TDD que validan hashing desde streams y el flujo de import para entradas internas.
**Files created/changed:**
- backend/src/services/checksumService.ts (añade `computeHashesFromStream`)
- backend/src/services/importService.ts (usa `streamArchiveEntry` y `computeHashesFromStream` para `isArchiveEntry`)
- backend/tests/services/checksumService.stream.spec.ts (nuevo)
- backend/tests/services/importService.archiveEntry.spec.ts (nuevo)
**Functions created/changed:**
- `computeHashesFromStream(rs)` — calcula `md5`, `sha1`, `crc32` y `size` desde un `Readable`.
- `importDirectory` — para objetos con `isArchiveEntry` obtiene un stream con `streamArchiveEntry(containerPath, entryPath)` y calcula hashes en streaming.
**Tests creados/ejecutados:**
- `backend/tests/services/checksumService.stream.spec.ts` — pasa (1 test).
- `backend/tests/services/importService.archiveEntry.spec.ts` — pasa (1 test).
**Review Status:** APPROVED with minor recommendations
**Notas / Recomendaciones:**
- Se sugiere limpiar listeners en `computeHashesFromStream` (evitar fugas con streams inusuales).
- Algunos specs usan casts `as unknown as vi.Mock`; si TypeScript da errores, convertir esos casts a `import type { Mock } from 'vitest'` y usar `Mock` o usar `any`.
**Git Commit Message:**
feat: stream hashing and archive-entry import support
- Añade `computeHashesFromStream` para hashing desde streams
- Adapta `importDirectory` para procesar entradas internas usando `streamArchiveEntry`
- Añade tests unitarios para hashing por stream e import de entradas de archive