Commit 515a1d19 authored by 神楽坂玲奈's avatar 神楽坂玲奈

packager

parent 7c41426b
{
"singleQuote": true,
"trailingComma": "all"
}
\ No newline at end of file
"printWidth": 140
}
......@@ -23,7 +23,7 @@
"mustache": "^4.2.0",
"p-queue": "6.6.2",
"pg": "^8.7.1",
"pg-native": "*",
"readdirp": "^3.6.0",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.2.0",
......@@ -8972,7 +8972,6 @@
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.0.tgz",
"integrity": "sha512-lY1Q/PiJGC2zOv/z391WOTD+Z02bCgsFfvxoXXf6h7kv9o+WmsmzYqrAwY63sNgOxE4xEdq0WyUnXfKeBrSvYw==",
"dev": true,
"engines": {
"node": ">=8.6"
},
......@@ -9281,7 +9280,6 @@
"version": "3.6.0",
"resolved": "https://registry.npmjs.org/readdirp/-/readdirp-3.6.0.tgz",
"integrity": "sha512-hOS089on8RduqdbhvQ5Z37A0ESjsqz6qnRcffsMU3495FuTdqSm+7bhJ29JvIOsBDEEnan5DPu9t3To9VRlMzA==",
"dev": true,
"dependencies": {
"picomatch": "^2.2.1"
},
......@@ -18575,8 +18573,7 @@
"picomatch": {
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.0.tgz",
"integrity": "sha512-lY1Q/PiJGC2zOv/z391WOTD+Z02bCgsFfvxoXXf6h7kv9o+WmsmzYqrAwY63sNgOxE4xEdq0WyUnXfKeBrSvYw==",
"dev": true
"integrity": "sha512-lY1Q/PiJGC2zOv/z391WOTD+Z02bCgsFfvxoXXf6h7kv9o+WmsmzYqrAwY63sNgOxE4xEdq0WyUnXfKeBrSvYw=="
},
"pirates": {
"version": "4.0.1",
......@@ -18801,7 +18798,6 @@
"version": "3.6.0",
"resolved": "https://registry.npmjs.org/readdirp/-/readdirp-3.6.0.tgz",
"integrity": "sha512-hOS089on8RduqdbhvQ5Z37A0ESjsqz6qnRcffsMU3495FuTdqSm+7bhJ29JvIOsBDEEnan5DPu9t3To9VRlMzA==",
"dev": true,
"requires": {
"picomatch": "^2.2.1"
}
......
......@@ -36,6 +36,7 @@
"mustache": "^4.2.0",
"p-queue": "6.6.2",
"pg": "^8.7.1",
"readdirp": "^3.6.0",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.2.0",
......
import { ConsoleLogger, Injectable } from '@nestjs/common';
import * as fs from 'fs';
import * as path from 'path';
import * as crypto from 'crypto';
import * as child_process from 'child_process';
import * as Mustache from 'mustache';
import fs from 'fs';
import path from 'path';
import child_process from 'child_process';
import tar from 'tar';
import { App } from '../entities/App.entity';
import PQueue from 'p-queue';
// eslint-disable-next-line @typescript-eslint/no-empty-function
function nothing() {}
import os from 'os';
import { S3Service } from '../s3/s3.service';
import { PutObjectCommand } from '@aws-sdk/client-s3';
import util from 'util';
import { v4 as uuidv4 } from 'uuid';
import readdirp from 'readdirp';
import _ from 'lodash';
@Injectable()
export class PackagerService extends ConsoleLogger {
workingPath: string;
releasePath: string;
downloadBaseUrl: string;
queueIdMap = new Map<string, PQueue>();
// workingPath: string;
// releasePath: string;
// downloadBaseUrl: string;
// queueIdMap = new Map<string, PQueue>();
getQueue(id: string) {
if (!this.queueIdMap.has(id)) {
this.queueIdMap.set(id, new PQueue({ concurrency: 1 }));
}
return this.queueIdMap.get(id);
}
constructor() {
constructor(private s3: S3Service) {
super('packager');
}
private async readdirRecursive(_path: string): Promise<string[]> {
const files = await fs.promises.readdir(_path, { encoding: 'utf-8' });
let result = files;
async build(stream: fs.ReadStream): Promise<BuildResult> {
const bucket_max = 10 * 1024 ** 2;
const bucket_enter = 1 * 1024 ** 2;
const root = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'mycard-console-'));
await this.spawnAsync('tar', ['-zxvf', '-'], { cwd: root, stdio: [stream, 'inherit', 'inherit'] });
const buckets: Record<string, [string[], number]> = {};
const packages: Record<string, string[]> = {};
const entries = await readdirp.promise(root, { alwaysStat: true, type: 'files_directories' });
const [directories, files] = _.partition(entries, (item) => item.stats.isDirectory());
// checksum
const c = this.checksum(
root,
directories.map((d) => d.path),
files.map((f) => f.path)
);
const promises = [c];
// 整包
const archive = `${uuidv4()}.tar.gz`;
packages[archive] = [];
promises.push(this.archive(archive, root, await fs.promises.readdir(root)));
// 散包
for (const file of files) {
const child = path.join(_path, file);
const stat = await fs.promises.stat(child);
if (stat.isDirectory()) {
result = result.concat(
(await this.readdirRecursive(child)).map((_file) =>
path.join(file, _file),
),
);
if (file.stats.size < bucket_enter) {
const extname = path.extname(file.basename);
buckets[extname] ??= [[], 0];
const bucket = buckets[extname];
if (bucket[1] + file.stats.size >= bucket_max) {
const archive = `${uuidv4()}.tar.gz`;
packages[archive] = bucket[0];
promises.push(this.archive(archive, root, bucket[0]));
bucket[0] = [];
bucket[1] = 0;
} else {
bucket[0].push(file.path);
bucket[1] += file.stats.size;
}
} else {
const archive = `${uuidv4()}.tar.gz`;
packages[archive] = [file.path];
promises.push(this.archive(archive, root, [file.path]));
}
}
for (const bucket of Object.values(buckets)) {
if (bucket[0].length) {
const archive = `${uuidv4()}.tar.gz`;
packages[archive] = bucket[0];
promises.push(this.archive(archive, root, bucket[0]));
}
}
return result;
// TODO: 更新包
const [checksum] = await Promise.all(promises); // 这个 await 过后,checksum 和 打包上传都已经跑完了
console.log(checksum, packages);
return { checksum, packages };
}
private caculateSHA256(file: string): Promise<string> {
return new Promise((resolve, reject) => {
const input = fs.createReadStream(file);
const hash = crypto.createHash('sha256');
hash.on('error', (error: Error) => {
reject(error);
});
input.on('error', (error: Error) => {
reject(error);
});
hash.on('readable', () => {
const data = hash.read();
if (data) {
resolve((<Buffer>data).toString('hex'));
}
});
input.pipe(hash);
async checksum(root: string, directories: string[], files: string[]) {
const { stdout } = await util.promisify(child_process.execFile)('sha256sum', files, { maxBuffer: 1 * 1024 ** 2 });
return Object.fromEntries([
['.', ''],
...directories.map((d) => [d, '']),
...stdout.split('\n').map((line) => line.split(' ', 2).reverse()),
]);
}
archive(archive: string = `${uuidv4()}.tar.gz`, root: string, files: string[]) {
const child = child_process.spawn('tar', ['-zcvf', '-'].concat(files), {
cwd: root,
stdio: ['ignore', 'pipe', 'inherit'],
});
return this.s3.s3.send(
new PutObjectCommand({
Bucket: this.s3.bucket,
Key: archive,
Body: child.stdout,
})
);
}
private spawnAsync(
command: string,
args: string[],
options: child_process.SpawnOptions,
) {
private spawnAsync(command: string, args: string[], options: child_process.SpawnOptions) {
return new Promise<void>((resolve, reject) => {
const child = child_process.spawn(command, args, options);
child.on('exit', (code) => {
......@@ -85,41 +123,9 @@ export class PackagerService extends ConsoleLogger {
});
});
}
}
private archive(
archive: string,
files: string[],
directory: string,
): Promise<void> {
return tar.c({ gzip: true, C: directory, file: archive }, files);
}
private async createDirectoryIfNotExists(directory: string) {
try {
await fs.promises.access(directory);
} catch (e) {
await fs.promises.mkdir(directory, { recursive: true });
}
}
private async unarchiveStream(stream: fs.ReadStream, directory: string) {
await this.createDirectoryIfNotExists(directory);
const unarchiveProcessStream = stream.pipe(tar.x({ C: directory }));
return new Promise<void>((resolve, reject) => {
unarchiveProcessStream.on('finish', resolve);
unarchiveProcessStream.on('error', reject);
});
}
private async packageProcess(app: App, stream: fs.ReadStream) {
const unarchiveDestination = path.join(
this.workingPath,
app.packageFullPath,
);
const packageWorkingRoot = path.join(this.workingPath, app.id);
}
async package(app: App, stream: fs.ReadStream) {
const queue = this.getQueue();
}
interface BuildResult {
checksum: Record<string, string>;
packages: Record<string, string[]>;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment