Compare commits
3 Commits
main
...
bishal/deb
Author | SHA1 | Date |
---|---|---|
Bishal Prasad | 8f35dac68e | |
Bishal Prasad | 938b484130 | |
Bishal Prasad | eff27e646c |
|
@ -28332,6 +28332,7 @@ class Batch {
|
||||||
}
|
}
|
||||||
this.concurrency = concurrency;
|
this.concurrency = concurrency;
|
||||||
this.emitter = new events.EventEmitter();
|
this.emitter = new events.EventEmitter();
|
||||||
|
this.index = 0;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Add a operation into queue.
|
* Add a operation into queue.
|
||||||
|
@ -28385,6 +28386,8 @@ class Batch {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
parallelExecute() {
|
parallelExecute() {
|
||||||
|
const local_index = this.index++;
|
||||||
|
console.log(`parallelExecute ${local_index} Active count: ${this.actives} Completed count: ${this.completed} total: ${this.operations.length}`);
|
||||||
if (this.state === BatchStates.Error) {
|
if (this.state === BatchStates.Error) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -28395,6 +28398,7 @@ class Batch {
|
||||||
while (this.actives < this.concurrency) {
|
while (this.actives < this.concurrency) {
|
||||||
const operation = this.nextOperation();
|
const operation = this.nextOperation();
|
||||||
if (operation) {
|
if (operation) {
|
||||||
|
console.log(`parallelExecute ${local_index} starting execution of operation ${this.offset}. Active count: ${this.actives}`);
|
||||||
operation();
|
operation();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -28833,15 +28837,21 @@ async function streamToBuffer(stream, buffer, offset, end, encoding) {
|
||||||
let pos = 0; // Position in stream
|
let pos = 0; // Position in stream
|
||||||
const count = end - offset; // Total amount of data needed in stream
|
const count = end - offset; // Total amount of data needed in stream
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const timeout = setTimeout(() => reject(new Error(`The operation cannot be completed in timeout.`)), REQUEST_TIMEOUT);
|
const timeout = setTimeout(() => {
|
||||||
|
console.log("Timeout triggered.");
|
||||||
|
return reject(new Error(`The operation cannot be completed in timeout.`));
|
||||||
|
}, REQUEST_TIMEOUT);
|
||||||
stream.on("readable", () => {
|
stream.on("readable", () => {
|
||||||
|
console.log("Entering readable");
|
||||||
if (pos >= count) {
|
if (pos >= count) {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
|
console.log("Leaving readable");
|
||||||
resolve();
|
resolve();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let chunk = stream.read();
|
let chunk = stream.read();
|
||||||
if (!chunk) {
|
if (!chunk) {
|
||||||
|
console.log("Leaving readable");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (typeof chunk === "string") {
|
if (typeof chunk === "string") {
|
||||||
|
@ -28851,6 +28861,7 @@ async function streamToBuffer(stream, buffer, offset, end, encoding) {
|
||||||
const chunkLength = pos + chunk.length > count ? count - pos : chunk.length;
|
const chunkLength = pos + chunk.length > count ? count - pos : chunk.length;
|
||||||
buffer.fill(chunk.slice(0, chunkLength), offset + pos, offset + pos + chunkLength);
|
buffer.fill(chunk.slice(0, chunkLength), offset + pos, offset + pos + chunkLength);
|
||||||
pos += chunkLength;
|
pos += chunkLength;
|
||||||
|
console.log("Leaving readable");
|
||||||
});
|
});
|
||||||
stream.on("end", () => {
|
stream.on("end", () => {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
|
|
Loading…
Reference in New Issue