diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index d3f6fb8903..8e0eb971f3 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -5,6 +5,11 @@ use anyhow::Context;
 use anyhow::Error;
 use anyhow::Result;
 use etcd_broker::subscription_value::SkTimelineInfo;
+use etcd_broker::LeaseKeepAliveStream;
+use etcd_broker::LeaseKeeper;
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::time::Duration;
 use tokio::spawn;
 use tokio::task::JoinHandle;
@@ -21,7 +26,7 @@ use utils::zid::{NodeId, ZTenantTimelineId};
 
 const RETRY_INTERVAL_MSEC: u64 = 1000;
 const PUSH_INTERVAL_MSEC: u64 = 1000;
-const LEASE_TTL_SEC: i64 = 5;
+const LEASE_TTL_SEC: i64 = 10;
 
 pub fn thread_main(conf: SafeKeeperConf) {
     let runtime = runtime::Builder::new_current_thread()
@@ -154,13 +159,48 @@ pub fn get_candiate_name(system_id: NodeId) -> String {
     format!("id_{system_id}")
 }
 
+async fn push_sk_info(
+    zttid: ZTenantTimelineId,
+    mut client: Client,
+    key: String,
+    sk_info: SkTimelineInfo,
+    mut lease: Lease,
+) -> anyhow::Result<(ZTenantTimelineId, Lease)> {
+    let put_opts = PutOptions::new().with_lease(lease.id);
+    client
+        .put(
+            key.clone(),
+            serde_json::to_string(&sk_info)?,
+            Some(put_opts),
+        )
+        .await
+        .with_context(|| format!("failed to push safekeeper info to {}", key))?;
+
+    // revive the lease
+    lease
+        .keeper
+        .keep_alive()
+        .await
+        .context("failed to send LeaseKeepAliveRequest")?;
+    lease
+        .ka_stream
+        .message()
+        .await
+        .context("failed to receive LeaseKeepAliveResponse")?;
+
+    Ok((zttid, lease))
+}
+
+struct Lease {
+    id: i64,
+    keeper: LeaseKeeper,
+    ka_stream: LeaseKeepAliveStream,
+}
+
 /// Push once in a while data about all active timelines to the broker.
 async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
     let mut client = Client::connect(&conf.broker_endpoints, None).await?;
-
-    // Get and maintain lease to automatically delete obsolete data
-    let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
-    let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?;
+    let mut leases: HashMap<ZTenantTimelineId, Lease> = HashMap::new();
 
     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
     loop {
@@ -168,33 +208,47 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
         // is under plain mutex. That's ok, all this code is not performance
         // sensitive and there is no risk of deadlock as we don't await while
         // lock is held.
-        for zttid in GlobalTimelines::get_active_timelines() {
-            if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
-                let sk_info = tli.get_public_info(&conf)?;
-                let put_opts = PutOptions::new().with_lease(lease.id());
-                client
-                    .put(
-                        timeline_safekeeper_path(
-                            conf.broker_etcd_prefix.clone(),
-                            zttid,
-                            conf.my_id,
-                        ),
-                        serde_json::to_string(&sk_info)?,
-                        Some(put_opts),
-                    )
-                    .await
-                    .context("failed to push safekeeper info")?;
+        let active_tlis = GlobalTimelines::get_active_timelines();
+
+        // Get and maintain (if not yet granted) a per-timeline lease to automatically delete obsolete data.
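+        // If this safekeeper dies, etcd deletes the keys attached to a lease once it expires (LEASE_TTL_SEC).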
+        for zttid in active_tlis.iter() {
+            if let Entry::Vacant(v) = leases.entry(*zttid) {
+                let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
+                let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
+                v.insert(Lease {
+                    id: lease.id(),
+                    keeper,
+                    ka_stream,
+                });
             }
         }
-        // revive the lease
-        keeper
-            .keep_alive()
-            .await
-            .context("failed to send LeaseKeepAliveRequest")?;
-        ka_stream
-            .message()
-            .await
-            .context("failed to receive LeaseKeepAliveResponse")?;
+        leases.retain(|zttid, _| active_tlis.contains(zttid));
+
+        // Push data concurrently to avoid accumulating latency: with many timelines, sequential pushes can be slow.
+        let handles = active_tlis
+            .iter()
+            .filter_map(|zttid| GlobalTimelines::get_loaded(*zttid))
+            .map(|tli| {
+                let sk_info = tli.get_public_info(&conf);
+                let key = timeline_safekeeper_path(
+                    conf.broker_etcd_prefix.clone(),
+                    tli.zttid,
+                    conf.my_id,
+                );
+                let lease = leases.remove(&tli.zttid).unwrap();
+                tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease))
+            })
+            .collect::<Vec<_>>();
+        for h in handles {
+            let (zttid, lease) = h.await??;
+            // It is ugly to pull leases out of the hash map and then put them
+            // back, but otherwise we would need long-lived per-timeline tasks
+            // (which would generate a lot of errors when etcd is down), since
+            // a spawned task requires 'static data and we can't borrow to it.
+            leases.insert(zttid, lease);
+        }
+
         sleep(push_interval).await;
     }
 }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 12cac831f4..bed6e447d7 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -11,7 +11,7 @@ use serde::Serialize;
 use tokio::sync::watch;
 
 use std::cmp::{max, min};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fs::{self};
 use std::sync::{Arc, Mutex, MutexGuard};
 
@@ -445,9 +445,9 @@ impl Timeline {
     }
 
     /// Prepare public safekeeper info for reporting.
-    pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
+    pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
         let shared_state = self.mutex.lock().unwrap();
-        Ok(SkTimelineInfo {
+        SkTimelineInfo {
             last_log_term: Some(shared_state.sk.get_epoch()),
             flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
             // note: this value is not flushed to control file yet and can be lost
@@ -460,7 +460,7 @@ impl Timeline {
             peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
             safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
             backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
-        })
+        }
     }
 
     /// Update timeline state with peer safekeeper data.
@@ -669,7 +669,7 @@ impl GlobalTimelines {
     }
 
     /// Get ZTenantTimelineIDs of all active timelines.
-    pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
+    pub fn get_active_timelines() -> HashSet<ZTenantTimelineId> {
         let state = TIMELINES_STATE.lock().unwrap();
         state
             .timelines