Commit 075d0d39 authored by nanahira's avatar nanahira

move to redis

parent d8bb19f4
Pipeline #14454 failed with stages
in 2 minutes and 52 seconds
This diff is collapsed.
......@@ -24,6 +24,7 @@
"@aws-sdk/client-s3": "^3.26.0",
"@aws-sdk/lib-storage": "^3.26.0",
"@cityssm/map-expire": "^1.1.1",
"@nestjs-modules/ioredis": "^1.0.1",
"@nestjs/axios": "^0.0.1",
"@nestjs/cli": "^8.0.0",
"@nestjs/common": "^8.0.0",
......@@ -37,7 +38,7 @@
"busboy": "^0.2.14",
"class-transformer": "^0.4.0",
"class-validator": "^0.13.1",
"delay": "^5.0.0",
"ioredis": "^4.28.5",
"lodash": "^4.17.21",
"moment": "^2.29.1",
"mustache": "^4.2.0",
......@@ -45,6 +46,7 @@
"p-queue": "6.6.2",
"pg": "^8.7.1",
"readdirp": "^3.6.0",
"redlock": "^5.0.0-beta.2",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.2.0",
......@@ -59,6 +61,7 @@
"@nestjs/testing": "^8.0.0",
"@types/busboy": "^0.2.4",
"@types/express": "^4.17.13",
"@types/ioredis": "^4.28.10",
"@types/jest": "^26.0.24",
"@types/lodash": "^4.14.172",
"@types/multer": "^1.4.7",
......
......@@ -19,19 +19,20 @@ import path from 'path';
import { ArchiveMirror } from './entities/ArchiveMirror.dto';
import { MirrorService } from './mirror/mirror.service';
import { HttpModule } from '@nestjs/axios';
const configModule = ConfigModule.forRoot();
import { RedisModule } from '@nestjs-modules/ioredis';
import { LockService } from './lock/lock.service';
@Module({
imports: [
ServeStaticModule.forRoot({
rootPath: path.join(__dirname, '..', 'public'),
}),
configModule,
ConfigModule.forRoot({
isGlobal: true
}),
HttpModule,
TypeOrmModule.forRootAsync({
name: 'app',
imports: [configModule],
inject: [ConfigService],
useFactory: async (config: ConfigService) => {
return {
......@@ -49,8 +50,16 @@ const configModule = ConfigModule.forRoot();
};
},
}),
RedisModule.forRootAsync({
inject: [ConfigService],
useFactory: async (config: ConfigService) => ({
config: {
url: config.get('REDIS_URL'),
},
}),
}),
],
controllers: [AppController, AdminController, UpdateController],
providers: [AppService, PackagerService, AssetsS3Service, PackageS3Service, UpdateService, MirrorService],
providers: [AppService, PackagerService, AssetsS3Service, PackageS3Service, UpdateService, MirrorService, LockService],
})
export class AppModule {}
import { Test, TestingModule } from '@nestjs/testing';
import { LockService } from './lock.service';
// Auto-generated Nest unit-test scaffold for LockService.
describe('LockService', () => {
  let service: LockService;
  // Fresh testing module per case keeps tests isolated.
  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      // NOTE(review): LockService injects a Redis connection via
      // @InjectRedis(), but no Redis provider (or mock for the ioredis
      // connection token) is registered here — DI resolution is expected to
      // fail when this spec runs. TODO: add a mock provider for the token.
      providers: [LockService],
    }).compile();
    service = module.get<LockService>(LockService);
  });
  // Smoke test only: asserts the provider resolves, not any lock behavior.
  it('should be defined', () => {
    expect(service).toBeDefined();
  });
});
import { InjectRedis, Redis } from '@nestjs-modules/ioredis';
import { Injectable } from '@nestjs/common';
import Redlock from 'redlock';
/**
 * Distributed lock service: a Redlock instance wired to the application's
 * injected Redis connection, exposed through Nest DI so consumers can call
 * Redlock APIs (e.g. `using`) directly on this service.
 *
 * NOTE(review): only a single Redis client is handed to Redlock, so there is
 * no multi-node quorum — lock safety is only as strong as this one Redis
 * instance. Confirm that is acceptable for this deployment.
 */
@Injectable()
export class LockService extends Redlock {
  // The client is only needed to seed Redlock; no field is retained because
  // nothing in this class reads it after construction (the previous
  // `private readonly` parameter property was an unused field).
  constructor(@InjectRedis() redis: Redis) {
    super([redis]);
  }
}
......@@ -14,9 +14,10 @@ import { ConsoleLogger, forwardRef, Inject, Injectable } from '@nestjs/common';
import { Archive, ArchiveType } from '../entities/Archive.entity';
import { AppService } from '../app.service';
import { createHash } from 'crypto';
import delay from 'delay';
import { Cache } from '@cityssm/map-expire';
import PQueue from 'p-queue';
import { LockService } from 'src/lock/lock.service';
import { InjectRedis, Redis } from '@nestjs-modules/ioredis';
export interface FileWithHash {
file: readdirp.EntryInfo;
......@@ -63,17 +64,19 @@ export class ArchiveTask {
@Injectable()
export class PackagerService extends ConsoleLogger {
bucket_max = 10 * 1024 ** 2;
bucket_enter = 1 * 1024 ** 2;
bucket_enter = 1024 ** 2;
noGatherExts = new Set<string>();
packagerWorkingDirectory: string;
private uploadLock = new Set<string>();
private hashCache = new Cache<string, string>();
// private uploadLock = new Set<string>();
// private hashCache = new Cache<string, string>();
constructor(
@Inject(forwardRef(() => AppService)) private readonly appService: AppService,
private s3: PackageS3Service,
config: ConfigService
config: ConfigService,
private redlock: LockService,
@InjectRedis() private readonly redis: Redis
) {
super('packager');
this.bucket_max = (parseInt(config.get('PACKAGE_BUCKET_MAX')) || 10) * 1024 ** 2;
......@@ -85,19 +88,6 @@ export class PackagerService extends ConsoleLogger {
}
}
// Acquire a process-local lock on `key` by polling: sleep 10 ms per
// iteration until no other caller holds the key, then claim it.
// NOTE(review): not fair (no FIFO ordering) and local to this process only;
// this commit replaces it with redis/redlock-based locking (see the
// commented-out uploadLock field and the redlock usage in archive()).
private async waitForLock(key: string) {
  while (this.uploadLock.has(key)) {
    await delay(10);
  }
  this.uploadLock.add(key);
}
// Release the process-local lock on `key`.
// `Set.prototype.delete` is already a no-op for absent keys, so the previous
// `has(key)` guard was redundant and has been dropped.
private releaseLock(key: string) {
  this.uploadLock.delete(key);
}
async build(
stream: NodeJS.ReadableStream,
pathPrefix?: string,
......@@ -208,21 +198,23 @@ export class PackagerService extends ConsoleLogger {
}
// Resolve the sha256 hash of an already-uploaded archive at `path`.
// Checks the shared redis cache first (key `hash:<path>`); on a miss, falls
// back to the database via AppService and caches any hit for 24 hours.
// Returns null when no existing archive hash is known.
// (The diff rendering had left the removed in-memory hashCache lines
// interleaved here; this is the redis-backed post-change version only.)
private async lookForExistingArchiveHash(path: string) {
  const hashKey = `hash:${path}`;
  let hash = await this.redis.get(hashKey);
  if (hash) {
    return hash;
  }
  hash = await this.appService.lookForExistingArchiveHash(path);
  if (hash) {
    // 'EX' is in seconds: cache for 24 h, matching archiveProcess's write.
    await this.redis.set(hashKey, hash, 'EX', 60 * 60 * 24);
    return hash;
  }
  return null;
}
// Queue an archive build, serialized per archive path with a distributed
// redlock (TTL 1800 * 1000 ms = 30 min) so concurrent instances do not
// package the same archive twice.
// (The diff rendering had left the removed waitForLock call interleaved
// here; this is the redlock-based post-change version only.)
// NOTE(review): the lock is held while the task also waits in the local
// PQueue, so queue wait time counts against the 30-minute TTL — confirm the
// TTL comfortably exceeds worst-case queue + packaging time.
async archive(root: string, archiveTask: ArchiveTask): Promise<Archive> {
  return this.redlock.using([`archive:${archiveTask.path}`], 1800 * 1000, async () =>
    this.archiveQueue.add(() => this.archiveProcess(root, archiveTask))
  );
}
private archiveQueue = new PQueue({ concurrency: parseInt(process.env.PACKAGE_COCURRENCY) || os.cpus().length });
......@@ -230,55 +222,50 @@ export class PackagerService extends ConsoleLogger {
// Build and upload one archive; runs inside the per-path redlock taken by
// archive(). Steps:
//   1. Skip entirely when the object already exists on S3 AND its hash is
//      known (redis cache or database).
//   2. Otherwise stream `tar --zstd` over the task's files, hashing the
//      stream with sha256 while uploading it to S3 in parallel.
//   3. Record the fresh hash in redis (24 h TTL) and the uploaded size.
// (The diff rendering had interleaved the removed try/catch/finally body
// with the new one; this is the post-change version only — the old
// in-process lock release in `finally` is gone because locking moved to
// redlock in archive().)
private async archiveProcess(root: string, archiveTask: ArchiveTask): Promise<Archive> {
  const archive = archiveTask.archive;
  const archiveName = archiveTask.archiveFullPath;
  const existing = await this.s3.fileExists(archiveName);
  if (existing) {
    const hash = await this.lookForExistingArchiveHash(archiveTask.path);
    if (hash) {
      archive.size = existing.Size;
      this.log(`Archive ${archiveName} exists, skipping.`);
      return archive;
    }
  }
  const files = archiveTask.filePaths;
  this.log(`Packaging archive ${archiveName} with ${archiveTask.exactFilePaths.length} files.`);
  const child = child_process.spawn('tar', ['--zstd', '-cf', '-'].concat(files), {
    cwd: root,
  });
  const childPromise = new Promise<void>((resolve, reject) => {
    child.on('exit', (code) => {
      if (code == 0) {
        resolve();
      } else {
        // Preserved from the original: rejects with the bare exit code
        // (not an Error), so callers see a non-Error rejection reason.
        reject(code);
      }
    });
    child.on('error', (error) => {
      reject(error);
    });
  });
  // Hash the tar stream as it is produced so no second pass is needed.
  const hashObject = createHash('sha256');
  child.stdout.on('data', (chunk) => {
    hashObject.update(chunk);
  });
  /* if (existing) {
    await childPromise;
    archive.hash = hashObject.digest('hex');
    return archive;
  }*/
  const uploadPromise = this.s3.uploadStream(archiveName, child.stdout, {
    ContentType: 'application/tar+zstd',
  });
  // Wait for both: tar must exit cleanly AND the S3 upload must complete.
  const [, { object }] = await Promise.all([childPromise, uploadPromise]);
  archive.hash = hashObject.digest('hex');
  // Cache the new hash for lookForExistingArchiveHash ('EX' = seconds, 24 h).
  await this.redis.set(`hash:${archive.path}`, archive.hash, 'EX', 60 * 60 * 24);
  archive.size = object.Size;
  return archive;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment