From ba46c971fa83ba0c68203edb631aa3185f0041f5 Mon Sep 17 00:00:00 2001 From: "http4s-steward[bot]" <106843772+http4s-steward[bot]@users.noreply.github.com> Date: Fri, 12 May 2023 22:24:07 +0000 Subject: [PATCH 01/11] Update http4s-circe, http4s-client, ... to 0.23.19 in series/0.23 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 7c31d54d0..52204a95a 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ import Dependencies._ val Scala212 = "2.12.17" val Scala213 = "2.13.10" val Scala3 = "3.2.2" -val http4sVersion = "0.23.18" +val http4sVersion = "0.23.19" val munitCatsEffectVersion = "2.0.0-M3" ThisBuild / resolvers += From b9f1979a17abe9ea1d92af53d61b553266374f8a Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 21:16:19 -0400 Subject: [PATCH 02/11] Fix fs2 deprecations --- .../scala/com/example/http4s/blaze/demo/server/Module.scala | 4 +++- .../scala/com/example/http4s/blaze/demo/server/Server.scala | 4 +++- .../http4s/blaze/demo/server/service/FileService.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/com/example/http4s/blaze/demo/server/Module.scala b/examples/src/main/scala/com/example/http4s/blaze/demo/server/Module.scala index 6fce19b91..54afba165 100644 --- a/examples/src/main/scala/com/example/http4s/blaze/demo/server/Module.scala +++ b/examples/src/main/scala/com/example/http4s/blaze/demo/server/Module.scala @@ -24,6 +24,8 @@ import com.example.http4s.blaze.demo.server.endpoints.auth.BasicAuthHttpEndpoint import com.example.http4s.blaze.demo.server.endpoints.auth.GitHubHttpEndpoint import com.example.http4s.blaze.demo.server.service.FileService import com.example.http4s.blaze.demo.server.service.GitHubService +import fs2.compression.Compression +import fs2.io.file.Files import org.http4s.HttpRoutes import org.http4s.client.Client import org.http4s.server.HttpMiddleware @@ -34,7 +36,7 @@ import org.http4s.server.middleware.Timeout import scala.concurrent.duration._ -class Module[F[_]: Async](client: Client[F]) { +class Module[F[_]: Async: Compression: Files](client: Client[F]) { private val fileService = new FileService[F] private val gitHubService = new GitHubService[F](client) diff --git a/examples/src/main/scala/com/example/http4s/blaze/demo/server/Server.scala b/examples/src/main/scala/com/example/http4s/blaze/demo/server/Server.scala index 5790991c4..4ee4fd490 100644 --- a/examples/src/main/scala/com/example/http4s/blaze/demo/server/Server.scala +++ b/examples/src/main/scala/com/example/http4s/blaze/demo/server/Server.scala @@ -18,6 +18,8 @@ package com.example.http4s.blaze.demo.server import cats.effect._ import fs2.Stream +import fs2.compression.Compression +import fs2.io.file.Files import org.http4s.HttpApp import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.blaze.server.BlazeServerBuilder @@ -37,7 +39,7 @@ object HttpServer { "/" -> ctx.httpServices ).orNotFound - def stream[F[_]: Async]: Stream[F, ExitCode] = + def stream[F[_]: Async: Compression: Files]: Stream[F, ExitCode] = for { client <- BlazeClientBuilder[F].stream ctx <- Stream(new Module[F](client)) diff --git a/examples/src/main/scala/com/example/http4s/blaze/demo/server/service/FileService.scala b/examples/src/main/scala/com/example/http4s/blaze/demo/server/service/FileService.scala index 4d0035101..a11458902 100644 --- a/examples/src/main/scala/com/example/http4s/blaze/demo/server/service/FileService.scala +++ b/examples/src/main/scala/com/example/http4s/blaze/demo/server/service/FileService.scala @@ -26,7 +26,7 @@ import org.http4s.multipart.Part import java.io.File import java.nio.file.Paths -class FileService[F[_]](implicit F: Async[F], S: StreamUtils[F]) { +class FileService[F[_]](implicit F: Async[F], S: StreamUtils[F], files: Files[F]) { def homeDirectories(depth: Option[Int]): Stream[F, String] = S.env("HOME").flatMap { maybePath => val ifEmpty = S.error("HOME environment variable not found!") From 76aa3b528a85b5b9ebbc0888268e5f9076c63571 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 22:21:16 -0400 Subject: [PATCH 03/11] Fix cancellation in PoolManager for cats-effect-3.5.0 --- .../org/http4s/blaze/client/PoolManager.scala | 124 +++++++++--------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala index d2615abac..3c4d4f058 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala @@ -182,6 +182,8 @@ private final class PoolManager[F[_], A <: Connection[F]]( idleQueues.update(key, q) } + private[this] val noopCancelToken = Some(F.unit) + /** This generates a effect of Next Connection. The following calls are executed asynchronously * with respect to whenever the execution of this task can occur. * @@ -203,75 +205,77 @@ private final class PoolManager[F[_], A <: Connection[F]]( */ def borrow(key: RequestKey): F[NextConnection] = F.async { callback => - semaphore.permit.use { _ => - if (!isClosed) { - def go(): F[Unit] = - getConnectionFromQueue(key).flatMap { - case Some(pooled) if pooled.conn.isClosed => - F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *> - decrConnection(key) *> - go() - - case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) => - F.delay( - logger.debug(s"Shutting down and evicting expired connection for $key: $stats") - ) *> - decrConnection(key) *> - F.delay(pooled.conn.shutdown()) *> - go() - - case Some(pooled) => - F.delay(logger.debug(s"Recycling connection for $key: $stats")) *> - F.delay(callback(Right(NextConnection(pooled.conn, fresh = false)))) - - case None if numConnectionsCheckHolds(key) => - F.delay( - logger.debug(s"Active connection not found for $key. Creating new one. $stats") - ) *> - createConnection(key, callback) - - case None if maxConnectionsPerRequestKey(key) <= 0 => - F.delay(callback(Left(NoConnectionAllowedException(key)))) - - case None if curTotal == maxTotal => - val keys = idleQueues.keys - if (keys.nonEmpty) + semaphore.permit + .use { _ => + if (!isClosed) { + def go(): F[Unit] = + getConnectionFromQueue(key).flatMap { + case Some(pooled) if pooled.conn.isClosed => + F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *> + decrConnection(key) *> + go() + + case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) => F.delay( - logger.debug( - s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats" - ) + logger.debug(s"Shutting down and evicting expired connection for $key: $stats") + ) *> + decrConnection(key) *> + F.delay(pooled.conn.shutdown()) *> + go() + + case Some(pooled) => + F.delay(logger.debug(s"Recycling connection for $key: $stats")) *> + F.delay(callback(Right(NextConnection(pooled.conn, fresh = false)))) + + case None if numConnectionsCheckHolds(key) => + F.delay( + logger.debug(s"Active connection not found for $key. Creating new one. $stats") ) *> - F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()).flatMap { - randKey => - getConnectionFromQueue(randKey).map( - _.fold( - logger.warn(s"No connection to evict from the idleQueue for $randKey") - )(_.conn.shutdown()) - ) *> - decrConnection(randKey) - } *> createConnection(key, callback) - else + + case None if maxConnectionsPerRequestKey(key) <= 0 => + F.delay(callback(Left(NoConnectionAllowedException(key)))) + + case None if curTotal == maxTotal => + val keys = idleQueues.keys + if (keys.nonEmpty) + F.delay( + logger.debug( + s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats" + ) + ) *> + F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()).flatMap { + randKey => + getConnectionFromQueue(randKey).map( + _.fold( + logger.warn(s"No connection to evict from the idleQueue for $randKey") + )(_.conn.shutdown()) + ) *> + decrConnection(randKey) + } *> + createConnection(key, callback) + else + F.delay( + logger.debug( + s"No connections available for the desired key, $key. Adding to waitQueue: $stats" + ) + ) *> + addToWaitQueue(key, callback) + + case None => // we're full up. Add to waiting queue. F.delay( logger.debug( - s"No connections available for the desired key, $key. Adding to waitQueue: $stats" + s"No connections available for $key. Waiting on new connection: $stats" ) ) *> addToWaitQueue(key, callback) + } - case None => // we're full up. Add to waiting queue. - F.delay( - logger.debug( - s"No connections available for $key. Waiting on new connection: $stats" - ) - ) *> - addToWaitQueue(key, callback) - } - - F.delay(logger.debug(s"Requesting connection for $key: $stats")).productR(go()).as(None) - } else - F.delay(callback(Left(new IllegalStateException("Connection pool is closed")))).as(None) - } + F.delay(logger.debug(s"Requesting connection for $key: $stats")).productR(go()) + } else + F.delay(callback(Left(new IllegalStateException("Connection pool is closed")))) + } + .as(noopCancelToken) } private def releaseRecyclable(key: RequestKey, connection: A): F[Unit] = From 0ca5a3ea9d336635308128a2198aa005e9b37935 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 22:18:59 -0400 Subject: [PATCH 04/11] Fix cancellation in Http4sWSStage for cats-effect-3.5.0 --- .../blazecore/websocket/Http4sWSStage.scala | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/blaze-core/src/main/scala/org/http4s/blazecore/websocket/Http4sWSStage.scala b/blaze-core/src/main/scala/org/http4s/blazecore/websocket/Http4sWSStage.scala index bca255526..8cf640b43 100644 --- a/blaze-core/src/main/scala/org/http4s/blazecore/websocket/Http4sWSStage.scala +++ b/blaze-core/src/main/scala/org/http4s/blazecore/websocket/Http4sWSStage.scala @@ -66,23 +66,29 @@ private[http4s] class Http4sWSStage[F[_]]( def snkFun(frame: WebSocketFrame): F[Unit] = isClosed.ifM(F.unit, evalFrame(frame)) + private[this] val shutdownCancelToken = Some(F.delay(stageShutdown())) + private[this] def writeFrame(frame: WebSocketFrame, ec: ExecutionContext): F[Unit] = - writeSemaphore.permit.use { _ => - F.async_[Unit] { cb => - channelWrite(frame).onComplete { - case Success(res) => cb(Right(res)) - case Failure(t) => cb(Left(t)) - }(ec) - } - } + writeSemaphore.permit.surround( + F.async[Unit](cb => + F.delay( + channelWrite(frame).onComplete { + case Success(res) => cb(Right(res)) + case Failure(t) => cb(Left(t)) + }(ec) + ).as(shutdownCancelToken) + ) + ) private[this] def readFrameTrampoline: F[WebSocketFrame] = - F.async_[WebSocketFrame] { cb => - channelRead().onComplete { - case Success(ws) => cb(Right(ws)) - case Failure(exception) => cb(Left(exception)) - }(trampoline) - } + F.async[WebSocketFrame](cb => + F.delay( + channelRead().onComplete { + case Success(ws) => cb(Right(ws)) + case Failure(exception) => cb(Left(exception)) + }(trampoline) + ).as(shutdownCancelToken) + ) /** Read from our websocket. * From 36aee9833e829ad0eae18405d1551a286c3b84ad Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 22:16:18 -0400 Subject: [PATCH 05/11] Fix cancellation in Http1Connection for cats-effect-3.5.0 --- .../http4s/blaze/client/Http1Connection.scala | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala index 3ea0634e5..9495ef9aa 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala @@ -179,6 +179,8 @@ private final class Http1Connection[F[_]]( override protected def contentComplete(): Boolean = parser.contentComplete() + private[this] val noopCancel = Some(F.unit) + private def executeRequest( req: Request[F], cancellation: F[TimeoutException], @@ -214,7 +216,10 @@ private final class Http1Connection[F[_]]( } val idleTimeoutF: F[TimeoutException] = idleTimeoutStage match { - case Some(stage) => F.async_[TimeoutException](stage.setTimeout) + case Some(stage) => + F.async[TimeoutException] { cb => + F.delay(stage.setTimeout(cb)).as(noopCancel) + } case None => F.never[TimeoutException] } @@ -254,15 +259,20 @@ private final class Http1Connection[F[_]]( } } + private[this] val shutdownCancelToken = Some(F.delay(stageShutdown())) + private def receiveResponse( closeOnFinish: Boolean, doesntHaveBody: Boolean, idleTimeoutS: F[Either[Throwable, Unit]], idleRead: Option[Future[ByteBuffer]], ): F[Response[F]] = - F.async_[Response[F]] { cb => - val read = idleRead.getOrElse(channelRead()) - handleRead(read, cb, closeOnFinish, doesntHaveBody, "Initial Read", idleTimeoutS) + F.async[Response[F]] { cb => + F.delay { + val read = idleRead.getOrElse(channelRead()) + handleRead(read, cb, closeOnFinish, doesntHaveBody, "Initial Read", idleTimeoutS) + shutdownCancelToken + } } // this method will get some data, and try to continue parsing using the implicit ec From 97d1611dbe8bdfa74ff0c15b224fa2ad318fa9ec Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 22:26:42 -0400 Subject: [PATCH 06/11] Fix cancellation in Http1Stage for cats-effect-3.5.0 --- .../org/http4s/blazecore/Http1Stage.scala | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/blaze-core/src/main/scala/org/http4s/blazecore/Http1Stage.scala b/blaze-core/src/main/scala/org/http4s/blazecore/Http1Stage.scala index 4452dab1a..bd779f527 100644 --- a/blaze-core/src/main/scala/org/http4s/blazecore/Http1Stage.scala +++ b/blaze-core/src/main/scala/org/http4s/blazecore/Http1Stage.scala @@ -197,6 +197,8 @@ private[http4s] trait Http1Stage[F[_]] { self: TailStage[ByteBuffer] => // we are not finished and need more data. else streamingBody(buffer, eofCondition) + private[this] val shutdownCancelToken = Some(F.delay(stageShutdown())) + // Streams the body off the wire private def streamingBody( buffer: ByteBuffer, @@ -205,46 +207,48 @@ private[http4s] trait Http1Stage[F[_]] { self: TailStage[ByteBuffer] => @volatile var currentBuffer = buffer // TODO: we need to work trailers into here somehow - val t = F.async_[Option[Chunk[Byte]]] { cb => - if (!contentComplete()) { - def go(): Unit = - try { - val parseResult = doParseContent(currentBuffer) - logger.debug(s"Parse result: $parseResult, content complete: ${contentComplete()}") - parseResult match { - case Some(result) => - cb(Either.right(Chunk.byteBuffer(result).some)) - - case None if contentComplete() => - cb(End) - - case None => - channelRead().onComplete { - case Success(b) => - currentBuffer = BufferTools.concatBuffers(currentBuffer, b) - go() - - case Failure(Command.EOF) => - cb(eofCondition()) - - case Failure(t) => - logger.error(t)("Unexpected error reading body.") - cb(Either.left(t)) - } + val t = F.async[Option[Chunk[Byte]]] { cb => + F.delay { + if (!contentComplete()) { + def go(): Unit = + try { + val parseResult = doParseContent(currentBuffer) + logger.debug(s"Parse result: $parseResult, content complete: ${contentComplete()}") + parseResult match { + case Some(result) => + cb(Either.right(Chunk.byteBuffer(result).some)) + + case None if contentComplete() => + cb(End) + + case None => + channelRead().onComplete { + case Success(b) => + currentBuffer = BufferTools.concatBuffers(currentBuffer, b) + go() + + case Failure(Command.EOF) => + cb(eofCondition()) + + case Failure(t) => + logger.error(t)("Unexpected error reading body.") + cb(Either.left(t)) + } + } + } catch { + case t: ParserException => + fatalError(t, "Error parsing request body") + cb(Either.left(InvalidBodyException(t.getMessage()))) + + case t: Throwable => + fatalError(t, "Error collecting body") + cb(Either.left(t)) } - } catch { - case t: ParserException => - fatalError(t, "Error parsing request body") - cb(Either.left(InvalidBodyException(t.getMessage()))) - - case t: Throwable => - fatalError(t, "Error collecting body") - cb(Either.left(t)) - } - go() - } else cb(End) + go() + } else cb(End) + shutdownCancelToken + } } - (repeatEval(t).unNoneTerminate.flatMap(chunk(_)), () => drainBody(currentBuffer)) } From e6600611f9ee2aafd6947a456bf5e2c2296703dc Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 22:27:27 -0400 Subject: [PATCH 07/11] Comment on fromFutureNoShift's uncancelability --- .../src/main/scala/org/http4s/blazecore/util/package.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/blaze-core/src/main/scala/org/http4s/blazecore/util/package.scala b/blaze-core/src/main/scala/org/http4s/blazecore/util/package.scala index 694a3bd8f..ac3dbbb03 100644 --- a/blaze-core/src/main/scala/org/http4s/blazecore/util/package.scala +++ b/blaze-core/src/main/scala/org/http4s/blazecore/util/package.scala @@ -42,6 +42,8 @@ package object util extends ParasiticExecutionContextCompat { case Some(value) => F.fromTry(value) case None => + // Scala futures are uncancelable. There's not much we can + // do here other than async_. F.async_ { cb => future.onComplete { case Success(a) => cb(Right(a)) From efb5efb521beacb810ec4093a9497adde2825b12 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 22:30:13 -0400 Subject: [PATCH 08/11] Fix cancellation in Http2NodeStage for cats-effect-3.5.0 --- .../main/scala/org/http4s/blaze/server/Http2NodeStage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/blaze-server/src/main/scala/org/http4s/blaze/server/Http2NodeStage.scala b/blaze-server/src/main/scala/org/http4s/blaze/server/Http2NodeStage.scala index a55a621a9..37ae4e6bb 100644 --- a/blaze-server/src/main/scala/org/http4s/blaze/server/Http2NodeStage.scala +++ b/blaze-server/src/main/scala/org/http4s/blaze/server/Http2NodeStage.scala @@ -105,6 +105,8 @@ private class Http2NodeStage[F[_]]( closePipeline(Some(e)) } + private[this] val shutdownStageToken = Some(F.delay(stageShutdown())) + /** collect the body: a maxlen < 0 is interpreted as undefined */ private def getBody(maxlen: Long): EntityBody[F] = { var complete = false @@ -156,7 +158,7 @@ private class Http2NodeStage[F[_]]( closePipeline(Some(e)) } - None + shutdownStageToken } } From ca1c5c2fd88557c2597e7b0f353f0705cb10feed Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 23:38:26 -0400 Subject: [PATCH 09/11] use -> surround Co-authored-by: Arman Bilge --- .../src/main/scala/org/http4s/blaze/client/PoolManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala index 3c4d4f058..cb4d65395 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala @@ -206,7 +206,7 @@ private final class PoolManager[F[_], A <: Connection[F]]( def borrow(key: RequestKey): F[NextConnection] = F.async { callback => semaphore.permit - .use { _ => + .surround { if (!isClosed) { def go(): F[Unit] = getConnectionFromQueue(key).flatMap { From 1bac474e8219439476c10aa5233132b4a741ba62 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 23:43:41 -0400 Subject: [PATCH 10/11] Cancel timeout on idleTimeoutStage if we couldn't set a timeout --- .../main/scala/org/http4s/blaze/client/Http1Connection.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala index 9495ef9aa..893e5e424 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/Http1Connection.scala @@ -179,8 +179,6 @@ private final class Http1Connection[F[_]]( override protected def contentComplete(): Boolean = parser.contentComplete() - private[this] val noopCancel = Some(F.unit) - private def executeRequest( req: Request[F], cancellation: F[TimeoutException], @@ -218,7 +216,7 @@ private final class Http1Connection[F[_]]( val idleTimeoutF: F[TimeoutException] = idleTimeoutStage match { case Some(stage) => F.async[TimeoutException] { cb => - F.delay(stage.setTimeout(cb)).as(noopCancel) + F.delay(stage.setTimeout(cb)).as(Some(F.delay(stage.cancelTimeout()))) } case None => F.never[TimeoutException] } From 796d9593e7067763c786741b7c7956fbada17b8a Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Sat, 13 May 2023 23:47:14 -0400 Subject: [PATCH 11/11] Fix typo in test name --- .../scala/org/http4s/blaze/server/Http1ServerStageSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blaze-server/src/test/scala/org/http4s/blaze/server/Http1ServerStageSpec.scala b/blaze-server/src/test/scala/org/http4s/blaze/server/Http1ServerStageSpec.scala index 90473d5aa..de0a2500e 100644 --- a/blaze-server/src/test/scala/org/http4s/blaze/server/Http1ServerStageSpec.scala +++ b/blaze-server/src/test/scala/org/http4s/blaze/server/Http1ServerStageSpec.scala @@ -537,7 +537,7 @@ class Http1ServerStageSpec extends CatsEffectSuite { } } - fixture.test("Http1ServerStage: routes should cancels on stage shutdown".flaky) { tw => + fixture.test("Http1ServerStage: routes should cancel on stage shutdown".flaky) { tw => Deferred[IO, Unit] .flatMap { canceled => Deferred[IO, Unit].flatMap { gate =>