mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Enable broker client keepalives. (#3127)
Should fix stale connections. ref https://github.com/neondatabase/neon/issues/3108
This commit is contained in:
@@ -137,6 +137,7 @@ pub struct PageServerConf {
|
||||
|
||||
/// Storage broker endpoints to connect to.
|
||||
pub broker_endpoint: Uri,
|
||||
pub broker_keepalive_interval: Duration,
|
||||
|
||||
pub log_format: LogFormat,
|
||||
|
||||
@@ -215,6 +216,7 @@ struct PageServerConfigBuilder {
|
||||
|
||||
profiling: BuilderValue<ProfilingConfig>,
|
||||
broker_endpoint: BuilderValue<Uri>,
|
||||
broker_keepalive_interval: BuilderValue<Duration>,
|
||||
|
||||
log_format: BuilderValue<LogFormat>,
|
||||
|
||||
@@ -247,6 +249,10 @@ impl Default for PageServerConfigBuilder {
|
||||
broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint")),
|
||||
broker_keepalive_interval: Set(humantime::parse_duration(
|
||||
storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default keepalive interval")),
|
||||
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
|
||||
|
||||
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
|
||||
@@ -310,6 +316,10 @@ impl PageServerConfigBuilder {
|
||||
self.broker_endpoint = BuilderValue::Set(broker_endpoint)
|
||||
}
|
||||
|
||||
pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
|
||||
self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
|
||||
}
|
||||
|
||||
pub fn id(&mut self, node_id: NodeId) {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
@@ -365,6 +375,9 @@ impl PageServerConfigBuilder {
|
||||
broker_endpoint: self
|
||||
.broker_endpoint
|
||||
.ok_or(anyhow!("No broker endpoints provided"))?,
|
||||
broker_keepalive_interval: self
|
||||
.broker_keepalive_interval
|
||||
.ok_or(anyhow!("No broker keepalive interval provided"))?,
|
||||
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
|
||||
concurrent_tenant_size_logical_size_queries: self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
@@ -532,6 +545,7 @@ impl PageServerConf {
|
||||
"id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
|
||||
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
|
||||
"broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
|
||||
"broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
|
||||
"log_format" => builder.log_format(
|
||||
LogFormat::from_config(&parse_toml_string(key, item)?)?
|
||||
),
|
||||
@@ -659,6 +673,7 @@ impl PageServerConf {
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::dummy_conf(),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
|
||||
broker_keepalive_interval: Duration::from_secs(5000),
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
}
|
||||
@@ -829,6 +844,9 @@ log_format = 'json'
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
|
||||
broker_keepalive_interval: humantime::parse_duration(
|
||||
storage_broker::DEFAULT_KEEPALIVE_INTERVAL
|
||||
)?,
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
},
|
||||
@@ -872,6 +890,7 @@ log_format = 'json'
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
log_format: LogFormat::Json,
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
},
|
||||
|
||||
@@ -44,10 +44,13 @@ pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result
|
||||
let broker_endpoint = conf.broker_endpoint.clone();
|
||||
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
let broker_client = storage_broker::connect(broker_endpoint.clone()).context(format!(
|
||||
"Failed to create broker client to {}",
|
||||
&conf.broker_endpoint
|
||||
))?;
|
||||
let broker_client =
|
||||
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
|
||||
format!(
|
||||
"Failed to create broker client to {}",
|
||||
&conf.broker_endpoint
|
||||
),
|
||||
)?;
|
||||
|
||||
if BROKER_CLIENT.set(broker_client).is_err() {
|
||||
panic!("broker already initialized");
|
||||
|
||||
@@ -82,6 +82,9 @@ struct Args {
|
||||
/// established; plaintext otherwise.
|
||||
#[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
|
||||
broker_endpoint: Uri,
|
||||
/// Broker keepalive interval.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
|
||||
broker_keepalive_interval: Duration,
|
||||
/// Peer safekeeper is considered dead after not receiving heartbeats from
|
||||
/// it during this period passed as a human readable duration.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)]
|
||||
@@ -142,6 +145,7 @@ fn main() -> anyhow::Result<()> {
|
||||
listen_http_addr: args.listen_http,
|
||||
no_sync: args.no_sync,
|
||||
broker_endpoint: args.broker_endpoint,
|
||||
broker_keepalive_interval: args.broker_keepalive_interval,
|
||||
heartbeat_timeout: args.heartbeat_timeout,
|
||||
remote_storage: args.remote_storage,
|
||||
max_offloader_lag_bytes: args.max_offloader_lag,
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
|
||||
/// Subscribe and fetch all the interesting data from the broker.
|
||||
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint)?;
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
|
||||
@@ -51,6 +51,7 @@ pub struct SafeKeeperConf {
|
||||
pub listen_http_addr: String,
|
||||
pub no_sync: bool,
|
||||
pub broker_endpoint: Uri,
|
||||
pub broker_keepalive_interval: Duration,
|
||||
pub heartbeat_timeout: Duration,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub max_offloader_lag_bytes: u64,
|
||||
@@ -83,6 +84,7 @@ impl SafeKeeperConf {
|
||||
broker_endpoint: storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint"),
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
backup_runtime_threads: None,
|
||||
wal_backup_enabled: true,
|
||||
auth_validation_public_key_path: None,
|
||||
|
||||
@@ -88,7 +88,7 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
};
|
||||
|
||||
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
|
||||
@@ -112,7 +112,7 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
|
||||
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
};
|
||||
let mut counter: u64 = 0;
|
||||
|
||||
@@ -152,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
let h = tokio::spawn(progress_reporter(counters.clone()));
|
||||
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT).unwrap();
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
|
||||
|
||||
for i in 0..args.num_subs {
|
||||
let c = Some(c.clone());
|
||||
|
||||
@@ -39,7 +39,9 @@ use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
|
||||
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
|
||||
use storage_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
|
||||
use storage_broker::{
|
||||
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
|
||||
};
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::logging::{self, LogFormat};
|
||||
use utils::project_git_version;
|
||||
@@ -47,8 +49,8 @@ use utils::sentry_init::{init_sentry, release_name};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_CHAN_SIZE: usize = 128;
|
||||
const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms";
|
||||
const DEFAULT_CHAN_SIZE: usize = 32;
|
||||
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
|
||||
@@ -56,11 +58,14 @@ struct Args {
|
||||
/// Endpoint to listen on.
|
||||
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
|
||||
listen_addr: SocketAddr,
|
||||
/// Size of the queue to the subscriber.
|
||||
/// Size of the queue to the per timeline subscriber.
|
||||
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
|
||||
chan_size: usize,
|
||||
timeline_chan_size: usize,
|
||||
/// Size of the queue to the all keys subscriber.
|
||||
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
|
||||
all_keys_chan_size: usize,
|
||||
/// HTTP/2 keepalive interval.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)]
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
|
||||
http2_keepalive_interval: Duration,
|
||||
/// Format for logging, either 'plain' or 'json'.
|
||||
#[arg(long, default_value = "plain")]
|
||||
@@ -108,7 +113,7 @@ struct SharedState {
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
pub fn new(chan_size: usize) -> Self {
|
||||
pub fn new(all_keys_chan_size: usize) -> Self {
|
||||
SharedState {
|
||||
next_pub_id: 0,
|
||||
num_pubs: 0,
|
||||
@@ -116,7 +121,7 @@ impl SharedState {
|
||||
num_subs_to_timelines: 0,
|
||||
chans_to_timeline_subs: HashMap::new(),
|
||||
num_subs_to_all: 0,
|
||||
chan_to_all_subs: broadcast::channel(chan_size).0,
|
||||
chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,7 +144,7 @@ impl SharedState {
|
||||
pub fn register_subscriber(
|
||||
&mut self,
|
||||
sub_key: SubscriptionKey,
|
||||
chan_size: usize,
|
||||
timeline_chan_size: usize,
|
||||
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
|
||||
let sub_id = self.next_sub_id;
|
||||
self.next_sub_id += 1;
|
||||
@@ -158,7 +163,7 @@ impl SharedState {
|
||||
self.chans_to_timeline_subs
|
||||
.entry(ttid)
|
||||
.or_insert(ChanToTimelineSub {
|
||||
chan: broadcast::channel(chan_size).0,
|
||||
chan: broadcast::channel(timeline_chan_size).0,
|
||||
num_subscribers: 0,
|
||||
});
|
||||
chan_to_timeline_sub.num_subscribers += 1;
|
||||
@@ -200,7 +205,7 @@ impl SharedState {
|
||||
#[derive(Clone)]
|
||||
struct Registry {
|
||||
shared_state: Arc<RwLock<SharedState>>,
|
||||
chan_size: usize,
|
||||
timeline_chan_size: usize,
|
||||
}
|
||||
|
||||
impl Registry {
|
||||
@@ -232,7 +237,7 @@ impl Registry {
|
||||
let (sub_id, sub_rx) = self
|
||||
.shared_state
|
||||
.write()
|
||||
.register_subscriber(sub_key, self.chan_size);
|
||||
.register_subscriber(sub_key, self.timeline_chan_size);
|
||||
info!(
|
||||
"subscription started id={}, key={:?}, addr={:?}",
|
||||
sub_id, sub_key, remote_addr
|
||||
@@ -369,7 +374,8 @@ impl BrokerService for Broker {
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
missed_msgs += skipped_msg;
|
||||
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
|
||||
warn!("dropped {} messages, channel is full", missed_msgs);
|
||||
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
|
||||
missed_msgs = 0;
|
||||
}
|
||||
}
|
||||
@@ -427,8 +433,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
let registry = Registry {
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))),
|
||||
chan_size: args.chan_size,
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
|
||||
timeline_chan_size: args.timeline_chan_size,
|
||||
};
|
||||
let storage_broker_impl = Broker {
|
||||
registry: registry.clone(),
|
||||
@@ -522,7 +528,7 @@ mod tests {
|
||||
async fn test_registry() {
|
||||
let registry = Registry {
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(16))),
|
||||
chan_size: 16,
|
||||
timeline_chan_size: 16,
|
||||
};
|
||||
|
||||
// subscribe to timeline 2
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use hyper::body::HttpBody;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{ClientTlsConfig, Endpoint};
|
||||
use tonic::{transport::Channel, Code, Status};
|
||||
@@ -26,6 +27,8 @@ pub use hyper::Uri;
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
|
||||
|
||||
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
|
||||
|
||||
// BrokerServiceClient charged with tonic provided Channel transport; helps to
|
||||
// avoid depending on tonic directly in user crates.
|
||||
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
@@ -33,7 +36,7 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
// Create connection object configured to run TLS if schema starts with https://
|
||||
// and plain text otherwise. Connection is lazy, only endpoint sanity is
|
||||
// validated here.
|
||||
pub fn connect<U>(endpoint: U) -> anyhow::Result<BrokerClientChannel>
|
||||
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
|
||||
where
|
||||
U: std::convert::TryInto<Uri>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static,
|
||||
@@ -46,6 +49,10 @@ where
|
||||
let tls = ClientTlsConfig::new();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
|
||||
}
|
||||
tonic_endpoint = tonic_endpoint
|
||||
.http2_keep_alive_interval(keepalive_interval)
|
||||
.keep_alive_while_idle(true);
|
||||
// keep_alive_timeout is 20s by default on both client and server side
|
||||
let channel = tonic_endpoint.connect_lazy();
|
||||
Ok(BrokerClientChannel::new(channel))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user