Skip to content

Commit

Permalink
Merge pull request #806 from http4s/http4s-0.23.19
Browse files Browse the repository at this point in the history
Upgrade to http4s-0.23.19
  • Loading branch information
rossabaker authored May 14, 2023
2 parents c308b23 + 796d959 commit 33d4bf0
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ private final class Http1Connection[F[_]](
}

val idleTimeoutF: F[TimeoutException] = idleTimeoutStage match {
case Some(stage) => F.async_[TimeoutException](stage.setTimeout)
case Some(stage) =>
F.async[TimeoutException] { cb =>
F.delay(stage.setTimeout(cb)).as(Some(F.delay(stage.cancelTimeout())))
}
case None => F.never[TimeoutException]
}

Expand Down Expand Up @@ -254,15 +257,20 @@ private final class Http1Connection[F[_]](
}
}

private[this] val shutdownCancelToken = Some(F.delay(stageShutdown()))

private def receiveResponse(
closeOnFinish: Boolean,
doesntHaveBody: Boolean,
idleTimeoutS: F[Either[Throwable, Unit]],
idleRead: Option[Future[ByteBuffer]],
): F[Response[F]] =
F.async_[Response[F]] { cb =>
val read = idleRead.getOrElse(channelRead())
handleRead(read, cb, closeOnFinish, doesntHaveBody, "Initial Read", idleTimeoutS)
F.async[Response[F]] { cb =>
F.delay {
val read = idleRead.getOrElse(channelRead())
handleRead(read, cb, closeOnFinish, doesntHaveBody, "Initial Read", idleTimeoutS)
shutdownCancelToken
}
}

// this method will get some data, and try to continue parsing using the implicit ec
Expand Down
124 changes: 64 additions & 60 deletions blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ private final class PoolManager[F[_], A <: Connection[F]](
idleQueues.update(key, q)
}

private[this] val noopCancelToken = Some(F.unit)

/** This generates a effect of Next Connection. The following calls are executed asynchronously
* with respect to whenever the execution of this task can occur.
*
Expand All @@ -203,75 +205,77 @@ private final class PoolManager[F[_], A <: Connection[F]](
*/
def borrow(key: RequestKey): F[NextConnection] =
F.async { callback =>
semaphore.permit.use { _ =>
if (!isClosed) {
def go(): F[Unit] =
getConnectionFromQueue(key).flatMap {
case Some(pooled) if pooled.conn.isClosed =>
F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *>
decrConnection(key) *>
go()

case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) =>
F.delay(
logger.debug(s"Shutting down and evicting expired connection for $key: $stats")
) *>
decrConnection(key) *>
F.delay(pooled.conn.shutdown()) *>
go()

case Some(pooled) =>
F.delay(logger.debug(s"Recycling connection for $key: $stats")) *>
F.delay(callback(Right(NextConnection(pooled.conn, fresh = false))))

case None if numConnectionsCheckHolds(key) =>
F.delay(
logger.debug(s"Active connection not found for $key. Creating new one. $stats")
) *>
createConnection(key, callback)

case None if maxConnectionsPerRequestKey(key) <= 0 =>
F.delay(callback(Left(NoConnectionAllowedException(key))))

case None if curTotal == maxTotal =>
val keys = idleQueues.keys
if (keys.nonEmpty)
semaphore.permit
.surround {
if (!isClosed) {
def go(): F[Unit] =
getConnectionFromQueue(key).flatMap {
case Some(pooled) if pooled.conn.isClosed =>
F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *>
decrConnection(key) *>
go()

case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) =>
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats"
)
logger.debug(s"Shutting down and evicting expired connection for $key: $stats")
) *>
decrConnection(key) *>
F.delay(pooled.conn.shutdown()) *>
go()

case Some(pooled) =>
F.delay(logger.debug(s"Recycling connection for $key: $stats")) *>
F.delay(callback(Right(NextConnection(pooled.conn, fresh = false))))

case None if numConnectionsCheckHolds(key) =>
F.delay(
logger.debug(s"Active connection not found for $key. Creating new one. $stats")
) *>
F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()).flatMap {
randKey =>
getConnectionFromQueue(randKey).map(
_.fold(
logger.warn(s"No connection to evict from the idleQueue for $randKey")
)(_.conn.shutdown())
) *>
decrConnection(randKey)
} *>
createConnection(key, callback)
else

case None if maxConnectionsPerRequestKey(key) <= 0 =>
F.delay(callback(Left(NoConnectionAllowedException(key))))

case None if curTotal == maxTotal =>
val keys = idleQueues.keys
if (keys.nonEmpty)
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats"
)
) *>
F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()).flatMap {
randKey =>
getConnectionFromQueue(randKey).map(
_.fold(
logger.warn(s"No connection to evict from the idleQueue for $randKey")
)(_.conn.shutdown())
) *>
decrConnection(randKey)
} *>
createConnection(key, callback)
else
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Adding to waitQueue: $stats"
)
) *>
addToWaitQueue(key, callback)

case None => // we're full up. Add to waiting queue.
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Adding to waitQueue: $stats"
s"No connections available for $key. Waiting on new connection: $stats"
)
) *>
addToWaitQueue(key, callback)
}

case None => // we're full up. Add to waiting queue.
F.delay(
logger.debug(
s"No connections available for $key. Waiting on new connection: $stats"
)
) *>
addToWaitQueue(key, callback)
}

F.delay(logger.debug(s"Requesting connection for $key: $stats")).productR(go()).as(None)
} else
F.delay(callback(Left(new IllegalStateException("Connection pool is closed")))).as(None)
}
F.delay(logger.debug(s"Requesting connection for $key: $stats")).productR(go())
} else
F.delay(callback(Left(new IllegalStateException("Connection pool is closed"))))
}
.as(noopCancelToken)
}

private def releaseRecyclable(key: RequestKey, connection: A): F[Unit] =
Expand Down
80 changes: 42 additions & 38 deletions blaze-core/src/main/scala/org/http4s/blazecore/Http1Stage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ private[http4s] trait Http1Stage[F[_]] { self: TailStage[ByteBuffer] =>
// we are not finished and need more data.
else streamingBody(buffer, eofCondition)

private[this] val shutdownCancelToken = Some(F.delay(stageShutdown()))

// Streams the body off the wire
private def streamingBody(
buffer: ByteBuffer,
Expand All @@ -205,46 +207,48 @@ private[http4s] trait Http1Stage[F[_]] { self: TailStage[ByteBuffer] =>
@volatile var currentBuffer = buffer

// TODO: we need to work trailers into here somehow
val t = F.async_[Option[Chunk[Byte]]] { cb =>
if (!contentComplete()) {
def go(): Unit =
try {
val parseResult = doParseContent(currentBuffer)
logger.debug(s"Parse result: $parseResult, content complete: ${contentComplete()}")
parseResult match {
case Some(result) =>
cb(Either.right(Chunk.byteBuffer(result).some))

case None if contentComplete() =>
cb(End)

case None =>
channelRead().onComplete {
case Success(b) =>
currentBuffer = BufferTools.concatBuffers(currentBuffer, b)
go()

case Failure(Command.EOF) =>
cb(eofCondition())

case Failure(t) =>
logger.error(t)("Unexpected error reading body.")
cb(Either.left(t))
}
val t = F.async[Option[Chunk[Byte]]] { cb =>
F.delay {
if (!contentComplete()) {
def go(): Unit =
try {
val parseResult = doParseContent(currentBuffer)
logger.debug(s"Parse result: $parseResult, content complete: ${contentComplete()}")
parseResult match {
case Some(result) =>
cb(Either.right(Chunk.byteBuffer(result).some))

case None if contentComplete() =>
cb(End)

case None =>
channelRead().onComplete {
case Success(b) =>
currentBuffer = BufferTools.concatBuffers(currentBuffer, b)
go()

case Failure(Command.EOF) =>
cb(eofCondition())

case Failure(t) =>
logger.error(t)("Unexpected error reading body.")
cb(Either.left(t))
}
}
} catch {
case t: ParserException =>
fatalError(t, "Error parsing request body")
cb(Either.left(InvalidBodyException(t.getMessage())))

case t: Throwable =>
fatalError(t, "Error collecting body")
cb(Either.left(t))
}
} catch {
case t: ParserException =>
fatalError(t, "Error parsing request body")
cb(Either.left(InvalidBodyException(t.getMessage())))

case t: Throwable =>
fatalError(t, "Error collecting body")
cb(Either.left(t))
}
go()
} else cb(End)
go()
} else cb(End)
shutdownCancelToken
}
}

(repeatEval(t).unNoneTerminate.flatMap(chunk(_)), () => drainBody(currentBuffer))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ package object util extends ParasiticExecutionContextCompat {
case Some(value) =>
F.fromTry(value)
case None =>
// Scala futures are uncancelable. There's not much we can
// do here other than async_.
F.async_ { cb =>
future.onComplete {
case Success(a) => cb(Right(a))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,29 @@ private[http4s] class Http4sWSStage[F[_]](

def snkFun(frame: WebSocketFrame): F[Unit] = isClosed.ifM(F.unit, evalFrame(frame))

private[this] val shutdownCancelToken = Some(F.delay(stageShutdown()))

private[this] def writeFrame(frame: WebSocketFrame, ec: ExecutionContext): F[Unit] =
writeSemaphore.permit.use { _ =>
F.async_[Unit] { cb =>
channelWrite(frame).onComplete {
case Success(res) => cb(Right(res))
case Failure(t) => cb(Left(t))
}(ec)
}
}
writeSemaphore.permit.surround(
F.async[Unit](cb =>
F.delay(
channelWrite(frame).onComplete {
case Success(res) => cb(Right(res))
case Failure(t) => cb(Left(t))
}(ec)
).as(shutdownCancelToken)
)
)

private[this] def readFrameTrampoline: F[WebSocketFrame] =
F.async_[WebSocketFrame] { cb =>
channelRead().onComplete {
case Success(ws) => cb(Right(ws))
case Failure(exception) => cb(Left(exception))
}(trampoline)
}
F.async[WebSocketFrame](cb =>
F.delay(
channelRead().onComplete {
case Success(ws) => cb(Right(ws))
case Failure(exception) => cb(Left(exception))
}(trampoline)
).as(shutdownCancelToken)
)

/** Read from our websocket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ private class Http2NodeStage[F[_]](
closePipeline(Some(e))
}

private[this] val shutdownStageToken = Some(F.delay(stageShutdown()))

/** collect the body: a maxlen < 0 is interpreted as undefined */
private def getBody(maxlen: Long): EntityBody[F] = {
var complete = false
Expand Down Expand Up @@ -156,7 +158,7 @@ private class Http2NodeStage[F[_]](
closePipeline(Some(e))
}

None
shutdownStageToken
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ class Http1ServerStageSpec extends CatsEffectSuite {
}
}

fixture.test("Http1ServerStage: routes should cancels on stage shutdown".flaky) { tw =>
fixture.test("Http1ServerStage: routes should cancel on stage shutdown".flaky) { tw =>
Deferred[IO, Unit]
.flatMap { canceled =>
Deferred[IO, Unit].flatMap { gate =>
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Dependencies._
val Scala212 = "2.12.17"
val Scala213 = "2.13.10"
val Scala3 = "3.2.2"
val http4sVersion = "0.23.18"
val http4sVersion = "0.23.19"
val munitCatsEffectVersion = "2.0.0-M3"

ThisBuild / resolvers +=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.example.http4s.blaze.demo.server.endpoints.auth.BasicAuthHttpEndpoint
import com.example.http4s.blaze.demo.server.endpoints.auth.GitHubHttpEndpoint
import com.example.http4s.blaze.demo.server.service.FileService
import com.example.http4s.blaze.demo.server.service.GitHubService
import fs2.compression.Compression
import fs2.io.file.Files
import org.http4s.HttpRoutes
import org.http4s.client.Client
import org.http4s.server.HttpMiddleware
Expand All @@ -34,7 +36,7 @@ import org.http4s.server.middleware.Timeout

import scala.concurrent.duration._

class Module[F[_]: Async](client: Client[F]) {
class Module[F[_]: Async: Compression: Files](client: Client[F]) {
private val fileService = new FileService[F]

private val gitHubService = new GitHubService[F](client)
Expand Down
Loading

0 comments on commit 33d4bf0

Please sign in to comment.