Skip to content

Commit

Permalink
feat: enable tcp keepalive for http server (GreptimeTeam#4019)
Browse files Browse the repository at this point in the history
* feat: enable tcp keepalive for http server

* chore: for enterprise's update

* resolve PR comments
  • Loading branch information
MichaelScofield authored May 27, 2024
1 parent 1de17ae commit 2971052
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl StartCommand {
}

// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
pub fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: StandaloneOptions,
Expand Down
17 changes: 16 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,19 @@ pub enum Error {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},

#[snafu(display(
"Failed to get region metadata from engine {} for region_id {}",
engine,
region_id,
))]
GetRegionMetadata {
engine: String,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -433,7 +446,9 @@ impl ErrorExt for Error {
TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => {
StatusCode::Unsupported
}
HandleRegionRequest { source, .. } => source.status_code(),
HandleRegionRequest { source, .. } | GetRegionMetadata { source, .. } => {
source.status_code()
}
StopRegionEngine { source, .. } => source.status_code(),

FindLogicalRegions { source, .. } => source.status_code(),
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ macro_rules! add_service {
let max_recv_message_size = $builder.config().max_recv_message_size;
let max_send_message_size = $builder.config().max_send_message_size;

use tonic::codec::CompressionEncoding;
let service_builder = $service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size)
Expand Down
7 changes: 7 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,13 @@ impl Server for HttpServer {
let app = self.build(app);
let server = axum::Server::bind(&listening)
.tcp_nodelay(true)
// Enable TCP keepalive to close the dangling established connections.
// It's configured to let the keepalive probes first send after the connection sits
// idle for 59 minutes, and then send every 10 seconds for 6 times.
// So the connection will be closed after roughly 1 hour.
.tcp_keepalive(Some(Duration::from_secs(59 * 60)))
.tcp_keepalive_interval(Some(Duration::from_secs(10)))
.tcp_keepalive_retries(Some(6))
.serve(app.into_make_service());

*shutdown_tx = Some(tx);
Expand Down
25 changes: 25 additions & 0 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use common_base::secrets::ExposeSecret;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::OutputData;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::recordbatch::{check_output_stream, ExpectedOutput};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{
Expand All @@ -54,6 +56,7 @@ use servers::tls::ReloadableTlsServerConfig;
use servers::Mode;
use session::context::QueryContext;

use crate::database::Database;
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};

pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";
Expand Down Expand Up @@ -687,3 +690,25 @@ where

test(endpoints).await
}

pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
let output = db.sql(sql).await.unwrap();
let output = output.data;

match (&output, expected) {
(OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
assert_eq!(
*x, y,
r#"
expected: {y}
actual: {x}
"#
)
}
(OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x))
| (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => {
check_output_stream(output, x).await
}
_ => panic!(),
}
}

0 comments on commit 2971052

Please sign in to comment.