Commit 1e72fbf4 authored by nanahira's avatar nanahira

fix hash

parent 7510a464
Pipeline #15308 passed with stages
in 3 minutes and 22 seconds
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.26.0", "@aws-sdk/client-s3": "^3.26.0",
"@aws-sdk/lib-storage": "^3.26.0", "@aws-sdk/lib-storage": "^3.26.0",
"@cityssm/map-expire": "^1.1.1",
"@nanahira/redlock": "^1.0.0", "@nanahira/redlock": "^1.0.0",
"@nestjs-modules/ioredis": "^1.0.1", "@nestjs-modules/ioredis": "^1.0.1",
"@nestjs/axios": "^0.0.1", "@nestjs/axios": "^0.0.1",
...@@ -1926,11 +1925,6 @@ ...@@ -1926,11 +1925,6 @@
"integrity": "sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==", "integrity": "sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==",
"dev": true "dev": true
}, },
"node_modules/@cityssm/map-expire": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/@cityssm/map-expire/-/map-expire-1.1.1.tgz",
"integrity": "sha512-eDOZR3XKkzu1fzUtr96LZFrqhbIVwrW3tTbbQB/C2F1Mj7eVdH5HFj93WyrWSV/kQJpj1orQUwrl/A2F8nUGRQ=="
},
"node_modules/@cspotcode/source-map-consumer": { "node_modules/@cspotcode/source-map-consumer": {
"version": "0.8.0", "version": "0.8.0",
"resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz",
...@@ -13027,11 +13021,6 @@ ...@@ -13027,11 +13021,6 @@
"integrity": "sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==", "integrity": "sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==",
"dev": true "dev": true
}, },
"@cityssm/map-expire": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/@cityssm/map-expire/-/map-expire-1.1.1.tgz",
"integrity": "sha512-eDOZR3XKkzu1fzUtr96LZFrqhbIVwrW3tTbbQB/C2F1Mj7eVdH5HFj93WyrWSV/kQJpj1orQUwrl/A2F8nUGRQ=="
},
"@cspotcode/source-map-consumer": { "@cspotcode/source-map-consumer": {
"version": "0.8.0", "version": "0.8.0",
"resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz",
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.26.0", "@aws-sdk/client-s3": "^3.26.0",
"@aws-sdk/lib-storage": "^3.26.0", "@aws-sdk/lib-storage": "^3.26.0",
"@cityssm/map-expire": "^1.1.1",
"@nanahira/redlock": "^1.0.0", "@nanahira/redlock": "^1.0.0",
"@nestjs-modules/ioredis": "^1.0.1", "@nestjs-modules/ioredis": "^1.0.1",
"@nestjs/axios": "^0.0.1", "@nestjs/axios": "^0.0.1",
......
...@@ -33,8 +33,8 @@ export class AdminController { ...@@ -33,8 +33,8 @@ export class AdminController {
return this.appService.migrateFromAppsJson(apps); return this.appService.migrateFromAppsJson(apps);
} }
@Post('migrate/hash') @Post('checkhash')
@ApiOperation({ summary: '迁移 hash' }) @ApiOperation({ summary: '检查 hash' })
@ApiCreatedResponse({ type: BlankReturnMessageDto }) @ApiCreatedResponse({ type: BlankReturnMessageDto })
async migrateHash(): Promise<BlankReturnMessageDto> { async migrateHash(): Promise<BlankReturnMessageDto> {
return this.appService.migrateHashes(); return this.appService.migrateHashes();
......
...@@ -18,6 +18,9 @@ import { HttpService } from '@nestjs/axios'; ...@@ -18,6 +18,9 @@ import { HttpService } from '@nestjs/axios';
import { lastValueFrom } from 'rxjs'; import { lastValueFrom } from 'rxjs';
import { MirrorService } from './mirror/mirror.service'; import { MirrorService } from './mirror/mirror.service';
import { AssetsS3Service } from './assets-s3/assets-s3.service'; import { AssetsS3Service } from './assets-s3/assets-s3.service';
import { Redis } from 'ioredis';
import { InjectRedis } from '@nestjs-modules/ioredis';
import _ from 'lodash';
@Injectable() @Injectable()
export class AppService extends ConsoleLogger { export class AppService extends ConsoleLogger {
...@@ -28,6 +31,7 @@ export class AppService extends ConsoleLogger { ...@@ -28,6 +31,7 @@ export class AppService extends ConsoleLogger {
appService: AppService, appService: AppService,
@InjectConnection('app') @InjectConnection('app')
private db: Connection, private db: Connection,
@InjectRedis() private readonly redis: Redis,
private packager: PackagerService, private packager: PackagerService,
private packageS3: PackageS3Service, private packageS3: PackageS3Service,
private assetsS3: AssetsS3Service, private assetsS3: AssetsS3Service,
...@@ -342,7 +346,7 @@ export class AppService extends ConsoleLogger { ...@@ -342,7 +346,7 @@ export class AppService extends ConsoleLogger {
return this.updateResult(() => this.db.getRepository(App).update({ id }, { isDeleted: true })); return this.updateResult(() => this.db.getRepository(App).update({ id }, { isDeleted: true }));
} }
async lookForExistingArchiveHash(path: string) { private async lookForHashFromDatabase(path: string) {
const [archive] = await this.db const [archive] = await this.db
.getRepository(Archive) .getRepository(Archive)
.find({ where: { path, hash: Not(IsNull()) }, select: ['hash'], order: { id: 'DESC' }, take: 1 }); .find({ where: { path, hash: Not(IsNull()) }, select: ['hash'], order: { id: 'DESC' }, take: 1 });
...@@ -353,6 +357,20 @@ export class AppService extends ConsoleLogger { ...@@ -353,6 +357,20 @@ export class AppService extends ConsoleLogger {
// return this.checkHashFromUrl(`${this.packageS3.cdnUrl}/${path}.tar.zst`); // return this.checkHashFromUrl(`${this.packageS3.cdnUrl}/${path}.tar.zst`);
} }
async lookForExistingArchiveHash(path: string) {
const hashKey = `hash:${path}`;
let hash = await this.redis.get(hashKey);
if (hash) {
return hash;
}
hash = await this.lookForHashFromDatabase(path);
if (hash) {
await this.redis.set(hashKey, hash, 'EX', 60 * 60 * 24);
return hash;
}
return null;
}
async checkHashFromUrl(url: string) { async checkHashFromUrl(url: string) {
this.log(`Downloading ${url} for checking hash.`); this.log(`Downloading ${url} for checking hash.`);
const { data } = await lastValueFrom(this.http.get<internal.Readable>(url, { responseType: 'stream' })); const { data } = await lastValueFrom(this.http.get<internal.Readable>(url, { responseType: 'stream' }));
...@@ -364,25 +382,47 @@ export class AppService extends ConsoleLogger { ...@@ -364,25 +382,47 @@ export class AppService extends ConsoleLogger {
}); });
} }
async getHashForMigrate(fullPath: string): Promise<string> { async getHashForMigrate(archive: Archive): Promise<string> {
const url = this.packageS3.getCdnUrl(fullPath); const existingHash = await this.lookForExistingArchiveHash(archive.path);
this.log(`Migrating hash: ${url} `); if (existingHash) {
const hash = await this.checkHashFromUrl(url); return existingHash;
this.log(`Migrated hash: ${url} => ${hash}`); }
return hash; const url = this.packageS3.getCdnUrl(archive.archiveFullPath);
try {
this.log(`Migrating hash: ${url} `);
const hash = await this.checkHashFromUrl(url);
this.log(`Migrated hash: ${url} => ${hash}`);
return hash;
} catch (e) {
this.error(`Failed to migrate hash: ${url} => ${e.message}`);
return null;
}
} }
async migrateHashes() { async migrateHashes() {
const archivesToDo = await this.db.getRepository(Archive).find({ where: { hash: IsNull() }, select: ['id', 'hash', 'path'] }); const archivesToDo = await this.db.getRepository(Archive).find({ where: { hash: IsNull() }, select: ['id', 'hash', 'path'] });
const tmpMap = new Map<string, string>(); const tmpMap = new Map<string, string>();
const archivesToSave: Archive[] = [];
for (const archive of archivesToDo) { for (const archive of archivesToDo) {
if (tmpMap.has(archive.path)) { if (tmpMap.has(archive.path)) {
archive.hash = tmpMap.get(archive.path); archive.hash = tmpMap.get(archive.path);
} else { } else {
archive.hash = await this.getHashForMigrate(archive.archiveFullPath); const hash = await this.getHashForMigrate(archive);
tmpMap.set(archive.path, archive.hash); if (hash) {
archive.hash = await this.getHashForMigrate(archive);
tmpMap.set(archive.path, archive.hash);
archivesToSave.push(archive);
}
} }
} }
await this.db.getRepository(Archive).save(archivesToDo); await this.db.transaction(async (edb) => {
const repo = edb.getRepository(Archive);
const chunks = _.chunk(archivesToSave, 10000);
for (const chunk of chunks) {
await repo.save(chunk);
}
});
await this.db.getRepository(Archive).save(archivesToSave);
return new BlankReturnMessageDto(200, 'success'); return new BlankReturnMessageDto(200, 'success');
} }
......
...@@ -14,7 +14,6 @@ import { ConsoleLogger, forwardRef, Inject, Injectable } from '@nestjs/common'; ...@@ -14,7 +14,6 @@ import { ConsoleLogger, forwardRef, Inject, Injectable } from '@nestjs/common';
import { Archive, ArchiveType } from '../entities/Archive.entity'; import { Archive, ArchiveType } from '../entities/Archive.entity';
import { AppService } from '../app.service'; import { AppService } from '../app.service';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import { Cache } from '@cityssm/map-expire';
import PQueue from 'p-queue'; import PQueue from 'p-queue';
import { LockService } from 'src/lock/lock.service'; import { LockService } from 'src/lock/lock.service';
import { InjectRedis, Redis } from '@nestjs-modules/ioredis'; import { InjectRedis, Redis } from '@nestjs-modules/ioredis';
...@@ -197,20 +196,6 @@ export class PackagerService extends ConsoleLogger { ...@@ -197,20 +196,6 @@ export class PackagerService extends ConsoleLogger {
]); ]);
} }
private async lookForExistingArchiveHash(path: string) {
const hashKey = `hash:${path}`;
let hash = await this.redis.get(hashKey);
if (hash) {
return hash;
}
hash = await this.appService.lookForExistingArchiveHash(path);
if (hash) {
await this.redis.set(hashKey, hash, 'EX', 60 * 60 * 24);
return hash;
}
return null;
}
async archive(root: string, archiveTask: ArchiveTask): Promise<Archive> { async archive(root: string, archiveTask: ArchiveTask): Promise<Archive> {
return this.redlock.using([`archive:${archiveTask.path}`], 5000, async () => return this.redlock.using([`archive:${archiveTask.path}`], 5000, async () =>
this.archiveQueue.add(() => this.archiveProcess(root, archiveTask)) this.archiveQueue.add(() => this.archiveProcess(root, archiveTask))
...@@ -224,8 +209,9 @@ export class PackagerService extends ConsoleLogger { ...@@ -224,8 +209,9 @@ export class PackagerService extends ConsoleLogger {
const archiveName = archiveTask.archiveFullPath; const archiveName = archiveTask.archiveFullPath;
const existing = await this.s3.fileExists(archiveName); const existing = await this.s3.fileExists(archiveName);
if (existing) { if (existing) {
const hash = await this.lookForExistingArchiveHash(archiveTask.path); const hash = await this.appService.lookForExistingArchiveHash(archiveTask.path);
if (hash) { if (hash) {
archive.hash = hash;
archive.size = existing.Size; archive.size = existing.Size;
this.log(`Archive ${archiveName} exists, skipping.`); this.log(`Archive ${archiveName} exists, skipping.`);
return archive; return archive;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment