Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WritableStreamDefaultController.releaseBackpressure() #1190

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
114 changes: 86 additions & 28 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ default-reader-asynciterator-prototype-internal-slots">Asynchronous iteration</h
1. Let |reader| be |iterator|'s [=ReadableStream async iterator/reader=].
1. If |reader|.[=ReadableStreamGenericReader/[[stream]]=] is undefined, return [=a promise resolved
with=] undefined.
1. Assert: |reader|.[=ReadableStreamDefaultReader/[[readRequests]]=] is [=list/is empty|empty=],
1. Assert: |reader|.[=ReadableStreamDefaultReader/[[readRequests]]=] [=list/is empty=],
as the async iterator machinery guarantees that any previous calls to `next()` have settled
before this is called.
1. If |iterator|'s [=ReadableStream async iterator/prevent cancel=] is false:
Expand Down Expand Up @@ -2380,7 +2380,7 @@ create them does not matter.
1. If |canceled1| is false or |canceled2| is false, [=resolve=] |cancelPromise| with undefined.
1. Let |pullWithDefaultReader| be the following steps:
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}},
1. Assert: |reader|.[=ReadableStreamBYOBReader/[[readIntoRequests]]=] is [=list/is empty|empty=].
1. Assert: |reader|.[=ReadableStreamBYOBReader/[[readIntoRequests]]=] [=list/is empty=].
1. Perform ! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|stream|).
1. Perform |forwardReaderError|, given |reader|.
Expand Down Expand Up @@ -2435,7 +2435,7 @@ create them does not matter.
1. Perform ! [$ReadableStreamDefaultReaderRead$](|reader|, |readRequest|).
1. Let |pullWithBYOBReader| be the following steps, given |view| and |forBranch2|:
1. If |reader| [=implements=] {{ReadableStreamDefaultReader}},
1. Assert: |reader|.[=ReadableStreamDefaultReader/[[readRequests]]=] is [=list/is empty|empty=].
1. Assert: |reader|.[=ReadableStreamDefaultReader/[[readRequests]]=] [=list/is empty=].
1. Perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamBYOBReader$](|stream|).
1. Perform |forwardReaderError|, given |reader|.
Expand Down Expand Up @@ -3151,7 +3151,8 @@ The following abstract operations support the implementation of the
1. If |controller|.[=ReadableByteStreamController/[[queueTotalSize]]=] > 0,
1. Set |controller|.[=ReadableByteStreamController/[[closeRequested]]=] to true.
1. Return.
1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not empty,
1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not
[=list/is empty|empty=],
1. Let |firstPendingPullInto| be
|controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=][0].
1. If |firstPendingPullInto|'s [=pull-into descriptor/bytes filled=] > 0,
Expand Down Expand Up @@ -3225,8 +3226,7 @@ The following abstract operations support the implementation of the
1. If ! [$ReadableStreamHasDefaultReader$](|stream|) is true,
1. Perform ! [$ReadableByteStreamControllerProcessReadRequestsUsingQueue$](|controller|).
1. If ! [$ReadableStreamGetNumReadRequests$](|stream|) is 0,
1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is
[=list/is empty|empty=].
1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] [=list/is empty=].
1. Perform ! [$ReadableByteStreamControllerEnqueueChunkToQueue$](|controller|,
|transferredBuffer|, |byteOffset|, |byteLength|).
1. Otherwise,
Expand Down Expand Up @@ -3504,7 +3504,8 @@ The following abstract operations support the implementation of the
|byteLength|, [=pull-into descriptor/bytes filled=] 0, [=pull-into descriptor/element size=]
|elementSize|, [=pull-into descriptor/view constructor=] |ctor|, and [=pull-into
descriptor/reader type=] "`byob`".
1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not empty,
1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not
[=list/is empty|empty=],
1. [=list/Append=] |pullIntoDescriptor| to
|controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=].
1. Perform ! [$ReadableStreamAddReadIntoRequest$](|stream|, |readIntoRequest|).
Expand Down Expand Up @@ -3538,7 +3539,8 @@ The following abstract operations support the implementation of the
id="readable-byte-stream-controller-respond">ReadableByteStreamControllerRespond(|controller|,
|bytesWritten|)</dfn> performs the following steps:

1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not empty.
1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not
[=list/is empty|empty=].
1. Let |firstDescriptor| be |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=][0].
1. Let |state| be
|controller|.[=ReadableByteStreamController/[[stream]]=].[=ReadableStream/[[state]]=].
Expand Down Expand Up @@ -4293,11 +4295,12 @@ following table:

<dt><code>await <var ignore>writer</var>.{{WritableStreamDefaultWriter/ready}}</code>
<dd>
<p>Returns a promise that will be fulfilled when the [=desired size to fill a stream's internal
queue|desired size to fill the stream's internal queue=] transitions from non-positive to
positive, signaling that it is no longer applying [=backpressure=]. Once the [=desired size to
fill a stream's internal queue|desired size=] dips back to zero or below, the getter will return
a new promise that stays pending until the next transition.
<p>Returns a promise that will be fulfilled when either the [=desired size to fill a stream's
internal queue|desired size to fill the stream's internal queue=] transitions from non-positive to
positive or when the stream calls {{WritableStreamDefaultController/releaseBackpressure()}},
signaling that it is no longer applying [=backpressure=].
Once the [=desired size to fill a stream's internal queue|desired size=] dips back to zero or below,
the getter will return a new promise that stays pending until the next transition.

<p>If the stream becomes errored or aborted, or the writer's lock is [=release a write
lock|released=], the returned promise will become rejected.
Expand Down Expand Up @@ -4423,6 +4426,7 @@ The Web IDL definition for the {{WritableStreamDefaultController}} class is give
interface WritableStreamDefaultController {
readonly attribute AbortSignal signal;
undefined error(optional any e);
undefined releaseBackpressure();
};
</xmp>

Expand Down Expand Up @@ -4452,6 +4456,10 @@ the following table:
<td><dfn>\[[queueTotalSize]]</dfn>
<td class="non-normative">The total size of all the chunks stored in
[=WritableStreamDefaultController/[[queue]]=] (see [[#queue-with-sizes]])
<tr>
<td><dfn>\[[releaseBackpressure]]</dfn>
<td class="non-normative">A boolean flag indicating whether to release backpressure until the
next chunk is written
<tr>
<td><dfn>\[[signal]]</dfn>
<td class="non-normative">An {{AbortSignal}} that can be used to abort the pending write or
Expand Down Expand Up @@ -4498,6 +4506,17 @@ closed. It is only used internally, and is never exposed to web developers.
the [=underlying sink=]'s methods. However, it can be useful for suddenly shutting down a stream
in response to an event outside the normal lifecycle of interactions with the [=underlying
sink=].
<dt><code><var ignore>controller</var>.{{WritableStreamDefaultController/releaseBackpressure()|releaseBackpressure}}()</code>
<dd>
<p>Releases [=backpressure=] until the next chunk is written. If there are still chunks in the queue,
this does nothing.

<p>Usually, backpressure is automatically released when the [=desired size to fill a stream's
internal queue|desired size to fill the stream's internal queue=] becomes positive. However,
if the stream is created with a [=high water mark=] of zero, then the desired size is always at or
below zero, and the stream would always apply backpressure.
This method allows the [=underlying sink=] to manually release backpressure, for example when it
knows that now is a good time for the [=producer=] to write a new chunk.
</dl>

<div algorithm>
Expand All @@ -4516,6 +4535,15 @@ closed. It is only used internally, and is never exposed to web developers.
1. Perform ! [$WritableStreamDefaultControllerError$]([=this=], |e|).
</div>

<div algorithm>
The <dfn id="ws-default-controller-release-backpressure" method
for="WritableStreamDefaultController">releaseBackpressure()</dfn> method steps are:

1. Let |state| be [=this=].[=WritableStreamDefaultController/[[stream]]=].[=WritableStream/[[state]]=].
1. If |state| is not "`writable`", return.
1. Perform ! [$WritableStreamDefaultControllerReleaseBackpressure$]([=this=]).
</div>

<h4 id="ws-default-controller-internal-methods">Internal methods</h4>

The following are internal methods implemented by each {{WritableStreamDefaultController}} instance.
Expand Down Expand Up @@ -4881,7 +4909,7 @@ the {{WritableStream}}'s public API.
performs the following steps:

1. Assert: |stream|.[=WritableStream/[[inFlightWriteRequest]]=] is undefined.
1. Assert: |stream|.[=WritableStream/[[writeRequests]]=] is not empty.
1. Assert: |stream|.[=WritableStream/[[writeRequests]]=] is not [=list/is empty|empty=].
1. Let |writeRequest| be |stream|.[=WritableStream/[[writeRequests]]=][0].
1. [=list/Remove=] |writeRequest| from |stream|.[=WritableStream/[[writeRequests]]=].
1. Set |stream|.[=WritableStream/[[inFlightWriteRequest]]=] to |writeRequest|.
Expand Down Expand Up @@ -5080,6 +5108,7 @@ The following abstract operations support the implementation of the
1. Set |controller|.[=WritableStreamDefaultController/[[stream]]=] to |stream|.
1. Set |stream|.[=WritableStream/[[controller]]=] to |controller|.
1. Perform ! [$ResetQueue$](|controller|).
1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to false.
1. Set |controller|.[=WritableStreamDefaultController/[[signal]]=] to a new {{AbortSignal}}.
1. Set |controller|.[=WritableStreamDefaultController/[[started]]=] to false.
1. Set |controller|.[=WritableStreamDefaultController/[[strategySizeAlgorithm]]=] to
Expand Down Expand Up @@ -5146,7 +5175,7 @@ The following abstract operations support the implementation of the
1. If |state| is "`erroring`",
1. Perform ! [$WritableStreamFinishErroring$](|stream|).
1. Return.
1. If |controller|.[=WritableStreamDefaultController/[[queue]]=] is empty, return.
1. If |controller|.[=WritableStreamDefaultController/[[queue]]=] [=list/is empty=], return.
1. Let |value| be ! [$PeekQueueValue$](|controller|).
1. If |value| is the [=close sentinel=], perform !
[$WritableStreamDefaultControllerProcessClose$](|controller|).
Expand Down Expand Up @@ -5211,6 +5240,8 @@ The following abstract operations support the implementation of the
id="writable-stream-default-controller-get-backpressure">WritableStreamDefaultControllerGetBackpressure(|controller|)</dfn>
performs the following steps:

1. If |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] is true and
|controller|.[=WritableStreamDefaultController/[[queue]]=] [=list/is empty=], return false.
1. Let |desiredSize| be ! [$WritableStreamDefaultControllerGetDesiredSize$](|controller|).
1. Return true if |desiredSize| ≤ 0, or false otherwise.
</div>
Expand Down Expand Up @@ -5247,7 +5278,7 @@ The following abstract operations support the implementation of the
1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=].
1. Perform ! [$WritableStreamMarkCloseRequestInFlight$](|stream|).
1. Perform ! [$DequeueValue$](|controller|).
1. Assert: |controller|.[=WritableStreamDefaultController/[[queue]]=] is empty.
1. Assert: |controller|.[=WritableStreamDefaultController/[[queue]]=] [=list/is empty=].
1. Let |sinkClosePromise| be the result of performing
|controller|.[=WritableStreamDefaultController/[[closeAlgorithm]]=].
1. Perform ! [$WritableStreamDefaultControllerClearAlgorithms$](|controller|).
Expand All @@ -5264,6 +5295,7 @@ The following abstract operations support the implementation of the

1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=].
1. Perform ! [$WritableStreamMarkFirstWriteRequestInFlight$](|stream|).
1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to false.
1. Let |sinkWritePromise| be the result of performing
|controller|.[=WritableStreamDefaultController/[[writeAlgorithm]]=], passing in |chunk|.
1. [=Upon fulfillment=] of |sinkWritePromise|,
Expand All @@ -5281,6 +5313,19 @@ The following abstract operations support the implementation of the
1. Perform ! [$WritableStreamFinishInFlightWriteWithError$](|stream|, |reason|).
</div>

<div algorithm>
<dfn abstract-op lt="WritableStreamDefaultControllerReleaseBackpressure">WritableStreamDefaultControllerReleaseBackpressure(|controller|)</dfn>
performs the following steps:

1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=].
1. Assert: |stream|.[=WritableStream/[[state]]=] is "`writable`".
1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to true.
1. If ! [$WritableStreamHasOperationMarkedInFlight$](|stream|) is false and
! [$WritableStreamCloseQueuedOrInFlight$](|stream|) is false,
1. Let |backpressure| be ! [$WritableStreamDefaultControllerGetBackpressure$](|controller|).
1. Perform ! [$WritableStreamUpdateBackpressure$](|stream|, |backpressure|).
</div>

<div algorithm>
<dfn abstract-op lt="WritableStreamDefaultControllerWrite"
id="writable-stream-default-controller-write">WritableStreamDefaultControllerWrite(|controller|,
Expand All @@ -5291,6 +5336,7 @@ The following abstract operations support the implementation of the
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|controller|,
|enqueueResult|.\[[Value]]).
1. Return.
1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to false.
1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=].
1. If ! [$WritableStreamCloseQueuedOrInFlight$](|stream|) is false and
|stream|.[=WritableStream/[[state]]=] is "`writable`",
Expand Down Expand Up @@ -5803,6 +5849,9 @@ The following abstract operations operate on {{TransformStream}} instances at a
stream.[=TransformStream/[[backpressureChangePromise]]=] with undefined.
1. Set |stream|.[=TransformStream/[[backpressureChangePromise]]=] to [=a new promise=].
1. Set |stream|.[=TransformStream/[[backpressure]]=] to |backpressure|.
1. If |backpressure| is false and |stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[state]]=]
is "`writable`", perform !
[$WritableStreamDefaultControllerReleaseBackpressure$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=]).
</div>

<h4 id="ts-default-controller-abstract-ops">Default controllers</h4>
Expand Down Expand Up @@ -6903,7 +6952,7 @@ for="ReadableStream">locked</dfn> if ! [$IsReadableStreamLocked$](|stream|) retu
up"><var>writeAlgorithm</var></dfn>, an optional algorithm <dfn export for="WritableStream/set
up"><var>closeAlgorithm</var></dfn>, an optional algorithm <dfn export for="WritableStream/set
up"><var>abortAlgorithm</var></dfn>, an optional number <dfn export for="WritableStream/set
up"><var>highWaterMark</var></dfn> (default 1), an optional algorithm <dfn export
up"><var>highWaterMark</var></dfn> (default 1), and an optional algorithm <dfn export
for="WritableStream/set up"><var>sizeAlgorithm</var></dfn>, perform the following steps.
|writeAlgorithm| must be an algorithm that accepts a [=chunk=] object and returns a promise. If
given, |closeAlgorithm| and |abortAlgorithm| may return a promise. If given, |sizeAlgorithm| must
Expand Down Expand Up @@ -6984,14 +7033,22 @@ reason.
<div algorithm="create a TransformStream">
To <dfn export for="TransformStream" lt="set up|setting up">set up</dfn> a
newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm <dfn export
for="TransformStream/set up"><var>transformAlgorithm</var></dfn> and an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, perform the following steps.
for="TransformStream/set up"><var>transformAlgorithm</var></dfn>, an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, an optional number <dfn
export for="TransformStream/set up"><var>writableHighWaterMark</var></dfn> (default 1), an optional
algorithm <dfn export for="TransformStream/set up"><var>writableSizeAlgorithm</var></dfn>, an
optional number <dfn export for="TransformStream/set up"><var>readableHighWaterMark</var></dfn>
(default 0), and an optional algorithm <dfn export
for="TransformStream/set up"><var>readableSizeAlgorithm</var></dfn>, perform the following steps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have separate set up algorithms for sync vs. async transforms, and thus take away the responsibility of picking the right high water mark from the other spec author. So far other specs haven't needed these extra customization points and I think it might be simpler not to give them out.

(On the other hand, the fact that existing byte-stream-transform specs are calculating the size of all chunks as 1 is a bit unfortunate...)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall I remove the HWM and size algorithm arguments from "set up a ReadableStream" and "set up a WritableStream" too then?

(On the other hand, the fact that existing byte-stream-transform specs are calculating the size of all chunks as 1 is a bit unfortunate...)

Hmm, true. Once we have proper byte support for WritableStream and TransformStream, we'll have "set up with byte reading support" versions for those too, but for now we're in a bit of an awkward position. 😕

Maybe we can already provide such algorithms, even though they won't vend BYOB readers or BYOB writers yet?

|transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise.
If given, |writableSizeAlgorithm| and |readableSizeAlgorithm| must be algorithms accepting
[=chunk=] objects and returning a number; and if given, |writableHighWaterMark| and
|readableHighWaterMark| must be non-negative, non-NaN numbers.

1. Let |writableHighWaterMark| be 1.
1. Let |writableSizeAlgorithm| be an algorithm that returns 1.
1. Let |readableHighWaterMark| be 0.
1. Let |readableSizeAlgorithm| be an algorithm that returns 1.
1. If |writableSizeAlgorithm| was not given, let |writableSizeAlgorithm| be an algorithm that
returns 1.
1. If |readableSizeAlgorithm| was not given, let |readableSizeAlgorithm| be an algorithm that
returns 1.
1. Let |transformAlgorithmWrapper| be an algorithm that runs these steps given a value |chunk|:
1. Let |result| be the result of running |transformAlgorithm| given |chunk|. If this throws an
exception |e|, return [=a promise rejected with=] |e|.
Expand Down Expand Up @@ -7025,9 +7082,11 @@ reason.
an identity {{TransformStream}}</dfn>:

1. Let |transformStream| be a [=new=] {{TransformStream}}.
1. Let |transformAlgorithm| be an algorithm which, given |chunk|, [=TransformStream/enqueues=]
|chunk| in |transformStream|.
1. [=TransformStream/Set up=] |transformStream| with <var
ignore>[=TransformStream/set up/transformAlgorithm=]</var> set to an algorithm which, given
|chunk|, [=TransformStream/enqueues=] |chunk| in |transformStream|.
ignore>[=TransformStream/set up/transformAlgorithm=]</var> set to |transformAlgorithm| and <var
ignore>[=TransformStream/set up/writableHighWaterMark=]</var> set to 0.
1. Return |transformStream|.
</div>

Expand All @@ -7036,7 +7095,7 @@ reason.
The following algorithms must only be used on {{TransformStream}} instances initialized via the
above [=TransformStream/set up=] algorithm. Usually they are called as part of
<var>[=TransformStream/set up/transformAlgorithm=]</var> or
<var>[=TransformStream/set up/flushAlgorithm=]</var>.
<var ignore>[=TransformStream/set up/flushAlgorithm=]</var>.

<p algorithm>To <dfn export for="TransformStream">enqueue</dfn> the JavaScript value |chunk| into a
{{TransformStream}} |stream|, perform !
Expand Down Expand Up @@ -7079,8 +7138,7 @@ Including the {{GenericTransformStream}} mixin will give an IDL interface the ap
the behavior of the resulting interface, its constructor (or other initialization code) must set
each instance's [=GenericTransformStream/transform=] to a [=new=] {{TransformStream}}, and then
[=TransformStream/set up|set it up=] with appropriate customizations via the
<var>[=TransformStream/set up/transformAlgorithm=]</var> and optionally
<var>[=TransformStream/set up/flushAlgorithm=]</var> arguments.
<var>[=TransformStream/set up/transformAlgorithm=]</var> and any optional arguments.

Note: Existing examples of this pattern on the web platform include {{CompressionStream}} and
{{TextDecoderStream}}. [[COMPRESSION]] [[ENCODING]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ exports.implementation = class WritableStreamDefaultControllerImpl {
aos.WritableStreamDefaultControllerError(this, e);
}

releaseBackpressure() {
const state = this._stream._state;
if (state !== 'writable') {
return;
}
aos.WritableStreamDefaultControllerReleaseBackpressure(this);
}

[AbortSteps](reason) {
const result = this._abortAlgorithm(reason);
aos.WritableStreamDefaultControllerClearAlgorithms(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
interface WritableStreamDefaultController {
readonly attribute AbortSignal signal;
undefined error(optional any e);
undefined releaseBackpressure();
};
Loading