From d2e73bcf864692b3ed2cca921be9e9b51af3b0f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vin=C3=ADcius=20Louren=C3=A7o?= Date: Fri, 6 Oct 2023 22:03:58 -0300 Subject: [PATCH] lib/webstreams: reduce overhead of transfering --- benchmark/webstreams/js_transfer.js | 52 +++++++++++ lib/internal/webstreams/readablestream.js | 48 ++++++---- lib/internal/webstreams/transformstream.js | 44 +++++---- lib/internal/webstreams/writablestream.js | 104 +++++++++++---------- 4 files changed, 163 insertions(+), 85 deletions(-) create mode 100644 benchmark/webstreams/js_transfer.js diff --git a/benchmark/webstreams/js_transfer.js b/benchmark/webstreams/js_transfer.js new file mode 100644 index 00000000000000..fc35175ed50d08 --- /dev/null +++ b/benchmark/webstreams/js_transfer.js @@ -0,0 +1,52 @@ +'use strict'; + +const common = require('../common.js'); + +const { MessageChannel } = require('worker_threads'); +const { WritableStream, TransformStream, ReadableStream } = require('stream/web'); + +const bench = common.createBenchmark(main, { + payload: ['WritableStream', 'ReadableStream', 'TransformStream'], + n: [1e4], +}); + +function main({ n, payload: payloadType }) { + let createPayload; + let messages = 0; + + switch (payloadType) { + case 'WritableStream': + createPayload = () => new WritableStream(); + break; + case 'ReadableStream': + createPayload = () => new ReadableStream(); + break; + case 'TransformStream': + createPayload = () => new TransformStream(); + break; + default: + throw new Error('Unsupported payload type'); + } + + const { port1, port2 } = new MessageChannel(); + + port2.onmessage = onMessage; + + function onMessage() { + if (messages++ === n) { + bench.end(n); + port1.close(); + } else { + send(); + } + } + + function send() { + const stream = createPayload(); + + port1.postMessage(stream, [stream]); + } + + bench.start(); + send(); +} diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8f074efe696455..83b49028eae8e0 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -17,7 +17,6 @@ const { PromisePrototypeThen, PromiseResolve, PromiseReject, - ReflectConstruct, SafePromiseAll, Symbol, SymbolAsyncIterator, @@ -642,26 +641,37 @@ ObjectDefineProperties(ReadableStream.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name), }); -function TransferredReadableStream() { - return ReflectConstruct( - function() { - markTransferMode(this, false, true); - this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port: undefined, - promise: undefined, - }, - }; - this[kIsClosedPromise] = createDeferredPromise(); +function InternalTransferredReadableStream() { + markTransferMode(this, false, true); + this[kType] = 'ReadableStream'; + this[kState] = { + disturbed: false, + reader: undefined, + state: 'readable', + storedError: undefined, + stream: undefined, + transfer: { + writable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, }, - [], ReadableStream); + }; + + this[kIsClosedPromise] = createDeferredPromise(); } + +ObjectSetPrototypeOf(InternalTransferredReadableStream.prototype, ReadableStream.prototype); +ObjectSetPrototypeOf(InternalTransferredReadableStream, ReadableStream); + +function TransferredReadableStream() { + const stream = new InternalTransferredReadableStream(); + + stream.constructor = ReadableStream; + + return stream; +} + TransferredReadableStream.prototype[kDeserialize] = () => {}; class ReadableStreamBYOBRequest { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 7a8f20a6eece38..4d47856a6a12de 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -4,9 +4,9 @@ const { FunctionPrototypeBind, FunctionPrototypeCall, ObjectDefineProperties, + ObjectSetPrototypeOf, PromisePrototypeThen, PromiseResolve, - ReflectConstruct, SymbolToStringTag, Symbol, } = primordials; @@ -247,25 +247,33 @@ ObjectDefineProperties(TransformStream.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name), }); -function TransferredTransformStream() { - return ReflectConstruct( - function() { - markTransferMode(this, false, true); - this[kType] = 'TransformStream'; - this[kState] = { - readable: undefined, - writable: undefined, - backpressure: undefined, - backpressureChange: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - controller: undefined, - }; +function InternalTransferredTransformStream() { + markTransferMode(this, false, true); + this[kType] = 'TransformStream'; + this[kState] = { + readable: undefined, + writable: undefined, + backpressure: undefined, + backpressureChange: { + promise: undefined, + resolve: undefined, + reject: undefined, }, - [], TransformStream); + controller: undefined, + }; } + +ObjectSetPrototypeOf(InternalTransferredTransformStream.prototype, TransformStream.prototype); +ObjectSetPrototypeOf(InternalTransferredTransformStream, TransformStream); + +function TransferredTransformStream() { + const stream = new InternalTransferredTransformStream(); + + stream.constructor = TransformStream; + + return stream; +} + TransferredTransformStream.prototype[kDeserialize] = () => {}; class TransformStreamDefaultController { diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 46d6ae28772c32..2115aba36e927b 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -6,10 +6,10 @@ const { FunctionPrototypeBind, FunctionPrototypeCall, ObjectDefineProperties, + ObjectSetPrototypeOf, PromisePrototypeThen, PromiseResolve, PromiseReject, - ReflectConstruct, Symbol, SymbolToStringTag, } = primordials; @@ -326,55 +326,63 @@ ObjectDefineProperties(WritableStream.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name), }); -function TransferredWritableStream() { - return ReflectConstruct( - function() { - markTransferMode(this, false, true); - this[kType] = 'WritableStream'; - this[kState] = { - close: createDeferredPromise(), - closeRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightWriteRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightCloseRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - pendingAbortRequest: { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }, - backpressure: false, - controller: undefined, - state: 'writable', - storedError: undefined, - writeRequests: [], - writer: undefined, - transfer: { - promise: undefined, - port1: undefined, - port2: undefined, - readable: undefined, - }, - }; - this[kIsClosedPromise] = createDeferredPromise(); - this[kControllerErrorFunction] = () => {}; +function InternalTransferredWritableStream() { + markTransferMode(this, false, true); + this[kType] = 'WritableStream'; + this[kState] = { + close: createDeferredPromise(), + closeRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightWriteRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, }, - [], WritableStream); + inFlightCloseRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + pendingAbortRequest: { + abort: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }, + backpressure: false, + controller: undefined, + state: 'writable', + storedError: undefined, + writeRequests: [], + writer: undefined, + transfer: { + readable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, + }, + }; + + this[kIsClosedPromise] = createDeferredPromise(); } + +ObjectSetPrototypeOf(InternalTransferredWritableStream.prototype, WritableStream.prototype); +ObjectSetPrototypeOf(InternalTransferredWritableStream, WritableStream); + +function TransferredWritableStream() { + const stream = new InternalTransferredWritableStream(); + + stream.constructor = WritableStream; + + return stream; +} + TransferredWritableStream.prototype[kDeserialize] = () => {}; class WritableStreamDefaultWriter {