Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ export interface TriggerHookTaskData extends TaskBaseData {
responseStatus?: number;
}

export interface CreateSyncBinaryTaskData extends TaskBaseData {
fullDiff?: boolean;
}
export interface CreateSyncPackageTaskData extends TaskBaseData {
tips?: string;
skipDependencies?: boolean;
Expand All @@ -74,6 +77,7 @@ export interface TaskUpdateCondition {
export type CreateHookTask = Task<CreateHookTaskData>;
export type TriggerHookTask = Task<TriggerHookTaskData>;
export type CreateSyncPackageTask = Task<CreateSyncPackageTaskData>;
export type CreateSyncBinaryTask = Task<CreateSyncBinaryTaskData>;
export type ChangesStreamTask = Task<ChangesStreamTaskData>;

export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
Expand Down
21 changes: 16 additions & 5 deletions app/core/service/BinarySyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { NFSAdapter } from '../../common/adapter/NFSAdapter';
import { TaskType, TaskState } from '../../common/enum/Task';
import { downloadToTempfile } from '../../common/FileUtil';
import { BinaryRepository } from '../../repository/BinaryRepository';
import { Task } from '../entity/Task';
import { CreateSyncBinaryTask, Task } from '../entity/Task';
import { Binary } from '../entity/Binary';
import { TaskService } from './TaskService';
import { AbstractBinary, BinaryItem } from '../../common/adapter/binary/AbstractBinary';
Expand Down Expand Up @@ -106,12 +106,15 @@ export class BinarySyncerService extends AbstractService {
return await this.taskService.findExecuteTask(TaskType.SyncBinary);
}

public async executeTask(task: Task) {
public async executeTask(task: CreateSyncBinaryTask) {
const binaryName = task.targetName as BinaryName;
const binaryAdapter = await this.getBinaryAdapter(binaryName);
const logUrl = `${this.config.cnpmcore.registry}/-/binary/${binaryName}/syncs/${task.taskId}/log`;
let logs: string[] = [];
logs.push(`[${isoNow()}] 🚧🚧🚧🚧🚧 Start sync binary "${binaryName}" 🚧🚧🚧🚧🚧`);
if (task.data?.fullDiff) {
logs.push(`[${isoNow()}] 🚧🚧🚧🚧🚧 full diff 🚧🚧🚧🚧🚧`);
}
if (!binaryAdapter) {
task.error = 'unknow binaryName';
logs.push(`[${isoNow()}] ❌ Synced "${binaryName}" fail, ${task.error}, log: ${logUrl}`);
Expand Down Expand Up @@ -155,15 +158,15 @@ export class BinarySyncerService extends AbstractService {
}
}

private async syncDir(binaryAdapter: AbstractBinary, task: Task, dir: string, parentIndex = '', latestVersionParent = '/') {
private async syncDir(binaryAdapter: AbstractBinary, task: CreateSyncBinaryTask, dir: string, parentIndex = '', latestVersionParent = '/') {
const binaryName = task.targetName as BinaryName;
const result = await binaryAdapter.fetch(dir, binaryName);
let hasDownloadError = false;
let hasItems = false;
if (result && result.items.length > 0) {
hasItems = true;
let logs: string[] = [];
const { newItems, latestVersionDir } = await this.diff(binaryName, dir, result.items, latestVersionParent);
const { newItems, latestVersionDir } = await this.diff(binaryName, dir, result.items, latestVersionParent, task.data?.fullDiff);
logs.push(`[${isoNow()}][${dir}] 🚧 Syncing diff: ${result.items.length} => ${newItems.length}, Binary class: ${binaryAdapter.constructor.name}`);
// re-check latest version
for (const [ index, { item, reason }] of newItems.entries()) {
Expand Down Expand Up @@ -244,7 +247,7 @@ export class BinarySyncerService extends AbstractService {
// 上游可能正在发布新版本、同步流程中断,导致同步的时候,文件列表不一致
// 如果的当前目录命中 latestVersionParent 父目录,那么就再校验一下当前目录
// 如果 existsItems 为空或者经过修改,那么就不需要 revalidate 了
private async diff(binaryName: BinaryName, dir: string, fetchItems: BinaryItem[], latestVersionParent = '/') {
private async diff(binaryName: BinaryName, dir: string, fetchItems: BinaryItem[], latestVersionParent = '/', fullDiff: boolean | undefined) {
const existsItems = await this.binaryRepository.listBinaries(binaryName, dir);
const existsMap = new Map<string, Binary>();
for (const item of existsItems) {
Expand Down Expand Up @@ -275,6 +278,14 @@ export class BinarySyncerService extends AbstractService {
existsItem.sourceUrl = item.url;
existsItem.ignoreDownloadStatuses = item.ignoreDownloadStatuses;
existsItem.date = item.date;
} else if (fullDiff && item.isDir) {
diffItems.push({
item: existsItem,
reason: `full diff, local: ${JSON.stringify(existsItem.date)}, remote: ${JSON.stringify(item.date)}`,
});
existsItem.sourceUrl = item.url;
existsItem.ignoreDownloadStatuses = item.ignoreDownloadStatuses;
existsItem.date = item.date;
} else if (dir.endsWith(latestVersionParent)) {
const isLatestItem = sortBy(fetchItems, [ 'date' ]).pop()?.name === item.name;
if (isLatestItem && existsItem.isDir) {
Expand Down
12 changes: 11 additions & 1 deletion app/port/schedule/CreateSyncBinaryTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { IntervalParams, Schedule, ScheduleType } from '@eggjs/tegg/schedule';
import { Inject } from '@eggjs/tegg';
import { BinarySyncerService } from '../../core/service/BinarySyncerService';
import binaries, { BinaryName } from '../../../config/binaries';
import dayjs from 'dayjs';

@Schedule<IntervalParams>({
type: ScheduleType.WORKER,
Expand All @@ -28,7 +29,16 @@ export class CreateSyncBinaryTask {
// 默认只同步 binaryName 的二进制,即使有不一致的 category,会在同名的 binaryName 任务中同步
// 例如 canvas 只同步 binaryName 为 canvas 的二进制,不同步 category 为 node-canvas-prebuilt 的二进制
// node-canvas-prebuilt 的二进制会在 node-canvas-prebuilt 的任务中同步
await this.binarySyncerService.createTask(binaryName as BinaryName);
await this.binarySyncerService.createTask(binaryName as BinaryName, {
fullDiff: isBetween2and205AM(),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不能全部都做吧。。。这个扛不住的,只能有条件少量目录做。

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我这边跑下来的数据,发现差异的有
bun 18
tfjs-models 25
nydus 16
sass-embedded 155
chromium-browser-snapshots 35
node 34
node-nightly 12
python 6

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

另外 prisma 和 chrome-for-testing 确实不能fulldiff,除了这两个,还有其他不能跑的吗?跑fulldiff对我们这边的服务器压力倒不是很大

Copy link
Copy Markdown
Author

@UestcCarpediem UestcCarpediem Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我重新整理了一下,应该有四个adapter不能full diff,ChromeForTestingBinary,PlaywrightBinary,PrismaBinary,PuppeteerBinary。

});
}
}
}
function isBetween2and205AM() {
const now = dayjs();
const twoAM = now.startOf('day').add(2, 'hour');
const twoAMFive = twoAM.add(5, 'minute');

return now.isAfter(twoAM) && now.isBefore(twoAMFive);
}
84 changes: 84 additions & 0 deletions test/core/service/BinarySyncerService/executeTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,5 +389,89 @@ describe('test/core/service/BinarySyncerService/executeTask.test.ts', () => {
assert(BinaryItems.length === 2);

});
it('should full diff all dir', async () => {
app.mockHttpclient('https://nodejs.org/dist/index.json', 'GET', {
data: await TestUtil.readFixturesFile('nodejs.org/site/index.json'),
persist: false,
});
app.mockHttpclient('https://nodejs.org/dist/latest/docs/apilinks.json', 'GET', {
data: await TestUtil.readFixturesFile('nodejs.org/site/latest/docs/apilinks.json'),
persist: false,
});
app.mockHttpclient('https://nodejs.org/dist/latest/docs/apilinks_old.json', 'GET', {
data: await TestUtil.readFixturesFile('nodejs.org/site/latest/docs/apilinks.json'),
persist: false,
});
await binarySyncerService.createTask('node', {});
let task = await binarySyncerService.findExecuteTask();
assert(task);
mock(NodeBinary.prototype, 'fetch', async (dir: string) => {
if (dir === '/') {
return {
items: [
{ name: 'latest/', isDir: true, url: '', size: '-', date: '17-Dec-2021 23:17' },
{ name: 'old/', isDir: true, url: '', size: '-', date: '15-Dec-2021 23:17' },
{ name: 'index.json', isDir: false, url: 'https://nodejs.org/dist/index.json', size: '219862', date: '17-Dec-2021 23:16' },
],
};
}
if (dir === '/latest/') {
return {
items: [
{ name: 'docs/', isDir: true, url: '', size: '-', date: '17-Dec-2021 21:31' },
],
};
}
if (dir === '/old/') {
return {
items: [
{
name: 'apilinks_old.json',
isDir: false,
url: 'https://nodejs.org/dist/latest/docs/apilinks_old.json',
size: '61606',
date: '17-Dec-2021 21:29',
},
],
};
}
if (dir === '/latest/docs/') {
return {
items: [
{ name: 'apilinks.json', isDir: false, url: 'https://nodejs.org/dist/latest/docs/apilinks.json', size: '61606', date: '17-Dec-2021 21:29' },
],
};
}
return { items: [] };
});
await binarySyncerService.executeTask(task);
app.mockAgent().assertNoPendingInterceptors();
assert(!await TaskModel.findOne({ taskId: task.taskId }));
assert(await HistoryTaskModel.findOne({ taskId: task.taskId }));
let stream = await binarySyncerService.findTaskLog(task);
assert(stream);
let log = await TestUtil.readStreamToLog(stream);
// console.log(log);
assert(log.includes('Syncing diff: 3 => 3'));
assert(log.includes('[/] 🟢 Synced dir success'));
assert(log.includes('[/latest/] 🟢 Synced dir success'));
assert(log.includes('[/latest/docs/] 🟢 Synced dir success'));
assert(log.includes('[/old/] 🟢 Synced dir success'));
// sync again
await binarySyncerService.createTask('node', { fullDiff: true });
task = await binarySyncerService.findExecuteTask();
assert(task);
await binarySyncerService.executeTask(task);
stream = await binarySyncerService.findTaskLog(task);
assert(stream);
log = await TestUtil.readStreamToLog(stream);
// console.log(log);
assert(log.includes('reason: full diff'));
assert([ ...log.matchAll(/reason: full diff/g) ].length === 3);
assert(log.includes('Syncing diff: 3 => 2'));
assert(log.includes('[/] 🟢 Synced dir success'));
assert(log.includes('[/latest/] 🟢 Synced dir success'));
assert(log.includes('[/old/] 🟢 Synced dir success'));
});
});
});