diff --git a/cli/package-lock.json b/cli/package-lock.json
index 7fcdb7030a..b3a2658a71 100644
--- a/cli/package-lock.json
+++ b/cli/package-lock.json
@@ -10,6 +10,7 @@
       "license": "GNU Affero General Public License version 3",
       "dependencies": {
         "fast-glob": "^3.3.2",
+        "fastq": "^1.17.1",
         "lodash-es": "^4.17.21"
       },
       "bin": {
@@ -39,6 +40,7 @@
         "vite": "^5.0.12",
         "vite-tsconfig-paths": "^4.3.2",
         "vitest": "^1.2.2",
+        "vitest-fetch-mock": "^0.2.2",
         "yaml": "^2.3.1"
       },
       "engines": {
@@ -1853,6 +1855,15 @@
         "url": "https://opencollective.com/core-js"
       }
     },
+    "node_modules/cross-fetch": {
+      "version": "3.1.8",
+      "resolved": "https://registry.npmjs.org/cross-fetch/-/cross-fetch-3.1.8.tgz",
+      "integrity": "sha512-cvA+JwZoU0Xq+h6WkMvAUqPEYy92Obet6UdKLfW60qn99ftItKjB5T+BkyWOFWe2pUyfQ+IJHmpOTznqk1M6Kg==",
+      "dev": true,
+      "dependencies": {
+        "node-fetch": "^2.6.12"
+      }
+    },
     "node_modules/cross-spawn": {
       "version": "7.0.3",
       "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
@@ -3146,6 +3157,26 @@
       "integrity": "sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==",
       "dev": true
     },
+    "node_modules/node-fetch": {
+      "version": "2.7.0",
+      "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz",
+      "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==",
+      "dev": true,
+      "dependencies": {
+        "whatwg-url": "^5.0.0"
+      },
+      "engines": {
+        "node": "4.x || >=6.0.0"
+      },
+      "peerDependencies": {
+        "encoding": "^0.1.0"
+      },
+      "peerDependenciesMeta": {
+        "encoding": {
+          "optional": true
+        }
+      }
+    },
     "node_modules/node-releases": {
       "version": "2.0.14",
       "resolved": "https://registry.npmjs.org/node-releases/-/node-releases-2.0.14.tgz",
@@ -4171,6 +4202,12 @@
         "node": ">=8.0"
       }
     },
+    "node_modules/tr46": {
+      "version": "0.0.3",
+      "resolved": "https://registry.npmjs.org/tr46/-/tr46-0.0.3.tgz",
+      "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==",
+      "dev": true
+    },
     "node_modules/ts-api-utils": {
       "version": "1.3.0",
       "resolved": "https://registry.npmjs.org/ts-api-utils/-/ts-api-utils-1.3.0.tgz",
@@ -4479,6 +4516,37 @@
         }
       }
     },
+    "node_modules/vitest-fetch-mock": {
+      "version": "0.2.2",
+      "resolved": "https://registry.npmjs.org/vitest-fetch-mock/-/vitest-fetch-mock-0.2.2.tgz",
+      "integrity": "sha512-XmH6QgTSjCWrqXoPREIdbj40T7i1xnGmAsTAgfckoO75W1IEHKR8hcPCQ7SO16RsdW1t85oUm6pcQRLeBgjVYQ==",
+      "dev": true,
+      "dependencies": {
+        "cross-fetch": "^3.0.6"
+      },
+      "engines": {
+        "node": ">=14.14.0"
+      },
+      "peerDependencies": {
+        "vitest": ">=0.16.0"
+      }
+    },
+    "node_modules/webidl-conversions": {
+      "version": "3.0.1",
+      "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz",
+      "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==",
+      "dev": true
+    },
+    "node_modules/whatwg-url": {
+      "version": "5.0.0",
+      "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz",
+      "integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==",
+      "dev": true,
+      "dependencies": {
+        "tr46": "~0.0.3",
+        "webidl-conversions": "^3.0.0"
+      }
+    },
     "node_modules/which": {
       "version": "2.0.2",
       "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",
diff --git a/cli/package.json b/cli/package.json
index 748c3c5301..15cacdc08a 100644
--- a/cli/package.json
+++ b/cli/package.json
@@ -36,6 +36,7 @@
     "vite": "^5.0.12",
"^5.0.12", "vite-tsconfig-paths": "^4.3.2", "vitest": "^1.2.2", + "vitest-fetch-mock": "^0.2.2", "yaml": "^2.3.1" }, "scripts": { @@ -59,6 +60,7 @@ }, "dependencies": { "fast-glob": "^3.3.2", + "fastq": "^1.17.1", "lodash-es": "^4.17.21" }, "volta": { diff --git a/cli/src/commands/asset.spec.ts b/cli/src/commands/asset.spec.ts index e42e2ec964..4bac1d00ab 100644 --- a/cli/src/commands/asset.spec.ts +++ b/cli/src/commands/asset.spec.ts @@ -1,10 +1,18 @@ -import { platform } from 'node:os'; -import { UploadOptionsDto, getAlbumName } from 'src/commands/asset'; -import { describe, expect, it } from 'vitest'; +import * as fs from 'node:fs'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { describe, expect, it, vi } from 'vitest'; -describe('Unit function tests', () => { +import { Action, checkBulkUpload, defaults, Reason } from '@immich/sdk'; +import createFetchMock from 'vitest-fetch-mock'; + +import { checkForDuplicates, getAlbumName, uploadFiles, UploadOptionsDto } from './asset'; + +vi.mock('@immich/sdk'); + +describe('getAlbumName', () => { it('should return a non-undefined value', () => { - if (platform() === 'win32') { + if (os.platform() === 'win32') { // This is meaningless for Unix systems. expect(getAlbumName(String.raw`D:\test\Filename.txt`, {} as UploadOptionsDto)).toBe('test'); } @@ -17,3 +25,177 @@ describe('Unit function tests', () => { ); }); }); + +describe('uploadFiles', () => { + const testDir = fs.mkdtempSync(path.join(os.tmpdir(), 'test-')); + const testFilePath = path.join(testDir, 'test.png'); + const testFileData = 'test'; + const baseUrl = 'http://example.com'; + const apiKey = 'key'; + const retry = 3; + + const fetchMocker = createFetchMock(vi); + + beforeEach(() => { + // Create a test file + fs.writeFileSync(testFilePath, testFileData); + + // Defaults + vi.mocked(defaults).baseUrl = baseUrl; + vi.mocked(defaults).headers = { 'x-api-key': apiKey }; + + fetchMocker.enableMocks(); + fetchMocker.resetMocks(); + }); + + it('returns new assets when upload file is successful', async () => { + fetchMocker.doMockIf(new RegExp(`${baseUrl}/assets$`), () => { + return { + status: 200, + body: JSON.stringify({ id: 'fc5621b1-86f6-44a1-9905-403e607df9f5', status: 'created' }), + }; + }); + + await expect(uploadFiles([testFilePath], { concurrency: 1 })).resolves.toEqual([ + { + filepath: testFilePath, + id: 'fc5621b1-86f6-44a1-9905-403e607df9f5', + }, + ]); + }); + + it('returns new assets when upload file retry is successful', async () => { + let counter = 0; + fetchMocker.doMockIf(new RegExp(`${baseUrl}/assets$`), () => { + counter++; + if (counter < retry) { + throw new Error('Network error'); + } + + return { + status: 200, + body: JSON.stringify({ id: 'fc5621b1-86f6-44a1-9905-403e607df9f5', status: 'created' }), + }; + }); + + await expect(uploadFiles([testFilePath], { concurrency: 1 })).resolves.toEqual([ + { + filepath: testFilePath, + id: 'fc5621b1-86f6-44a1-9905-403e607df9f5', + }, + ]); + }); + + it('returns new assets when upload file retry is failed', async () => { + fetchMocker.doMockIf(new RegExp(`${baseUrl}/assets$`), () => { + throw new Error('Network error'); + }); + + await expect(uploadFiles([testFilePath], { concurrency: 1 })).resolves.toEqual([]); + }); +}); + +describe('checkForDuplicates', () => { + const testDir = fs.mkdtempSync(path.join(os.tmpdir(), 'test-')); + const testFilePath = path.join(testDir, 'test.png'); + const testFileData = 'test'; + const testFileChecksum = 'a94a8fe5ccb19ba61c4c0873d391e987982fbbd3'; // SHA1 + 
+  const retry = 3;
+
+  beforeEach(() => {
+    // Create a test file
+    fs.writeFileSync(testFilePath, testFileData);
+  });
+
+  it('checks duplicates', async () => {
+    vi.mocked(checkBulkUpload).mockResolvedValue({
+      results: [
+        {
+          action: Action.Accept,
+          id: testFilePath,
+        },
+      ],
+    });
+
+    await checkForDuplicates([testFilePath], { concurrency: 1 });
+
+    expect(checkBulkUpload).toHaveBeenCalledWith({
+      assetBulkUploadCheckDto: {
+        assets: [
+          {
+            checksum: testFileChecksum,
+            id: testFilePath,
+          },
+        ],
+      },
+    });
+  });
+
+  it('returns duplicates when check duplicates is rejected', async () => {
+    vi.mocked(checkBulkUpload).mockResolvedValue({
+      results: [
+        {
+          action: Action.Reject,
+          id: testFilePath,
+          assetId: 'fc5621b1-86f6-44a1-9905-403e607df9f5',
+          reason: Reason.Duplicate,
+        },
+      ],
+    });
+
+    await expect(checkForDuplicates([testFilePath], { concurrency: 1 })).resolves.toEqual({
+      duplicates: [
+        {
+          filepath: testFilePath,
+          id: 'fc5621b1-86f6-44a1-9905-403e607df9f5',
+        },
+      ],
+      newFiles: [],
+    });
+  });
+
+  it('returns new assets when check duplicates is accepted', async () => {
+    vi.mocked(checkBulkUpload).mockResolvedValue({
+      results: [
+        {
+          action: Action.Accept,
+          id: testFilePath,
+        },
+      ],
+    });
+
+    await expect(checkForDuplicates([testFilePath], { concurrency: 1 })).resolves.toEqual({
+      duplicates: [],
+      newFiles: [testFilePath],
+    });
+  });
+
+  it('returns results when check duplicates retry is successful', async () => {
+    let mocked = vi.mocked(checkBulkUpload);
+    for (let i = 1; i < retry; i++) {
+      mocked = mocked.mockRejectedValueOnce(new Error('Network error'));
+    }
+    mocked.mockResolvedValue({
+      results: [
+        {
+          action: Action.Accept,
+          id: testFilePath,
+        },
+      ],
+    });
+
+    await expect(checkForDuplicates([testFilePath], { concurrency: 1 })).resolves.toEqual({
+      duplicates: [],
+      newFiles: [testFilePath],
+    });
+  });
+
+  it('returns empty results when check duplicates retry fails', async () => {
+    vi.mocked(checkBulkUpload).mockRejectedValue(new Error('Network error'));
+
+    await expect(checkForDuplicates([testFilePath], { concurrency: 1 })).resolves.toEqual({
+      duplicates: [],
+      newFiles: [],
+    });
+  });
+});
diff --git a/cli/src/commands/asset.ts b/cli/src/commands/asset.ts
index 878ce21e65..9c1a503cda 100644
--- a/cli/src/commands/asset.ts
+++ b/cli/src/commands/asset.ts
@@ -16,6 +16,7 @@ import { chunk } from 'lodash-es';
 import { Stats, createReadStream } from 'node:fs';
 import { stat, unlink } from 'node:fs/promises';
 import path, { basename } from 'node:path';
+import { Queue } from 'src/queue';
 import { BaseOptions, authenticate, crawl, sha1 } from 'src/utils';
 
 const s = (count: number) => (count === 1 ? '' : 's');
@@ -83,7 +84,7 @@ const scan = async (pathsToCrawl: string[], options: UploadOptionsDto) => {
   return files;
 };
 
-const checkForDuplicates = async (files: string[], { concurrency, skipHash }: UploadOptionsDto) => {
+export const checkForDuplicates = async (files: string[], { concurrency, skipHash }: UploadOptionsDto) => {
   if (skipHash) {
     console.log('Skipping hash check, assuming all files are new');
     return { newFiles: files, duplicates: [] };
@@ -99,32 +100,50 @@ const checkForDuplicates = async (files: string[], { concurrency, skipHash }: Up
   const newFiles: string[] = [];
   const duplicates: Asset[] = [];
-  try {
-    // TODO refactor into a queue
-    for (const items of chunk(files, concurrency)) {
-      const dto = await Promise.all(items.map(async (filepath) => ({ id: filepath, checksum: await sha1(filepath) })));
-      const { results } = await checkBulkUpload({ assetBulkUploadCheckDto: { assets: dto } });
-
-      for (const { id: filepath, assetId, action } of results as AssetBulkUploadCheckResults) {
+  const queue = new Queue(
+    async (filepaths: string[]) => {
+      const dto = await Promise.all(
+        filepaths.map(async (filepath) => ({ id: filepath, checksum: await sha1(filepath) })),
+      );
+      const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets: dto } });
+      const results = response.results as AssetBulkUploadCheckResults;
+      for (const { id: filepath, assetId, action } of results) {
         if (action === Action.Accept) {
           newFiles.push(filepath);
         } else {
           // rejects are always duplicates
           duplicates.push({ id: assetId as string, filepath });
         }
-        progressBar.increment();
       }
-    }
-  } finally {
-    progressBar.stop();
+      progressBar.increment(filepaths.length);
+      return results;
+    },
+    { concurrency, retry: 3 },
+  );
+
+  for (const items of chunk(files, concurrency)) {
+    await queue.push(items);
   }
 
+  await queue.drained();
+
+  progressBar.stop();
+
   console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
 
+  // Report failures
+  const failedTasks = queue.tasks.filter((task) => task.status === 'failed');
+  if (failedTasks.length > 0) {
+    console.log(`Failed to verify ${failedTasks.length} file${s(failedTasks.length)}:`);
+    for (const task of failedTasks) {
+      console.log(`- ${task.data} - ${task.error}`);
+    }
+  }
+
   return { newFiles, duplicates };
 };
 
-const uploadFiles = async (files: string[], { dryRun, concurrency }: UploadOptionsDto): Promise<Asset[]> => {
+export const uploadFiles = async (files: string[], { dryRun, concurrency }: UploadOptionsDto): Promise<Asset[]> => {
   if (files.length === 0) {
     console.log('All assets were already uploaded, nothing to do.');
     return [];
   }
@@ -158,37 +177,52 @@
   const newAssets: Asset[] = [];
 
-  try {
-    for (const items of chunk(files, concurrency)) {
-      await Promise.all(
-        items.map(async (filepath) => {
-          const stats = statsMap.get(filepath) as Stats;
-          const response = await uploadFile(filepath, stats);
+  const queue = new Queue(
+    async (filepath: string) => {
+      const stats = statsMap.get(filepath);
+      if (!stats) {
+        throw new Error(`Stats not found for ${filepath}`);
+      }
 
-          newAssets.push({ id: response.id, filepath });
+      const response = await uploadFile(filepath, stats);
+      newAssets.push({ id: response.id, filepath });
+      if (response.status === AssetMediaStatus.Duplicate) {
+        duplicateCount++;
+        duplicateSize += stats.size ?? 0;
+      } else {
+        successCount++;
+        successSize += stats.size ?? 0;
0; + } - if (response.status === AssetMediaStatus.Duplicate) { - duplicateCount++; - duplicateSize += stats.size ?? 0; - } else { - successCount++; - successSize += stats.size ?? 0; - } + uploadProgress.update(successSize, { value_formatted: byteSize(successSize + duplicateSize) }); - uploadProgress.update(successSize, { value_formatted: byteSize(successSize + duplicateSize) }); + return response; + }, + { concurrency, retry: 3 }, + ); - return response; - }), - ); - } - } finally { - uploadProgress.stop(); + for (const filepath of files) { + await queue.push(filepath); } + await queue.drained(); + + uploadProgress.stop(); + console.log(`Successfully uploaded ${successCount} new asset${s(successCount)} (${byteSize(successSize)})`); if (duplicateCount > 0) { console.log(`Skipped ${duplicateCount} duplicate asset${s(duplicateCount)} (${byteSize(duplicateSize)})`); } + + // Report failures + const failedTasks = queue.tasks.filter((task) => task.status === 'failed'); + if (failedTasks.length > 0) { + console.log(`Failed to upload ${failedTasks.length} asset${s(failedTasks.length)}:`); + for (const task of failedTasks) { + console.log(`- ${task.data} - ${task.error}`); + } + } + return newAssets; }; diff --git a/cli/src/queue.ts b/cli/src/queue.ts new file mode 100644 index 0000000000..c700028a15 --- /dev/null +++ b/cli/src/queue.ts @@ -0,0 +1,131 @@ +import * as fastq from 'fastq'; +import { uniqueId } from 'lodash-es'; + +export type Task = { + readonly id: string; + status: 'idle' | 'processing' | 'succeeded' | 'failed'; + data: T; + error: unknown | undefined; + count: number; + // TODO: Could be useful to adding progress property. + // TODO: Could be useful to adding start_at/end_at/duration properties. + result: undefined | R; +}; + +export type QueueOptions = { + verbose?: boolean; + concurrency?: number; + retry?: number; + // TODO: Could be useful to adding timeout property for retry. +}; + +export type ComputedQueueOptions = Required; + +export const defaultQueueOptions = { + concurrency: 1, + retry: 0, + verbose: false, +}; + +/** + * An in-memory queue that processes tasks in parallel with a given concurrency. + * @see {@link https://www.npmjs.com/package/fastq} + * @template T - The type of the worker task data. + * @template R - The type of the worker output data. + */ +export class Queue { + private readonly queue: fastq.queueAsPromised>; + private readonly store = new Map>(); + readonly options: ComputedQueueOptions; + readonly worker: (data: T) => Promise; + + /** + * Create a new queue. + * @param worker - The worker function that processes the task. + * @param options - The queue options. + */ + constructor(worker: (data: T) => Promise, options?: QueueOptions) { + this.options = { ...defaultQueueOptions, ...options }; + this.worker = worker; + this.store = new Map>(); + this.queue = this.buildQueue(); + } + + get tasks(): Task[] { + const tasks: Task[] = []; + for (const task of this.store.values()) { + tasks.push(task); + } + return tasks; + } + + getTask(id: string): Task { + const task = this.store.get(id); + if (!task) { + throw new Error(`Task with id ${id} not found`); + } + return task; + } + + /** + * Wait for the queue to be empty. + * @returns Promise - The returned Promise will be resolved when all tasks in the queue have been processed by a worker. + * This promise could be ignored as it will not lead to a `unhandledRejection`. + */ + async drained(): Promise { + await this.queue.drain(); + } + + /** + * Add a task at the end of the queue. 
+   * @see {@link https://www.npmjs.com/package/fastq}
+   * @param data
+   * @returns Promise<Task<T, R>> - A Promise that will be fulfilled (rejected) when the task completes successfully (unsuccessfully).
+   * This promise can be ignored, as it will not lead to an `unhandledRejection`.
+   */
+  async push(data: T): Promise<Task<T, R>> {
+    const id = uniqueId();
+    const task: Task<T, R> = { id, status: 'idle', error: undefined, count: 0, data, result: undefined };
+    this.store.set(id, task);
+    return this.queue.push(id);
+  }
+
+  // TODO: Support more function delegation to fastq.
+
+  private buildQueue(): fastq.queueAsPromised<string, Task<T, R>> {
+    return fastq.promise((id: string) => {
+      const task = this.getTask(id);
+      return this.work(task);
+    }, this.options.concurrency);
+  }
+
+  private async work(task: Task<T, R>): Promise<Task<T, R>> {
+    task.count += 1;
+    task.error = undefined;
+    task.status = 'processing';
+    if (this.options.verbose) {
+      console.log('[task] processing:', task);
+    }
+    try {
+      task.result = await this.worker(task.data);
+      task.status = 'succeeded';
+      if (this.options.verbose) {
+        console.log('[task] succeeded:', task);
+      }
+      return task;
+    } catch (error) {
+      task.error = error;
+      task.status = 'failed';
+      if (this.options.verbose) {
+        console.log('[task] failed:', task);
+      }
+      if (this.options.retry > 0 && task.count < this.options.retry) {
+        if (this.options.verbose) {
+          console.log('[task] retry:', task);
+        }
+        return this.work(task);
+      }
+      return task;
+    }
+  }
+}
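
Reviewer note (not part of the patch): the sketch below shows how the new Queue primitive is meant to be driven, mirroring what checkForDuplicates and uploadFiles above now do — construct with a worker and { concurrency, retry }, push work, await drained(), then inspect tasks for failures. The worker body and URLs are invented for illustration; only the Queue API comes from this diff.

import { Queue } from 'src/queue';

// Hypothetical worker: fetch a URL and report its HTTP status.
// A thrown error marks the task 'failed' and triggers up to `retry` attempts.
const queue = new Queue(
  async (url: string) => {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Request failed with status ${response.status}`);
    }
    return response.status;
  },
  { concurrency: 5, retry: 3 },
);

const main = async () => {
  for (const url of ['https://example.com/a', 'https://example.com/b']) {
    // Safe to ignore the returned promise: work() resolves even on failure,
    // so no unhandledRejection can occur.
    void queue.push(url);
  }
  await queue.drained();

  // Failures surface afterwards, mirroring the "Report failures" blocks in asset.ts.
  for (const task of queue.tasks.filter((t) => t.status === 'failed')) {
    console.log(`- ${task.data} - ${task.error}`);
  }
};

await main();

One design observation worth flagging in review: fastq's promise-mode push() resolves only when its task has completed, so awaiting each push in a loop (as uploadFiles does per file) effectively serializes the queue; enqueueing everything first and then awaiting drained(), as sketched here, is what keeps all workers busy up to the configured concurrency.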
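
A second note on the retry arithmetic in work(): count is incremented before each attempt, and a failed task is re-run only while task.count < retry, so { retry: 3 } means at most three attempts in total — which is why the spec's successful-retry tests throw only while counter < retry. A minimal sketch with an invented flaky worker:

import { Queue } from 'src/queue';

// Invented flaky worker: fails twice, then succeeds on the third attempt.
let attempts = 0;
const queue = new Queue(
  async (label: string) => {
    attempts++;
    if (attempts < 3) {
      throw new Error(`Network error on attempt ${attempts}`);
    }
    return `${label}: ok after ${attempts} attempts`;
  },
  { concurrency: 1, retry: 3 },
);

const task = await queue.push('demo');
console.log(task.status); // 'succeeded' — attempts 1 and 2 failed, attempt 3 passed
console.log(task.count); // 3 attempts in total; with retry: 2 the task would end up 'failed'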