Correctly operate etcd safekeeper timeline data

This commit is contained in:
Kirill Bulatov
2022-05-12 14:18:35 +03:00
committed by Kirill Bulatov
parent b10ae195b7
commit 4538f1e1b8
3 changed files with 18 additions and 46 deletions

View File

@@ -51,7 +51,7 @@ pub struct SkTimelineInfo {
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub wal_stream_connection_string: Option<String>,
pub safekeeper_connection_string: Option<String>,
}
#[derive(Debug, thiserror::Error)]
@@ -217,16 +217,22 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
break;
}
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>> =
HashMap::new();
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
let mut timeline_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(kv) = event.kv() {
match parse_etcd_key_value(subscription_kind, &regex, kv) {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
match parse_etcd_key_value(subscription_kind, &regex, new_etcd_kv) {
Ok(Some((zttid, timeline))) => {
match timeline_updates
.entry(zttid)
@@ -234,12 +240,15 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
.entry(timeline.safekeeper_id)
{
hash_map::Entry::Occupied(mut o) => {
if o.get().flush_lsn < timeline.info.flush_lsn {
let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
}
}
hash_map::Entry::Vacant(v) => {
v.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
}
}
}

View File

@@ -60,7 +60,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// lock is held.
for zttid in GlobalTimelines::get_active_timelines() {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
let sk_info = tli.get_public_info()?;
let sk_info = tli.get_public_info(&conf)?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
.put(

View File

@@ -89,7 +89,6 @@ struct SharedState {
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
listen_pg_addr: String,
last_removed_segno: XLogSegNo,
}
@@ -112,7 +111,6 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
listen_pg_addr: conf.listen_pg_addr.clone(),
last_removed_segno: 0,
})
}
@@ -132,7 +130,6 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
listen_pg_addr: conf.listen_pg_addr.clone(),
last_removed_segno: 0,
})
}
@@ -421,7 +418,7 @@ impl Timeline {
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self) -> anyhow::Result<SkTimelineInfo> {
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
Ok(SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
@@ -435,18 +432,7 @@ impl Timeline {
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
wal_stream_connection_string: shared_state
.pageserver_connstr
.as_deref()
.map(|pageserver_connstr| {
wal_stream_connection_string(
self.zttid,
&shared_state.listen_pg_addr,
pageserver_connstr,
)
})
.transpose()
.context("Failed to get the pageserver callmemaybe connstr")?,
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()),
})
}
@@ -504,29 +490,6 @@ impl Timeline {
}
}
// pageserver connstr is needed to be able to distinguish between different pageservers
// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved
// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105
fn wal_stream_connection_string(
ZTenantTimelineId {
tenant_id,
timeline_id,
}: ZTenantTimelineId,
listen_pg_addr_str: &str,
pageserver_connstr: &str,
) -> anyhow::Result<String> {
let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str);
let me_conf = me_connstr
.parse::<postgres::config::Config>()
.with_context(|| {
format!("Failed to parse pageserver connection string '{me_connstr}' as a postgres one")
})?;
let (host, port) = utils::connstring::connection_host_port(&me_conf);
Ok(format!(
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
))
}
// Utilities needed by various Connection-like objects
pub trait TimelineTools {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>;