diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 01cc0cf162..1b27f99ccf 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -51,7 +51,7 @@ pub struct SkTimelineInfo { #[serde(default)] pub peer_horizon_lsn: Option, #[serde(default)] - pub wal_stream_connection_string: Option, + pub safekeeper_connection_string: Option, } #[derive(Debug, thiserror::Error)] @@ -217,16 +217,22 @@ pub async fn subscribe_to_safekeeper_timeline_updates( break; } - let mut timeline_updates: HashMap> = - HashMap::new(); + let mut timeline_updates: HashMap> = HashMap::new(); + // Keep track that the timeline data updates from etcd arrive in the right order. + // https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas + // > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering. + let mut timeline_etcd_versions: HashMap = HashMap::new(); + let events = resp.events(); debug!("Processing {} events", events.len()); for event in events { if EventType::Put == event.event_type() { - if let Some(kv) = event.kv() { - match parse_etcd_key_value(subscription_kind, ®ex, kv) { + if let Some(new_etcd_kv) = event.kv() { + let new_kv_version = new_etcd_kv.version(); + + match parse_etcd_key_value(subscription_kind, ®ex, new_etcd_kv) { Ok(Some((zttid, timeline))) => { match timeline_updates .entry(zttid) @@ -234,12 +240,15 @@ pub async fn subscribe_to_safekeeper_timeline_updates( .entry(timeline.safekeeper_id) { hash_map::Entry::Occupied(mut o) => { - if o.get().flush_lsn < timeline.info.flush_lsn { + let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN); + if old_etcd_kv_version < new_kv_version { o.insert(timeline.info); + timeline_etcd_versions.insert(zttid,new_kv_version); } } hash_map::Entry::Vacant(v) => { v.insert(timeline.info); + timeline_etcd_versions.insert(zttid,new_kv_version); } } } diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index c9ae1a8d98..d9c60c9db0 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -60,7 +60,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // lock is held. for zttid in GlobalTimelines::get_active_timelines() { if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { - let sk_info = tli.get_public_info()?; + let sk_info = tli.get_public_info(&conf)?; let put_opts = PutOptions::new().with_lease(lease.id()); client .put( diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 8b1072a54b..a12f628e06 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -89,7 +89,6 @@ struct SharedState { active: bool, num_computes: u32, pageserver_connstr: Option, - listen_pg_addr: String, last_removed_segno: XLogSegNo, } @@ -112,7 +111,6 @@ impl SharedState { active: false, num_computes: 0, pageserver_connstr: None, - listen_pg_addr: conf.listen_pg_addr.clone(), last_removed_segno: 0, }) } @@ -132,7 +130,6 @@ impl SharedState { active: false, num_computes: 0, pageserver_connstr: None, - listen_pg_addr: conf.listen_pg_addr.clone(), last_removed_segno: 0, }) } @@ -421,7 +418,7 @@ impl Timeline { } /// Prepare public safekeeper info for reporting. - pub fn get_public_info(&self) -> anyhow::Result { + pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result { let shared_state = self.mutex.lock().unwrap(); Ok(SkTimelineInfo { last_log_term: Some(shared_state.sk.get_epoch()), @@ -435,18 +432,7 @@ impl Timeline { shared_state.sk.inmem.remote_consistent_lsn, )), peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - wal_stream_connection_string: shared_state - .pageserver_connstr - .as_deref() - .map(|pageserver_connstr| { - wal_stream_connection_string( - self.zttid, - &shared_state.listen_pg_addr, - pageserver_connstr, - ) - }) - .transpose() - .context("Failed to get the pageserver callmemaybe connstr")?, + safekeeper_connection_string: Some(conf.listen_pg_addr.clone()), }) } @@ -504,29 +490,6 @@ impl Timeline { } } -// pageserver connstr is needed to be able to distinguish between different pageservers -// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved -// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105 -fn wal_stream_connection_string( - ZTenantTimelineId { - tenant_id, - timeline_id, - }: ZTenantTimelineId, - listen_pg_addr_str: &str, - pageserver_connstr: &str, -) -> anyhow::Result { - let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str); - let me_conf = me_connstr - .parse::() - .with_context(|| { - format!("Failed to parse pageserver connection string '{me_connstr}' as a postgres one") - })?; - let (host, port) = utils::connstring::connection_host_port(&me_conf); - Ok(format!( - "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'", - )) -} - // Utilities needed by various Connection-like objects pub trait TimelineTools { fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>;