mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
feat: enable tcp keepalive for http server (#4019)
* feat: enable tcp keepalive for http server * chore: for enterprise's update * resolve PR comments
This commit is contained in:
@@ -306,7 +306,7 @@ impl StartCommand {
|
||||
}
|
||||
|
||||
// The precedence order is: cli > config file > environment variables > default values.
|
||||
fn merge_with_cli_options(
|
||||
pub fn merge_with_cli_options(
|
||||
&self,
|
||||
global_options: &GlobalOptions,
|
||||
mut opts: StandaloneOptions,
|
||||
|
||||
@@ -367,6 +367,19 @@ pub enum Error {
|
||||
#[snafu(source(from(common_config::error::Error, Box::new)))]
|
||||
source: Box<common_config::error::Error>,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to get region metadata from engine {} for region_id {}",
|
||||
engine,
|
||||
region_id,
|
||||
))]
|
||||
GetRegionMetadata {
|
||||
engine: String,
|
||||
region_id: RegionId,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -433,7 +446,9 @@ impl ErrorExt for Error {
|
||||
TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => {
|
||||
StatusCode::Unsupported
|
||||
}
|
||||
HandleRegionRequest { source, .. } => source.status_code(),
|
||||
HandleRegionRequest { source, .. } | GetRegionMetadata { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
StopRegionEngine { source, .. } => source.status_code(),
|
||||
|
||||
FindLogicalRegions { source, .. } => source.status_code(),
|
||||
|
||||
@@ -50,6 +50,7 @@ macro_rules! add_service {
|
||||
let max_recv_message_size = $builder.config().max_recv_message_size;
|
||||
let max_send_message_size = $builder.config().max_send_message_size;
|
||||
|
||||
use tonic::codec::CompressionEncoding;
|
||||
let service_builder = $service
|
||||
.max_decoding_message_size(max_recv_message_size)
|
||||
.max_encoding_message_size(max_send_message_size)
|
||||
|
||||
@@ -842,6 +842,13 @@ impl Server for HttpServer {
|
||||
let app = self.build(app);
|
||||
let server = axum::Server::bind(&listening)
|
||||
.tcp_nodelay(true)
|
||||
// Enable TCP keepalive to close the dangling established connections.
|
||||
// It's configured to let the keepalive probes first send after the connection sits
|
||||
// idle for 59 minutes, and then send every 10 seconds for 6 times.
|
||||
// So the connection will be closed after roughly 1 hour.
|
||||
.tcp_keepalive(Some(Duration::from_secs(59 * 60)))
|
||||
.tcp_keepalive_interval(Some(Duration::from_secs(10)))
|
||||
.tcp_keepalive_retries(Some(6))
|
||||
.serve(app.into_make_service());
|
||||
|
||||
*shutdown_tx = Some(tx);
|
||||
|
||||
@@ -25,9 +25,11 @@ use common_base::secrets::ExposeSecret;
|
||||
use common_config::Configurable;
|
||||
use common_meta::key::catalog_name::CatalogNameKey;
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
use common_query::OutputData;
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use common_telemetry::warn;
|
||||
use common_test_util::ports;
|
||||
use common_test_util::recordbatch::{check_output_stream, ExpectedOutput};
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::config::{
|
||||
@@ -54,6 +56,7 @@ use servers::tls::ReloadableTlsServerConfig;
|
||||
use servers::Mode;
|
||||
use session::context::QueryContext;
|
||||
|
||||
use crate::database::Database;
|
||||
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
|
||||
|
||||
pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";
|
||||
@@ -687,3 +690,25 @@ where
|
||||
|
||||
test(endpoints).await
|
||||
}
|
||||
|
||||
pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
|
||||
let output = db.sql(sql).await.unwrap();
|
||||
let output = output.data;
|
||||
|
||||
match (&output, expected) {
|
||||
(OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
|
||||
assert_eq!(
|
||||
*x, y,
|
||||
r#"
|
||||
expected: {y}
|
||||
actual: {x}
|
||||
"#
|
||||
)
|
||||
}
|
||||
(OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x))
|
||||
| (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => {
|
||||
check_output_stream(output, x).await
|
||||
}
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user