diff --git a/dist/restore/index.js b/dist/restore/index.js index a6964ee..cb0ace4 100644 --- a/dist/restore/index.js +++ b/dist/restore/index.js @@ -1615,6 +1615,28 @@ function commitCache(restClient, cacheId, filesize) { return yield restClient.create(`caches/${cacheId.toString()}`, commitCacheRequest, requestOptions); }); } +function parallelAwait(queue, concurrency) { + var _a; + return __awaiter(this, void 0, void 0, function* () { + const workQueue = queue.reverse(); + let completedWork = []; + let entries = queue.length; + while (entries > 0) { + if (entries < concurrency) { + completedWork.push(yield Promise.all(workQueue)); + } + else { + let promises = []; + let i; + for (i = 0; i < concurrency; i++) { + promises.push((_a = workQueue.pop(), (_a !== null && _a !== void 0 ? _a : Promise.resolve()))); + } + completedWork.push(yield Promise.all(promises)); + } + } + return completedWork; + }); +} function saveCache(cacheId, archivePath) { return __awaiter(this, void 0, void 0, function* () { const restClient = createRestClient(); @@ -1629,13 +1651,14 @@ function saveCache(cacheId, archivePath) { const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; const end = offset + chunkSize - 1; const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false }); - uploads.push(yield uploadChunk(restClient, resourceUrl, chunk, offset, end)); // Making this serial + uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end)); offset += MAX_CHUNK_SIZE; } - fs.closeSync(fd); core.debug("Awaiting all uploads"); + const responses = yield parallelAwait(uploads, 4); + fs.closeSync(fd); //const responses = await Promise.all(uploads); - const failedResponse = uploads.find(x => !isSuccessStatusCode(x.statusCode)); + const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode)); if (failedResponse) { throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`); } diff --git a/dist/save/index.js b/dist/save/index.js index 865ec4b..0451f3f 100644 --- a/dist/save/index.js +++ b/dist/save/index.js @@ -1615,6 +1615,28 @@ function commitCache(restClient, cacheId, filesize) { return yield restClient.create(`caches/${cacheId.toString()}`, commitCacheRequest, requestOptions); }); } +function parallelAwait(queue, concurrency) { + var _a; + return __awaiter(this, void 0, void 0, function* () { + const workQueue = queue.reverse(); + let completedWork = []; + let entries = queue.length; + while (entries > 0) { + if (entries < concurrency) { + completedWork.push(yield Promise.all(workQueue)); + } + else { + let promises = []; + let i; + for (i = 0; i < concurrency; i++) { + promises.push((_a = workQueue.pop(), (_a !== null && _a !== void 0 ? _a : Promise.resolve()))); + } + completedWork.push(yield Promise.all(promises)); + } + } + return completedWork; + }); +} function saveCache(cacheId, archivePath) { return __awaiter(this, void 0, void 0, function* () { const restClient = createRestClient(); @@ -1629,13 +1651,14 @@ function saveCache(cacheId, archivePath) { const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; const end = offset + chunkSize - 1; const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false }); - uploads.push(yield uploadChunk(restClient, resourceUrl, chunk, offset, end)); // Making this serial + uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end)); offset += MAX_CHUNK_SIZE; } - fs.closeSync(fd); core.debug("Awaiting all uploads"); + const responses = yield parallelAwait(uploads, 4); + fs.closeSync(fd); //const responses = await Promise.all(uploads); - const failedResponse = uploads.find(x => !isSuccessStatusCode(x.statusCode)); + const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode)); if (failedResponse) { throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`); } diff --git a/src/cacheHttpClient.ts b/src/cacheHttpClient.ts index 195bdab..db74710 100644 --- a/src/cacheHttpClient.ts +++ b/src/cacheHttpClient.ts @@ -174,6 +174,26 @@ async function commitCache( ); } +async function parallelAwait(queue: Promise[], concurrency: number): Promise { + const workQueue = queue.reverse(); + let completedWork: any[] = []; + let entries = queue.length; + while (entries > 0) { + if (entries < concurrency) { + completedWork.push(await Promise.all(workQueue)); + } else { + let promises: Promise[] = []; + let i: number; + for (i = 0; i < concurrency; i++) { + promises.push(workQueue.pop() ?? Promise.resolve()); + } + completedWork.push(await Promise.all(promises)); + } + } + + return completedWork; +} + export async function saveCache( cacheId: number, archivePath: string @@ -184,7 +204,7 @@ export async function saveCache( // Upload Chunks const fileSize = fs.statSync(archivePath).size; const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString(); - const uploads: IRestResponse[] = []; + const uploads: Promise>[] = []; const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too? let offset = 0; @@ -192,16 +212,18 @@ export async function saveCache( const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; const end = offset + chunkSize - 1; const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false }); - uploads.push(await uploadChunk(restClient, resourceUrl, chunk, offset, end)); // Making this serial + uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end)); offset += MAX_CHUNK_SIZE; } + core.debug("Awaiting all uploads"); + const responses = await parallelAwait(uploads, 4); fs.closeSync(fd); - core.debug("Awaiting all uploads"); + //const responses = await Promise.all(uploads); - const failedResponse = uploads.find( + const failedResponse = responses.find( x => !isSuccessStatusCode(x.statusCode) ); if (failedResponse) {