Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add connection pool to configure connection lifecycle. #289

Merged
merged 16 commits into from
Dec 13, 2024
Merged
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ maven.install(
"org.postgresql:postgresql:42.7.0",
"org.postgresql:r2dbc-postgresql:1.0.4.RELEASE",
"io.r2dbc:r2dbc-spi:1.0.0.RELEASE",
"io.r2dbc:r2dbc-pool:1.0.0.RELEASE",

# Liquibase.
"org.yaml:snakeyaml:2.2",
Expand Down
6 changes: 6 additions & 0 deletions imports/java/io/r2dbc/pool/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package(default_visibility = ["//visibility:public"])

alias(
name = "pool",
actual = "@maven//:io_r2dbc_r2dbc_pool",
)
43 changes: 41 additions & 2 deletions maven_install.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": "THERE_IS_NO_DATA_ONLY_ZUUL",
"__INPUT_ARTIFACTS_HASH": 1541805035,
"__RESOLVED_ARTIFACTS_HASH": 81502917,
"__INPUT_ARTIFACTS_HASH": 838604840,
"__RESOLVED_ARTIFACTS_HASH": -1724946481,
"artifacts": {
"com.adobe.testing:s3mock": {
"shasums": {
Expand Down Expand Up @@ -1296,6 +1296,13 @@
},
"version": "0.27.0"
},
"io.projectreactor.addons:reactor-pool": {
"shasums": {
"jar": "1ef7e46c3ab451684488879ffca6c2bb0039e29f02af60d780fe7d481bdffea1",
"sources": "e7ace37b6bf9cf1d5b11e1d0be1e92f75d278b594a691a1e36613391f3b646a1"
},
"version": "1.0.8"
},
"io.projectreactor.netty:reactor-netty-core": {
"shasums": {
"jar": "9dd0c5ac8216586a3cc41a7c4ff6b1482e3f5ede61798c2d9b6a13c34c199994",
Expand All @@ -1310,6 +1317,13 @@
},
"version": "3.6.9"
},
"io.r2dbc:r2dbc-pool": {
"shasums": {
"jar": "be1ad2fd95629c70dfcf384be1eb4bfc1723d90d6a0d2e4227d02f3b7ab0885a",
"sources": "aceb2ace85124594de15c367e7dc769586e02cc169e546cff4dc623b9831a060"
},
"version": "1.0.0.RELEASE"
},
"io.r2dbc:r2dbc-spi": {
"shasums": {
"jar": "a5846c59fea336431a4ae72ca14edbf5299b78486fa308eafb383f4ae0ea74e5",
Expand Down Expand Up @@ -3494,6 +3508,9 @@
"io.opentelemetry:opentelemetry-api-incubator",
"io.opentelemetry:opentelemetry-sdk"
],
"io.projectreactor.addons:reactor-pool": [
"io.projectreactor:reactor-core"
],
"io.projectreactor.netty:reactor-netty-core": [
"io.netty:netty-handler",
"io.netty:netty-handler-proxy",
Expand All @@ -3505,6 +3522,11 @@
"io.projectreactor:reactor-core": [
"org.reactivestreams:reactive-streams"
],
"io.r2dbc:r2dbc-pool": [
"io.projectreactor.addons:reactor-pool",
"io.projectreactor:reactor-core",
"io.r2dbc:r2dbc-spi"
],
"io.r2dbc:r2dbc-spi": [
"org.reactivestreams:reactive-streams"
],
Expand Down Expand Up @@ -5933,6 +5955,11 @@
"io.perfmark:perfmark-api": [
"io.perfmark"
],
"io.projectreactor.addons:reactor-pool": [
"reactor.pool",
"reactor.pool.decorators",
"reactor.pool.introspection"
],
"io.projectreactor.netty:reactor-netty-core": [
"reactor.netty",
"reactor.netty.channel",
Expand Down Expand Up @@ -5961,6 +5988,9 @@
"reactor.util.function",
"reactor.util.retry"
],
"io.r2dbc:r2dbc-pool": [
"io.r2dbc.pool"
],
"io.r2dbc:r2dbc-spi": [
"io.r2dbc.spi"
],
Expand Down Expand Up @@ -9099,10 +9129,14 @@
"io.opentelemetry:opentelemetry-sdk:jar:sources",
"io.perfmark:perfmark-api",
"io.perfmark:perfmark-api:jar:sources",
"io.projectreactor.addons:reactor-pool",
"io.projectreactor.addons:reactor-pool:jar:sources",
"io.projectreactor.netty:reactor-netty-core",
"io.projectreactor.netty:reactor-netty-core:jar:sources",
"io.projectreactor:reactor-core",
"io.projectreactor:reactor-core:jar:sources",
"io.r2dbc:r2dbc-pool",
"io.r2dbc:r2dbc-pool:jar:sources",
"io.r2dbc:r2dbc-spi",
"io.r2dbc:r2dbc-spi:jar:sources",
"jakarta.activation:jakarta.activation-api",
Expand Down Expand Up @@ -9730,6 +9764,11 @@
"reactor.core.scheduler.ReactorBlockHoundIntegration"
]
},
"io.r2dbc:r2dbc-pool": {
"io.r2dbc.spi.ConnectionFactoryProvider": [
"io.r2dbc.pool.PoolingConnectionFactoryProvider"
]
},
"org.apache.tomcat.embed:tomcat-embed-el": {
"jakarta.el.ExpressionFactory": [
"org.apache.el.ExpressionFactoryImpl"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ kt_jvm_library(
srcs = ["PostgresConnectionFactories.kt"],
deps = [
"//imports/java/com/google/cloud/sql:r2dbc-core",
"//imports/java/io/r2dbc/pool",
"//imports/java/org/postgresql:r2dbc",
"//src/main/kotlin/org/wfanet/measurement/gcloud/postgres:flags",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,37 @@
package org.wfanet.measurement.gcloud.postgres

import com.google.cloud.sql.core.GcpConnectionFactoryProvider
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions

object PostgresConnectionFactories {
@JvmStatic
fun buildConnectionFactory(flags: PostgresFlags): ConnectionFactory {
return ConnectionFactories.get(
ConnectionFactoryOptions.builder()
.option(ConnectionFactoryOptions.DRIVER, "gcp")
.option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
.option(ConnectionFactoryOptions.USER, flags.user)
// a non-empty password is required, but the value doesn't matter
.option(ConnectionFactoryOptions.PASSWORD, "UNUSED")
.option(ConnectionFactoryOptions.DATABASE, flags.database)
.option(ConnectionFactoryOptions.HOST, flags.cloudSqlInstance)
.option(GcpConnectionFactoryProvider.ENABLE_IAM_AUTH, true)
val connectionFactory =
ConnectionFactories.get(
ConnectionFactoryOptions.builder()
.option(ConnectionFactoryOptions.DRIVER, "gcp")
.option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
.option(ConnectionFactoryOptions.USER, flags.user)
// a non-empty password is required, but the value doesn't matter
.option(ConnectionFactoryOptions.PASSWORD, "UNUSED")
.option(ConnectionFactoryOptions.DATABASE, flags.database)
.option(ConnectionFactoryOptions.HOST, flags.cloudSqlInstance)
.option(ConnectionFactoryOptions.STATEMENT_TIMEOUT, flags.statementTimeout)
.option(GcpConnectionFactoryProvider.ENABLE_IAM_AUTH, true)
.build()
)

val configuration: ConnectionPoolConfiguration =
ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(flags.maxIdleTime)
.maxSize(flags.maxPoolSize)
.acquireRetry(flags.acquireRetry)
.build()
)

return ConnectionPool(configuration)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,66 @@

package org.wfanet.measurement.gcloud.postgres

import java.time.Duration
import picocli.CommandLine

/** Common command-line flags for connecting to a single Postgres database. */
class PostgresFlags {
@CommandLine.Option(
names = ["--postgres-database"],
description = ["Name of the Postgres database."],
required = true
required = true,
)
lateinit var database: String
private set

@CommandLine.Option(
names = ["--postgres-cloud-sql-connection-name"],
description = ["Instance connection name of the Postgres database."],
required = true
required = true,
)
lateinit var cloudSqlInstance: String
private set

@CommandLine.Option(
names = ["--postgres-user"],
description = ["User of the Postgres database."],
required = true
required = true,
)
lateinit var user: String
private set

@CommandLine.Option(
names = ["--statement-timeout"],
description = ["Statement timeout duration."],
required = false,
defaultValue = "120s",
)
lateinit var statementTimeout: Duration
private set

@CommandLine.Option(
names = ["--max-idle-time"],
description = ["Maximum duration a connection can be idle before being closed."],
required = false,
defaultValue = "5m",
)
lateinit var maxIdleTime: Duration
private set

@CommandLine.Option(
names = ["--max-connection-pool-size"],
description = ["Maximum number of connections in pool."],
required = false,
)
var maxPoolSize: Int = 16
private set

@CommandLine.Option(
names = ["--acquire-retry"],
description = ["Maximum number of retries when acquiring a connection from the pool."],
required = false,
)
var acquireRetry: Int = 10
private set
}