From 0816faf84cbe0d3a2dda8b249b4c2dcfd7a23149 Mon Sep 17 00:00:00 2001
From: Josh Gross
Date: Tue, 17 Dec 2019 14:40:24 -0500
Subject: [PATCH] Use fs streams directly

---
 dist/restore/index.js  | 54 +++++++++++++++---------------------------
 dist/save/index.js     | 54 +++++++++++++++---------------------------
 src/cacheHttpClient.ts | 51 ++++++++++++++++-----------------------
 3 files changed, 59 insertions(+), 100 deletions(-)

diff --git a/dist/restore/index.js b/dist/restore/index.js
index efe2e13..eb89f94 100644
--- a/dist/restore/index.js
+++ b/dist/restore/index.js
@@ -1497,7 +1497,6 @@ const Handlers_1 = __webpack_require__(941);
 const HttpClient_1 = __webpack_require__(874);
 const RestClient_1 = __webpack_require__(105);
 const utils = __importStar(__webpack_require__(443));
-const stream_1 = __webpack_require__(794);
 const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
 function isSuccessStatusCode(statusCode) {
     return statusCode >= 200 && statusCode < 300;
 }
@@ -1584,30 +1583,29 @@ function reserveCache(key) {
     });
 }
 exports.reserveCache = reserveCache;
-function getContentRange(start, length) {
+function getContentRange(start, end) {
     // Format: `bytes start-end/filesize
     // start and end are inclusive
     // filesize can be *
     // For a 200 byte chunk starting at byte 0:
     // Content-Range: bytes 0-199/*
-    return `bytes ${start}-${start + length - 1}/*`;
+    return `bytes ${start}-${end}/*`;
 }
-function bufferToStream(buffer) {
-    const stream = new stream_1.Duplex();
-    stream.push(buffer);
-    stream.push(null);
-    return stream;
-}
-function uploadChunk(restClient, resourceUrl, data, offset) {
+// function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
+//     const stream = new Duplex();
+//     stream.push(buffer);
+//     stream.push(null);
+//     return stream;
+// }
+function uploadChunk(restClient, resourceUrl, data, start, end) {
     return __awaiter(this, void 0, void 0, function* () {
-        core.debug(`Uploading chunk of size ${data.byteLength} bytes at offset ${offset}`);
+        core.debug(`Uploading chunk of size ${end - start + 1} bytes at offset ${start}`);
         const requestOptions = getRequestOptions();
         requestOptions.additionalHeaders = {
             "Content-Type": "application/octet-stream",
-            "Content-Range": getContentRange(offset, data.byteLength)
+            "Content-Range": getContentRange(start, end)
         };
-        const stream = bufferToStream(data);
-        return yield restClient.uploadStream("PATCH", resourceUrl, stream, requestOptions);
+        return yield restClient.uploadStream("PATCH", resourceUrl, data, requestOptions);
     });
 }
 function commitCache(restClient, cacheId, filesize) {
@@ -1622,26 +1620,19 @@ function saveCache(cacheId, archivePath) {
         const restClient = createRestClient();
         core.debug("Uploading chunks");
         // Upload Chunks
-        const stream = fs.createReadStream(archivePath);
-        let streamIsClosed = false;
-        stream.on("end", () => {
-            core.debug("Stream is ended");
-            streamIsClosed = true;
-        });
+        const fileSize = fs.statSync(archivePath).size;
         const resourceUrl = getCacheApiUrl() + cacheId.toString();
         const uploads = [];
         let offset = 0;
-        while (!streamIsClosed) {
+        while (offset < fileSize) {
+            const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
+            const end = offset + chunkSize - 1;
             core.debug(`Offset: ${offset}`);
-            const chunk = stream.read(MAX_CHUNK_SIZE);
-            if (chunk == null) {
-                core.debug(`Chunk is null, reading is over?`);
-                break;
-            }
-            uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset));
+            const chunk = fs.createReadStream(archivePath, { start: offset, end });
+            uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
             offset += MAX_CHUNK_SIZE;
         }
-        core.debug("Awaiting all uplaods");
+        core.debug("Awaiting all uploads");
         const responses = yield Promise.all(uploads);
         const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
         if (failedResponse) {
@@ -3157,13 +3148,6 @@ run();
 exports.default = run;
 
 
-/***/ }),
-
-/***/ 794:
-/***/ (function(module) {
-
-module.exports = require("stream");
-
 /***/ }),
 
 /***/ 826:
diff --git a/dist/save/index.js b/dist/save/index.js
index 320fc1e..8a46377 100644
--- a/dist/save/index.js
+++ b/dist/save/index.js
@@ -1497,7 +1497,6 @@ const Handlers_1 = __webpack_require__(941);
 const HttpClient_1 = __webpack_require__(874);
 const RestClient_1 = __webpack_require__(105);
 const utils = __importStar(__webpack_require__(443));
-const stream_1 = __webpack_require__(794);
 const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
 function isSuccessStatusCode(statusCode) {
     return statusCode >= 200 && statusCode < 300;
 }
@@ -1584,30 +1583,29 @@ function reserveCache(key) {
     });
 }
 exports.reserveCache = reserveCache;
-function getContentRange(start, length) {
+function getContentRange(start, end) {
     // Format: `bytes start-end/filesize
     // start and end are inclusive
     // filesize can be *
     // For a 200 byte chunk starting at byte 0:
     // Content-Range: bytes 0-199/*
-    return `bytes ${start}-${start + length - 1}/*`;
+    return `bytes ${start}-${end}/*`;
 }
-function bufferToStream(buffer) {
-    const stream = new stream_1.Duplex();
-    stream.push(buffer);
-    stream.push(null);
-    return stream;
-}
-function uploadChunk(restClient, resourceUrl, data, offset) {
+// function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
+//     const stream = new Duplex();
+//     stream.push(buffer);
+//     stream.push(null);
+//     return stream;
+// }
+function uploadChunk(restClient, resourceUrl, data, start, end) {
     return __awaiter(this, void 0, void 0, function* () {
-        core.debug(`Uploading chunk of size ${data.byteLength} bytes at offset ${offset}`);
+        core.debug(`Uploading chunk of size ${end - start + 1} bytes at offset ${start}`);
         const requestOptions = getRequestOptions();
         requestOptions.additionalHeaders = {
             "Content-Type": "application/octet-stream",
-            "Content-Range": getContentRange(offset, data.byteLength)
+            "Content-Range": getContentRange(start, end)
         };
-        const stream = bufferToStream(data);
-        return yield restClient.uploadStream("PATCH", resourceUrl, stream, requestOptions);
+        return yield restClient.uploadStream("PATCH", resourceUrl, data, requestOptions);
     });
 }
 function commitCache(restClient, cacheId, filesize) {
@@ -1622,26 +1620,19 @@ function saveCache(cacheId, archivePath) {
         const restClient = createRestClient();
         core.debug("Uploading chunks");
         // Upload Chunks
-        const stream = fs.createReadStream(archivePath);
-        let streamIsClosed = false;
-        stream.on("end", () => {
-            core.debug("Stream is ended");
-            streamIsClosed = true;
-        });
+        const fileSize = fs.statSync(archivePath).size;
         const resourceUrl = getCacheApiUrl() + cacheId.toString();
         const uploads = [];
         let offset = 0;
-        while (!streamIsClosed) {
+        while (offset < fileSize) {
+            const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
+            const end = offset + chunkSize - 1;
             core.debug(`Offset: ${offset}`);
-            const chunk = stream.read(MAX_CHUNK_SIZE);
-            if (chunk == null) {
-                core.debug(`Chunk is null, reading is over?`);
-                break;
-            }
-            uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset));
+            const chunk = fs.createReadStream(archivePath, { start: offset, end });
+            uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
             offset += MAX_CHUNK_SIZE;
         }
-        core.debug("Awaiting all uplaods");
+        core.debug("Awaiting all uploads");
         const responses = yield Promise.all(uploads);
         const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
         if (failedResponse) {
@@ -3139,13 +3130,6 @@ module.exports = require("fs");
 
 /***/ }),
 
-/***/ 794:
-/***/ (function(module) {
-
-module.exports = require("stream");
-
-/***/ }),
-
 /***/ 826:
 /***/ (function(module, __unusedexports, __webpack_require__) {
diff --git a/src/cacheHttpClient.ts b/src/cacheHttpClient.ts
index b8aade7..c8ea6cd 100644
--- a/src/cacheHttpClient.ts
+++ b/src/cacheHttpClient.ts
@@ -15,7 +15,6 @@ import {
     ReserverCacheResponse
 } from "./contracts";
 import * as utils from "./utils/actionUtils";
-import { Duplex } from "stream";
 
 const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
 
@@ -127,38 +126,38 @@ export async function reserveCache(
     return response?.result?.cacheId ?? -1;
 }
 
-function getContentRange(start: number, length: number): string {
+function getContentRange(start: number, end: number): string {
     // Format: `bytes start-end/filesize
     // start and end are inclusive
     // filesize can be *
     // For a 200 byte chunk starting at byte 0:
     // Content-Range: bytes 0-199/*
-    return `bytes ${start}-${start + length - 1}/*`;
+    return `bytes ${start}-${end}/*`;
 }
 
-function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
-    const stream = new Duplex();
-    stream.push(buffer);
-    stream.push(null);
+// function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
+//     const stream = new Duplex();
+//     stream.push(buffer);
+//     stream.push(null);
 
-    return stream;
-}
+//     return stream;
+// }
 
 async function uploadChunk(
     restClient: RestClient,
     resourceUrl: string,
-    data: Buffer,
-    offset: number
+    data: NodeJS.ReadableStream,
+    start: number,
+    end: number
 ): Promise<IRestResponse<void>> {
-    core.debug(`Uploading chunk of size ${data.byteLength} bytes at offset ${offset}`);
+    core.debug(`Uploading chunk of size ${end - start + 1} bytes at offset ${start}`);
     const requestOptions = getRequestOptions();
     requestOptions.additionalHeaders = {
         "Content-Type": "application/octet-stream",
-        "Content-Range": getContentRange(offset, data.byteLength)
+        "Content-Range": getContentRange(start, end)
     };
 
-    const stream = bufferToStream(data);
-    return await restClient.uploadStream("PATCH", resourceUrl, stream, requestOptions);
+    return await restClient.uploadStream("PATCH", resourceUrl, data, requestOptions);
 }
 
 async function commitCache(
@@ -183,28 +182,20 @@ export async function saveCache(
 
     core.debug("Uploading chunks");
     // Upload Chunks
-    const stream = fs.createReadStream(archivePath);
-    let streamIsClosed = false;
-    stream.on("end", () => {
-        core.debug("Stream is ended");
-        streamIsClosed = true;
-    });
-
+    const fileSize = fs.statSync(archivePath).size;
     const resourceUrl = getCacheApiUrl() + cacheId.toString();
     const uploads: Promise<IRestResponse<void>>[] = [];
     let offset = 0;
-    while (!streamIsClosed) {
+    while (offset < fileSize) {
+        const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
+        const end = offset + chunkSize - 1;
         core.debug(`Offset: ${offset}`);
-        const chunk: Buffer = stream.read(MAX_CHUNK_SIZE);
-        if (chunk == null) {
-            core.debug(`Chunk is null, reading is over?`);
-            break;
-        }
-        uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset));
+        const chunk = fs.createReadStream(archivePath, { start: offset, end });
+        uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
         offset += MAX_CHUNK_SIZE;
     }
 
-    core.debug("Awaiting all uplaods");
+    core.debug("Awaiting all uploads");
     const responses = await Promise.all(uploads);
 
     const failedResponse = responses.find(
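
Note on the approach in this patch: instead of reading the archive into 4 MB Buffers via stream.read(), saveCache now walks the file by offset, opens one ranged fs.createReadStream({ start, end }) per chunk, and hands the stream plus its inclusive start/end offsets to uploadChunk, which builds the Content-Range header from them. The standalone TypeScript sketch below restates that flow outside the action. It is illustrative only and makes assumptions not found in the patch: the uploadFileInChunks helper name is invented, and it uses Node's built-in https module in place of the typed-rest-client RestClient.uploadStream call the action actually makes.

// sketch.ts - minimal sketch of ranged chunk uploads with Content-Range headers
import * as fs from "fs";
import * as https from "https";

const MAX_CHUNK_SIZE = 4000000; // 4 MB, same constant as in the patch

// Inclusive byte range, e.g. "bytes 0-3999999/*" for the first 4 MB chunk.
function getContentRange(start: number, end: number): string {
    return `bytes ${start}-${end}/*`;
}

// PATCH one chunk of the file as a stream; resolves with the HTTP status code.
// (Hypothetical transport: the real action goes through RestClient.uploadStream.)
function uploadChunk(
    resourceUrl: string,
    archivePath: string,
    start: number,
    end: number
): Promise<number> {
    return new Promise((resolve, reject) => {
        const request = https.request(
            resourceUrl,
            {
                method: "PATCH",
                headers: {
                    "Content-Type": "application/octet-stream",
                    "Content-Range": getContentRange(start, end)
                }
            },
            response => resolve(response.statusCode ?? 0)
        );
        request.on("error", reject);
        // start/end are inclusive, matching fs.createReadStream's options.
        fs.createReadStream(archivePath, { start, end }).pipe(request);
    });
}

// Walk the file in MAX_CHUNK_SIZE steps and upload all chunks in parallel,
// mirroring the while loop added to saveCache() in this patch.
async function uploadFileInChunks(
    resourceUrl: string,
    archivePath: string
): Promise<void> {
    const fileSize = fs.statSync(archivePath).size;
    const uploads: Promise<number>[] = [];
    let offset = 0;
    while (offset < fileSize) {
        const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
        const end = offset + chunkSize - 1;
        uploads.push(uploadChunk(resourceUrl, archivePath, offset, end));
        offset += MAX_CHUNK_SIZE;
    }
    const statusCodes = await Promise.all(uploads);
    if (statusCodes.some(code => code < 200 || code >= 300)) {
        throw new Error("One or more chunk uploads failed");
    }
}

Because each chunk is its own read stream over a fixed byte range, nothing larger than the OS read buffer is held in memory, and the uploads can safely run concurrently since each request covers a disjoint range of the file.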