Commit 05e16f82 authored by nanahira's avatar nanahira

mirror loop

parent 8233ff30
Pipeline #5429 canceled with stages
......@@ -8,12 +8,13 @@ import { AppService } from '../app.service';
import { BlankReturnMessageDto, ReturnMessageDto } from '../dto/ReturnMessage.dto';
import { AssignAppDto } from '../dto/AssignApp.dto';
import { AppPrefixDto } from '../dto/AppPrefix.dto';
import { MirrorService } from '../mirror/mirror.service';
@Controller('api/admin')
@ApiTags('admin')
@UseGuards(MyCardAdminGuard)
export class AdminController {
constructor(private readonly appService: AppService) {}
constructor(private readonly appService: AppService, private readonly mirrorService: MirrorService) {}
@Post('migrate/apps.json')
@ApiOperation({ summary: '迁移 apps', description: '上传一个 apps.json 文件' })
......@@ -81,4 +82,12 @@ export class AdminController {
async purgeOldArchives() {
return new ReturnMessageDto(201, 'success', await this.appService.purgeOldArchives());
}
@Post('mirror')
@ApiOperation({ summary: '触发镜像' })
@ApiOkResponse({ type: BlankReturnMessageDto })
async triggerMirror() {
this.mirrorService.triggerMirror();
return new ReturnMessageDto(201, 'success');
}
}
......@@ -209,6 +209,7 @@ export class AppService extends ConsoleLogger {
build.checksum = result.checksum;
build.archives = result.archives;
build = await this.db.getRepository(Build).save(build);
this.mirror.triggerMirror();
/*
await this.db.transaction(async (edb) => {
this.log(`Saving build info.`);
......
......@@ -7,6 +7,9 @@ import { lastValueFrom } from 'rxjs';
import { ReturnMessage } from '../dto/ReturnMessage.dto';
import { Archive } from '../entities/Archive.entity';
import { ArchiveMirror } from '../entities/ArchiveMirror.dto';
import { PackageS3Service } from '../package-s3/package-s3.service';
import _ from 'lodash';
import delay from 'delay';
export interface MiddlewareInfo {
identifier: string;
......@@ -38,7 +41,8 @@ export class MirrorService extends ConsoleLogger {
@InjectConnection('app')
private db: Connection,
private config: ConfigService,
private http: HttpService
private http: HttpService,
private packageS3: PackageS3Service
) {
super('mirror');
this.init().then();
......@@ -66,6 +70,85 @@ export class MirrorService extends ConsoleLogger {
this.log(`Loading middleware ${url} failed: ${e.toString()}`);
}
}
if (this.middlewares.length) {
this.mainLoop().then();
}
}
private async saveMirrorFromPath(archive: Partial<Archive>) {
const uploadInfo: UploadInfo = {
url: this.packageS3.getCdnUrl(`${archive.path}.tar.zst`),
size: archive.size,
};
return _.compact(
await Promise.all(
this.middlewares.map(async (middleware) => {
const result = await this.uploadWithMiddleware(uploadInfo, middleware);
if (!result) {
return null;
}
const mirrorEnt = new ArchiveMirror();
mirrorEnt.path = archive.path;
mirrorEnt.middleware = middleware.info.identifier;
mirrorEnt.url = result.url;
return mirrorEnt;
})
)
);
}
private needRun = true;
triggerMirror() {
this.needRun = true;
}
private async mainLoop() {
while (true) {
if (!this.needRun) {
await delay(5000);
continue;
}
try {
this.log(`Started running main loop.`);
this.needRun = await this.runMirror();
this.log(`Finished running main loop.`);
} catch (e) {
this.error(`Main loop failed: ${e.toString()}`);
await delay(1000);
this.needRun = true;
}
}
}
private async runMirror() {
const query = this.db
.createQueryBuilder()
.select('archive.path', 'path')
.distinctOn(['archive.path'])
.addSelect('archive.size', 'size')
.from(Archive, 'archive')
.where('archive.size <= :maximumMirroredSize', { maximumMirroredSize: this.maximumMirroredSize });
query
.andWhere(
`not exists ${query.subQuery().select('mirror.path').from(ArchiveMirror, 'mirror').where('archive.path = mirror.path').getQuery()}`
)
.take(10);
this.log(`Searching for archives to mirror`);
const archives = await query.getRawMany<Partial<Archive>>();
if (!archives.length) {
return false;
}
this.log(`Uploading ${archives.length} archives.`);
const uploadResults = _.flatten(await Promise.all(archives.map((a) => this.saveMirrorFromPath(a))));
this.log(`Saving ${uploadResults.length} mirror records.`);
await this.db.transaction(async (edb) => {
const entTrunks = _.chunk(uploadResults, 20000);
for (const chunk of entTrunks) {
await edb.getRepository(ArchiveMirror).save(chunk);
}
});
return true;
}
private async uploadWithMiddleware(uploadInfo: UploadInfo, middleware: Middleware): Promise<UploadResult> {
......@@ -81,6 +164,7 @@ export class MirrorService extends ConsoleLogger {
};
} catch (e) {
this.error(`Failed uploading ${uploadInfo.url} with middleware ${middleware.info.identifier}: ${e.toString()} ${e.data}`);
return null;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment