Commit 1685dac9 authored by nanahira's avatar nanahira

async middleware download support

parent 9f488f78
Pipeline #17453 passed with stages
in 6 minutes and 47 seconds
...@@ -38,7 +38,7 @@ export class Archive extends TimeBase { ...@@ -38,7 +38,7 @@ export class Archive extends TimeBase {
role: ArchiveType; role: ArchiveType;
// should not be a relation // should not be a relation
mirrors: ArchiveMirror[]; mirrors: string[];
get archiveFullPath() { get archiveFullPath() {
return `${this.path}.tar.zst`; return `${this.path}.tar.zst`;
......
...@@ -11,14 +11,11 @@ import { PackageS3Service } from '../package-s3/package-s3.service'; ...@@ -11,14 +11,11 @@ import { PackageS3Service } from '../package-s3/package-s3.service';
import _ from 'lodash'; import _ from 'lodash';
import delay from 'delay'; import delay from 'delay';
export interface MiddlewareInfo {
identifier: string;
maxSize?: number;
}
export interface Middleware { export interface Middleware {
url: string; url: string;
info: MiddlewareInfo; identifier: string;
maxSize?: number;
callback?: string;
} }
export interface UploadInfo { export interface UploadInfo {
...@@ -35,7 +32,7 @@ export interface UploadResult { ...@@ -35,7 +32,7 @@ export interface UploadResult {
@Injectable() @Injectable()
export class MirrorService extends ConsoleLogger { export class MirrorService extends ConsoleLogger {
private middlewares: Middleware[] = []; private middlewares = new Map<string, Middleware>();
private maximumMirroredSize = 0xffffffff; private maximumMirroredSize = 0xffffffff;
constructor( constructor(
@InjectConnection('app') @InjectConnection('app')
...@@ -57,9 +54,9 @@ export class MirrorService extends ConsoleLogger { ...@@ -57,9 +54,9 @@ export class MirrorService extends ConsoleLogger {
for (const url of middlewares) { for (const url of middlewares) {
try { try {
this.log(`Loading middleware ${url}`); this.log(`Loading middleware ${url}`);
const { data } = await lastValueFrom(this.http.get<ReturnMessage<MiddlewareInfo>>(url, { responseType: 'json' })); const { data } = await lastValueFrom(this.http.get<ReturnMessage<Middleware>>(url, { responseType: 'json' }));
const middleware: Middleware = { url, info: data.data }; const middleware: Middleware = { ...data.data, url };
this.middlewares.push(middleware); this.middlewares.set(data.data.identifier, middleware);
if (data.data.maxSize) { if (data.data.maxSize) {
this.maximumMirroredSize = Math.min(this.maximumMirroredSize, data.data.maxSize); this.maximumMirroredSize = Math.min(this.maximumMirroredSize, data.data.maxSize);
} }
...@@ -68,7 +65,7 @@ export class MirrorService extends ConsoleLogger { ...@@ -68,7 +65,7 @@ export class MirrorService extends ConsoleLogger {
this.log(`Loading middleware ${url} failed: ${e.toString()}`); this.log(`Loading middleware ${url} failed: ${e.toString()}`);
} }
} }
if (this.middlewares.length) { if (this.middlewares.size) {
this.mainLoop().then(); this.mainLoop().then();
} }
} }
...@@ -83,14 +80,14 @@ export class MirrorService extends ConsoleLogger { ...@@ -83,14 +80,14 @@ export class MirrorService extends ConsoleLogger {
}; };
return _.compact( return _.compact(
await Promise.all( await Promise.all(
this.middlewares.map(async (middleware) => { Array.from(this.middlewares.values()).map(async (middleware) => {
const result = await this.uploadWithMiddleware(uploadInfo, middleware); const result = await this.uploadWithMiddleware(uploadInfo, middleware);
if (!result) { if (!result) {
return null; return null;
} }
const mirrorEnt = new ArchiveMirror(); const mirrorEnt = new ArchiveMirror();
mirrorEnt.path = archive.path; mirrorEnt.path = archive.path;
mirrorEnt.middleware = middleware.info.identifier; mirrorEnt.middleware = middleware.identifier;
mirrorEnt.url = result.url; mirrorEnt.url = result.url;
return mirrorEnt; return mirrorEnt;
}) })
...@@ -156,7 +153,7 @@ export class MirrorService extends ConsoleLogger { ...@@ -156,7 +153,7 @@ export class MirrorService extends ConsoleLogger {
private async uploadWithMiddleware(uploadInfo: UploadInfo, middleware: Middleware): Promise<UploadResult> { private async uploadWithMiddleware(uploadInfo: UploadInfo, middleware: Middleware): Promise<UploadResult> {
try { try {
this.log(`Uploading ${uploadInfo.url} with middleware ${middleware.info.identifier}.`); this.log(`Uploading ${uploadInfo.url} with middleware ${middleware.identifier}.`);
const { data } = await lastValueFrom( const { data } = await lastValueFrom(
this.http.post<ReturnMessage<string>>(middleware.url, uploadInfo, { responseType: 'json', timeout: 60 * 60 * 1000 }) this.http.post<ReturnMessage<string>>(middleware.url, uploadInfo, { responseType: 'json', timeout: 60 * 60 * 1000 })
); );
...@@ -166,12 +163,12 @@ export class MirrorService extends ConsoleLogger { ...@@ -166,12 +163,12 @@ export class MirrorService extends ConsoleLogger {
middleware, middleware,
}; };
} catch (e) { } catch (e) {
this.error(`Failed uploading ${uploadInfo.url} with middleware ${middleware.info.identifier}: ${e.toString()} ${e.data}`); this.error(`Failed uploading ${uploadInfo.url} with middleware ${middleware.identifier}: ${e.toString()} ${e.data}`);
return null; return null;
} }
} }
async uploadWithRandomMiddleware(uploadInfo: UploadInfo, middlewares = this.middlewares) { async uploadWithRandomMiddleware(uploadInfo: UploadInfo, middlewares = Array.from(this.middlewares.values())) {
if (!middlewares.length) { if (!middlewares.length) {
return null; return null;
} }
...@@ -182,17 +179,49 @@ export class MirrorService extends ConsoleLogger { ...@@ -182,17 +179,49 @@ export class MirrorService extends ConsoleLogger {
} }
return this.uploadWithRandomMiddleware( return this.uploadWithRandomMiddleware(
uploadInfo, uploadInfo,
middlewares.filter((m) => m.info.identifier !== middleware.info.identifier) middlewares.filter((m) => m.identifier !== middleware.identifier)
); );
} }
async getMirrorUrl(mirror: ArchiveMirror): Promise<string> {
const middleware = this.middlewares.get(mirror.middleware);
if (!middleware) {
return;
}
if (!middleware.callback) {
return mirror.url;
}
try {
const { data } = await lastValueFrom(
this.http.patch<{ url: string }>(middleware.url, mirror, { responseType: 'json', timeout: 30000 })
);
return data.url;
} catch (e) {
this.error(`Failed getting mirror url for ${mirror.path} with middleware ${middleware.identifier}: ${e.toString()}`);
return;
}
}
async lookForArchiveMirror(archive: Archive) { async lookForArchiveMirror(archive: Archive) {
if (archive.size > this.maximumMirroredSize) { if (archive.size > this.maximumMirroredSize) {
return; return;
} }
const mirrors = await this.db.getRepository(ArchiveMirror).find({ where: { path: archive.path, disabled: false }, select: ['url'] }); const mirrors = await this.db
.getRepository(ArchiveMirror)
.find({ where: { path: archive.path, disabled: false }, select: ['url', 'middleware', 'path'] });
if (mirrors.length) { if (mirrors.length) {
archive.mirrors = mirrors; const urls: string[] = [];
await Promise.all(
mirrors.map(async (m) => {
const url = await this.getMirrorUrl(m);
if (url) {
urls.push(url);
}
})
);
if (urls.length) {
archive.mirrors = urls;
}
} }
return archive; return archive;
} }
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
<hash type="sha-256">{{.}}</hash> <hash type="sha-256">{{.}}</hash>
{{/hash}} {{/hash}}
{{#mirrors}} {{#mirrors}}
<url priority="1">{{&url}}</url> <url priority="1">{{&.}}</url>
{{/mirrors}} {{/mirrors}}
<url priority="2">{{&cdnUrl}}/{{path}}.tar.zst</url> <url priority="2">{{&cdnUrl}}/{{path}}.tar.zst</url>
</file> </file>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment