From e14bbb889a24c06ea34314c1c864eec2c10c0d4e Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 16 Dec 2022 12:55:12 +0300 Subject: [PATCH] Enable broker client keepalives. (#3127) Should fix stale connections. ref https://github.com/neondatabase/neon/issues/3108 --- pageserver/src/config.rs | 19 ++++++++++++ pageserver/src/walreceiver.rs | 11 ++++--- safekeeper/src/bin/safekeeper.rs | 4 +++ safekeeper/src/broker.rs | 2 +- safekeeper/src/lib.rs | 2 ++ storage_broker/benches/rps.rs | 6 ++-- storage_broker/src/bin/storage_broker.rs | 38 ++++++++++++++---------- storage_broker/src/lib.rs | 9 +++++- 8 files changed, 66 insertions(+), 25 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 48e9f32276..9971ddc0f7 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -137,6 +137,7 @@ pub struct PageServerConf { /// Storage broker endpoints to connect to. pub broker_endpoint: Uri, + pub broker_keepalive_interval: Duration, pub log_format: LogFormat, @@ -215,6 +216,7 @@ struct PageServerConfigBuilder { profiling: BuilderValue, broker_endpoint: BuilderValue, + broker_keepalive_interval: BuilderValue, log_format: BuilderValue, @@ -247,6 +249,10 @@ impl Default for PageServerConfigBuilder { broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT .parse() .expect("failed to parse default broker endpoint")), + broker_keepalive_interval: Set(humantime::parse_duration( + storage_broker::DEFAULT_KEEPALIVE_INTERVAL, + ) + .expect("cannot parse default keepalive interval")), log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()), @@ -310,6 +316,10 @@ impl PageServerConfigBuilder { self.broker_endpoint = BuilderValue::Set(broker_endpoint) } + pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) { + self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval) + } + pub fn id(&mut self, node_id: NodeId) { self.id = BuilderValue::Set(node_id) } @@ -365,6 +375,9 @@ impl PageServerConfigBuilder { broker_endpoint: self .broker_endpoint .ok_or(anyhow!("No broker endpoints provided"))?, + broker_keepalive_interval: self + .broker_keepalive_interval + .ok_or(anyhow!("No broker keepalive interval provided"))?, log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, concurrent_tenant_size_logical_size_queries: self .concurrent_tenant_size_logical_size_queries @@ -532,6 +545,7 @@ impl PageServerConf { "id" => builder.id(NodeId(parse_toml_u64(key, item)?)), "profiling" => builder.profiling(parse_toml_from_str(key, item)?), "broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?), + "broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?), "log_format" => builder.log_format( LogFormat::from_config(&parse_toml_string(key, item)?)? ), @@ -659,6 +673,7 @@ impl PageServerConf { profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::dummy_conf(), broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), + broker_keepalive_interval: Duration::from_secs(5000), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), } @@ -829,6 +844,9 @@ log_format = 'json' profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::default(), broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), + broker_keepalive_interval: humantime::parse_duration( + storage_broker::DEFAULT_KEEPALIVE_INTERVAL + )?, log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), }, @@ -872,6 +890,7 @@ log_format = 'json' profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::default(), broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), + broker_keepalive_interval: Duration::from_secs(5), log_format: LogFormat::Json, concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), }, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 74ede7c213..aaf46579a7 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -44,10 +44,13 @@ pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result let broker_endpoint = conf.broker_endpoint.clone(); // Note: we do not attempt connecting here (but validate endpoints sanity). - let broker_client = storage_broker::connect(broker_endpoint.clone()).context(format!( - "Failed to create broker client to {}", - &conf.broker_endpoint - ))?; + let broker_client = + storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context( + format!( + "Failed to create broker client to {}", + &conf.broker_endpoint + ), + )?; if BROKER_CLIENT.set(broker_client).is_err() { panic!("broker already initialized"); diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index cab5053b5b..275253d1d4 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -82,6 +82,9 @@ struct Args { /// established; plaintext otherwise. #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)] broker_endpoint: Uri, + /// Broker keepalive interval. + #[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)] + broker_keepalive_interval: Duration, /// Peer safekeeper is considered dead after not receiving heartbeats from /// it during this period passed as a human readable duration. #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)] @@ -142,6 +145,7 @@ fn main() -> anyhow::Result<()> { listen_http_addr: args.listen_http, no_sync: args.no_sync, broker_endpoint: args.broker_endpoint, + broker_keepalive_interval: args.broker_keepalive_interval, heartbeat_timeout: args.heartbeat_timeout, remote_storage: args.remote_storage, max_offloader_lag_bytes: args.max_offloader_lag, diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index df2dc92efe..92f35bf51f 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -66,7 +66,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { /// Subscribe and fetch all the interesting data from the broker. async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { - let mut client = storage_broker::connect(conf.broker_endpoint)?; + let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?; // TODO: subscribe only to local timelines instead of all let request = SubscribeSafekeeperInfoRequest { diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 60a1911068..5decfe64de 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -51,6 +51,7 @@ pub struct SafeKeeperConf { pub listen_http_addr: String, pub no_sync: bool, pub broker_endpoint: Uri, + pub broker_keepalive_interval: Duration, pub heartbeat_timeout: Duration, pub remote_storage: Option, pub max_offloader_lag_bytes: u64, @@ -83,6 +84,7 @@ impl SafeKeeperConf { broker_endpoint: storage_broker::DEFAULT_ENDPOINT .parse() .expect("failed to parse default broker endpoint"), + broker_keepalive_interval: Duration::from_secs(5), backup_runtime_threads: None, wal_backup_enabled: true, auth_validation_public_key_path: None, diff --git a/storage_broker/benches/rps.rs b/storage_broker/benches/rps.rs index 73141318b8..1262bd9333 100644 --- a/storage_broker/benches/rps.rs +++ b/storage_broker/benches/rps.rs @@ -88,7 +88,7 @@ fn tli_from_u64(i: u64) -> Vec { async fn subscribe(client: Option, counter: Arc, i: u64) { let mut client = match client { Some(c) => c, - None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(), + None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(), }; let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId { @@ -112,7 +112,7 @@ async fn subscribe(client: Option, counter: Arc, async fn publish(client: Option, n_keys: u64) { let mut client = match client { Some(c) => c, - None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(), + None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(), }; let mut counter: u64 = 0; @@ -152,7 +152,7 @@ async fn main() -> Result<(), Box> { } let h = tokio::spawn(progress_reporter(counters.clone())); - let c = storage_broker::connect(DEFAULT_ENDPOINT).unwrap(); + let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(); for i in 0..args.num_subs { let c = Some(c.clone()); diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index fdf2637b4d..6d80e96bf1 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -39,7 +39,9 @@ use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE}; use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer}; use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey; use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; -use storage_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR}; +use storage_broker::{ + parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, +}; use utils::id::TenantTimelineId; use utils::logging::{self, LogFormat}; use utils::project_git_version; @@ -47,8 +49,8 @@ use utils::sentry_init::{init_sentry, release_name}; project_git_version!(GIT_VERSION); -const DEFAULT_CHAN_SIZE: usize = 128; -const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms"; +const DEFAULT_CHAN_SIZE: usize = 32; +const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384; #[derive(Parser, Debug)] #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)] @@ -56,11 +58,14 @@ struct Args { /// Endpoint to listen on. #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)] listen_addr: SocketAddr, - /// Size of the queue to the subscriber. + /// Size of the queue to the per timeline subscriber. #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)] - chan_size: usize, + timeline_chan_size: usize, + /// Size of the queue to the all keys subscriber. + #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)] + all_keys_chan_size: usize, /// HTTP/2 keepalive interval. - #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)] + #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] http2_keepalive_interval: Duration, /// Format for logging, either 'plain' or 'json'. #[arg(long, default_value = "plain")] @@ -108,7 +113,7 @@ struct SharedState { } impl SharedState { - pub fn new(chan_size: usize) -> Self { + pub fn new(all_keys_chan_size: usize) -> Self { SharedState { next_pub_id: 0, num_pubs: 0, @@ -116,7 +121,7 @@ impl SharedState { num_subs_to_timelines: 0, chans_to_timeline_subs: HashMap::new(), num_subs_to_all: 0, - chan_to_all_subs: broadcast::channel(chan_size).0, + chan_to_all_subs: broadcast::channel(all_keys_chan_size).0, } } @@ -139,7 +144,7 @@ impl SharedState { pub fn register_subscriber( &mut self, sub_key: SubscriptionKey, - chan_size: usize, + timeline_chan_size: usize, ) -> (SubId, broadcast::Receiver) { let sub_id = self.next_sub_id; self.next_sub_id += 1; @@ -158,7 +163,7 @@ impl SharedState { self.chans_to_timeline_subs .entry(ttid) .or_insert(ChanToTimelineSub { - chan: broadcast::channel(chan_size).0, + chan: broadcast::channel(timeline_chan_size).0, num_subscribers: 0, }); chan_to_timeline_sub.num_subscribers += 1; @@ -200,7 +205,7 @@ impl SharedState { #[derive(Clone)] struct Registry { shared_state: Arc>, - chan_size: usize, + timeline_chan_size: usize, } impl Registry { @@ -232,7 +237,7 @@ impl Registry { let (sub_id, sub_rx) = self .shared_state .write() - .register_subscriber(sub_key, self.chan_size); + .register_subscriber(sub_key, self.timeline_chan_size); info!( "subscription started id={}, key={:?}, addr={:?}", sub_id, sub_key, remote_addr @@ -369,7 +374,8 @@ impl BrokerService for Broker { Err(RecvError::Lagged(skipped_msg)) => { missed_msgs += skipped_msg; if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) { - warn!("dropped {} messages, channel is full", missed_msgs); + warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full", + subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs); missed_msgs = 0; } } @@ -427,8 +433,8 @@ async fn main() -> Result<(), Box> { info!("version: {GIT_VERSION}"); let registry = Registry { - shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))), - chan_size: args.chan_size, + shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))), + timeline_chan_size: args.timeline_chan_size, }; let storage_broker_impl = Broker { registry: registry.clone(), @@ -522,7 +528,7 @@ mod tests { async fn test_registry() { let registry = Registry { shared_state: Arc::new(RwLock::new(SharedState::new(16))), - chan_size: 16, + timeline_chan_size: 16, }; // subscribe to timeline 2 diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 0629caa2fb..d12a79a69f 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -1,6 +1,7 @@ use hyper::body::HttpBody; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use tonic::codegen::StdError; use tonic::transport::{ClientTlsConfig, Endpoint}; use tonic::{transport::Channel, Code, Status}; @@ -26,6 +27,8 @@ pub use hyper::Uri; pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}"); +pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms"; + // BrokerServiceClient charged with tonic provided Channel transport; helps to // avoid depending on tonic directly in user crates. pub type BrokerClientChannel = BrokerServiceClient; @@ -33,7 +36,7 @@ pub type BrokerClientChannel = BrokerServiceClient; // Create connection object configured to run TLS if schema starts with https:// // and plain text otherwise. Connection is lazy, only endpoint sanity is // validated here. -pub fn connect(endpoint: U) -> anyhow::Result +pub fn connect(endpoint: U, keepalive_interval: Duration) -> anyhow::Result where U: std::convert::TryInto, U::Error: std::error::Error + Send + Sync + 'static, @@ -46,6 +49,10 @@ where let tls = ClientTlsConfig::new(); tonic_endpoint = tonic_endpoint.tls_config(tls)?; } + tonic_endpoint = tonic_endpoint + .http2_keep_alive_interval(keepalive_interval) + .keep_alive_while_idle(true); + // keep_alive_timeout is 20s by default on both client and server side let channel = tonic_endpoint.connect_lazy(); Ok(BrokerClientChannel::new(channel)) }