From 8a53472e4f22cb3462694b5e3919c8d482fe5f58 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Mon, 2 May 2022 23:28:54 +0300 Subject: [PATCH] Force etcd broker keys to not to intersect --- libs/etcd_broker/src/lib.rs | 405 ++++++--------------- libs/etcd_broker/src/subscription_key.rs | 310 ++++++++++++++++ libs/etcd_broker/src/subscription_value.rs | 35 ++ pageserver/src/walreceiver.rs | 11 +- safekeeper/src/broker.rs | 33 +- safekeeper/src/http/routes.rs | 2 +- safekeeper/src/safekeeper.rs | 2 +- safekeeper/src/timeline.rs | 2 +- safekeeper/src/wal_backup.rs | 19 +- 9 files changed, 499 insertions(+), 320 deletions(-) create mode 100644 libs/etcd_broker/src/subscription_key.rs create mode 100644 libs/etcd_broker/src/subscription_value.rs diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 6b3293ec40..38d4a403c2 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -1,91 +1,43 @@ //! A set of primitives to access a shared data/updates, propagated via etcd broker (not persistent). //! Intended to connect services to each other, not to store their data. + +/// All broker keys, that are used when dealing with etcd. +pub mod subscription_key; +/// All broker values, possible to use when dealing with etcd. +pub mod subscription_value; + use std::{ collections::{hash_map, HashMap}, - fmt::Display, str::FromStr, }; -use once_cell::sync::Lazy; -use regex::{Captures, Regex}; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; - -pub use etcd_client::*; +use serde::de::DeserializeOwned; +use subscription_key::SubscriptionKey; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::*; -use utils::{ - lsn::Lsn, - zid::{NodeId, ZTenantId, ZTenantTimelineId}, -}; +use utils::zid::{NodeId, ZTenantTimelineId}; + +use crate::subscription_key::SubscriptionFullKey; + +pub use etcd_client::*; /// Default value to use for prefixing to all etcd keys with. /// This way allows isolating safekeeper/pageserver groups in the same etcd cluster. pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon"; -#[derive(Debug, Deserialize, Serialize)] -struct SafekeeperTimeline { - safekeeper_id: NodeId, - info: SkTimelineInfo, -} - -/// Published data about safekeeper's timeline. Fields made optional for easy migrations. -#[serde_as] -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct SkTimelineInfo { - /// Term of the last entry. - pub last_log_term: Option, - /// LSN of the last record. - #[serde_as(as = "Option")] - #[serde(default)] - pub flush_lsn: Option, - /// Up to which LSN safekeeper regards its WAL as committed. - #[serde_as(as = "Option")] - #[serde(default)] - pub commit_lsn: Option, - /// LSN up to which safekeeper has backed WAL. - #[serde_as(as = "Option")] - #[serde(default)] - pub backup_lsn: Option, - /// LSN of last checkpoint uploaded by pageserver. - #[serde_as(as = "Option")] - #[serde(default)] - pub remote_consistent_lsn: Option, - #[serde_as(as = "Option")] - #[serde(default)] - pub peer_horizon_lsn: Option, - #[serde(default)] - pub safekeeper_connstr: Option, -} - -#[derive(Debug, thiserror::Error)] -pub enum BrokerError { - #[error("Etcd client error: {0}. Context: {1}")] - EtcdClient(etcd_client::Error, String), - #[error("Error during parsing etcd key: {0}")] - InvalidKey(String), - #[error("Error during parsing etcd value: {0}")] - ParsingError(String), - #[error("Internal error: {0}")] - InternalError(String), -} - /// A way to control the data retrieval from a certain subscription. -pub struct SkTimelineSubscription { - safekeeper_timeline_updates: - mpsc::UnboundedReceiver>>, - kind: SkTimelineSubscriptionKind, +pub struct BrokerSubscription { + value_updates: mpsc::UnboundedReceiver>>, + key: SubscriptionKey, watcher_handle: JoinHandle>, watcher: Watcher, } -impl SkTimelineSubscription { +impl BrokerSubscription { /// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet. - pub async fn fetch_data( - &mut self, - ) -> Option>> { - self.safekeeper_timeline_updates.recv().await + pub async fn fetch_data(&mut self) -> Option>> { + self.value_updates.recv().await } /// Cancels the subscription, stopping the data poller and waiting for it to shut down. @@ -93,117 +45,90 @@ impl SkTimelineSubscription { self.watcher.cancel().await.map_err(|e| { BrokerError::EtcdClient( e, - format!( - "Failed to cancel timeline subscription, kind: {:?}", - self.kind - ), + format!("Failed to cancel broker subscription, kind: {:?}", self.key), ) })?; self.watcher_handle.await.map_err(|e| { BrokerError::InternalError(format!( - "Failed to join the timeline updates task, kind: {:?}, error: {e}", - self.kind + "Failed to join the broker value updates task, kind: {:?}, error: {e}", + self.key )) })? } } -/// The subscription kind to the timeline updates from safekeeper. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct SkTimelineSubscriptionKind { - broker_etcd_prefix: String, - kind: SubscriptionKind, -} - -impl SkTimelineSubscriptionKind { - pub fn all(broker_etcd_prefix: String) -> Self { - Self { - broker_etcd_prefix, - kind: SubscriptionKind::All, - } - } - - pub fn tenant(broker_etcd_prefix: String, tenant: ZTenantId) -> Self { - Self { - broker_etcd_prefix, - kind: SubscriptionKind::Tenant(tenant), - } - } - - pub fn timeline(broker_etcd_prefix: String, timeline: ZTenantTimelineId) -> Self { - Self { - broker_etcd_prefix, - kind: SubscriptionKind::Timeline(timeline), - } - } - - /// Etcd key to use for watching a certain timeline updates from safekeepers. - pub fn watch_key(&self) -> String { - match self.kind { - SubscriptionKind::All => self.broker_etcd_prefix.to_string(), - SubscriptionKind::Tenant(tenant_id) => { - format!("{}/{tenant_id}/safekeeper", self.broker_etcd_prefix) - } - SubscriptionKind::Timeline(ZTenantTimelineId { - tenant_id, - timeline_id, - }) => format!( - "{}/{tenant_id}/{timeline_id}/safekeeper", - self.broker_etcd_prefix - ), - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -enum SubscriptionKind { - /// Get every timeline update. - All, - /// Get certain tenant timelines' updates. - Tenant(ZTenantId), - /// Get certain timeline updates. - Timeline(ZTenantTimelineId), +#[derive(Debug, thiserror::Error)] +pub enum BrokerError { + #[error("Etcd client error: {0}. Context: {1}")] + EtcdClient(etcd_client::Error, String), + #[error("Error during parsing etcd key: {0}")] + KeyNotParsed(String), + #[error("Internal error: {0}")] + InternalError(String), } /// Creates a background task to poll etcd for timeline updates from safekeepers. /// Stops and returns `Err` on any error during etcd communication. /// Watches the key changes until either the watcher is cancelled via etcd or the subscription cancellation handle, /// exiting normally in such cases. -pub async fn subscribe_to_safekeeper_timeline_updates( +/// Etcd values are parsed as json fukes into a type, specified in the generic patameter. +pub async fn subscribe_for_json_values( client: &mut Client, - subscription: SkTimelineSubscriptionKind, -) -> Result { - info!("Subscribing to timeline updates, subscription kind: {subscription:?}"); - let kind = subscription.clone(); + key: SubscriptionKey, +) -> Result, BrokerError> +where + V: DeserializeOwned + Send + 'static, +{ + subscribe_for_values(client, key, |_, value_str| { + match serde_json::from_str::(value_str) { + Ok(value) => Some(value), + Err(e) => { + error!("Failed to parse value str '{value_str}': {e}"); + None + } + } + }) + .await +} + +/// Same as [`subscribe_for_json_values`], but allows to specify a custom parser of a etcd value string. +pub async fn subscribe_for_values( + client: &mut Client, + key: SubscriptionKey, + value_parser: P, +) -> Result, BrokerError> +where + V: Send + 'static, + P: Fn(SubscriptionFullKey, &str) -> Option + Send + 'static, +{ + info!("Subscribing to broker value updates, key: {key:?}"); + let subscription_key = key.clone(); let (watcher, mut stream) = client - .watch( - subscription.watch_key(), - Some(WatchOptions::new().with_prefix()), - ) + .watch(key.watch_key(), Some(WatchOptions::new().with_prefix())) .await .map_err(|e| { BrokerError::EtcdClient( e, - format!("Failed to init the watch for subscription {subscription:?}"), + format!("Failed to init the watch for subscription {key:?}"), ) })?; - let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel(); + let (value_updates_sender, value_updates_receiver) = mpsc::unbounded_channel(); let watcher_handle = tokio::spawn(async move { while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!( - "Failed to get messages from the subscription stream, kind: {:?}, error: {e}", subscription.kind + "Failed to get messages from the subscription stream, kind: {:?}, error: {e}", key.kind )))? { if resp.canceled() { info!("Watch for timeline updates subscription was canceled, exiting"); break; } - let mut timeline_updates: HashMap> = HashMap::new(); + let mut value_updates: HashMap> = HashMap::new(); // Keep track that the timeline data updates from etcd arrive in the right order. // https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas // > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering. - let mut timeline_etcd_versions: HashMap = HashMap::new(); + let mut value_etcd_versions: HashMap = HashMap::new(); let events = resp.events(); @@ -213,182 +138,78 @@ pub async fn subscribe_to_safekeeper_timeline_updates( if EventType::Put == event.event_type() { if let Some(new_etcd_kv) = event.kv() { let new_kv_version = new_etcd_kv.version(); - let (key_str, value_str) = match extract_key_value_str(new_etcd_kv) { - Ok(strs) => strs, - Err(e) => { - error!("Failed to represent etcd KV {new_etcd_kv:?} as pair of str: {e}"); - continue; - }, - }; - match parse_safekeeper_timeline(&subscription, key_str, value_str) { - Ok((zttid, timeline)) => { - match timeline_updates - .entry(zttid) - .or_default() - .entry(timeline.safekeeper_id) - { - hash_map::Entry::Occupied(mut o) => { - let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN); - if old_etcd_kv_version < new_kv_version { - o.insert(timeline.info); - timeline_etcd_versions.insert(zttid,new_kv_version); - } else { - debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); + match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) { + Ok(Some((key, value))) => match value_updates + .entry(key.id) + .or_default() + .entry(key.node_id) + { + hash_map::Entry::Occupied(mut o) => { + let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN); + if old_etcd_kv_version < new_kv_version { + o.insert(value); + value_etcd_versions.insert(key.id,new_kv_version); + } else { + debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); + } } - } - hash_map::Entry::Vacant(v) => { - v.insert(timeline.info); - timeline_etcd_versions.insert(zttid,new_kv_version); - } - } - } - // it is normal to get other keys when we subscribe to everything - Err(BrokerError::InvalidKey(e)) => debug!("Unexpected key for timeline update: {e}"), - Err(e) => error!("Failed to parse timeline update: {e}"), + hash_map::Entry::Vacant(v) => { + v.insert(value); + value_etcd_versions.insert(key.id,new_kv_version); + } + }, + Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"), + Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"), + Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"), }; } } } - if let Err(e) = timeline_updates_sender.send(timeline_updates) { - info!("Timeline updates sender got dropped, exiting: {e}"); - break; + if !value_updates.is_empty() { + if let Err(e) = value_updates_sender.send(value_updates) { + info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}"); + break; + } } } Ok(()) }.instrument(info_span!("etcd_broker"))); - Ok(SkTimelineSubscription { - kind, - safekeeper_timeline_updates, + Ok(BrokerSubscription { + key: subscription_key, + value_updates: value_updates_receiver, watcher_handle, watcher, }) } -fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> { - let key = kv.key_str().map_err(|e| { +fn parse_etcd_kv( + kv: &KeyValue, + value_parser: &P, + cluster_prefix: &str, +) -> Result, BrokerError> +where + P: Fn(SubscriptionFullKey, &str) -> Option, +{ + let key_str = kv.key_str().map_err(|e| { BrokerError::EtcdClient(e, "Failed to extract key str out of etcd KV".to_string()) })?; - let value = kv.value_str().map_err(|e| { + let value_str = kv.value_str().map_err(|e| { BrokerError::EtcdClient(e, "Failed to extract value str out of etcd KV".to_string()) })?; - Ok((key, value)) -} -static SK_TIMELINE_KEY_REGEX: Lazy = Lazy::new(|| { - Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$") - .expect("wrong regex for safekeeper timeline etcd key") -}); - -fn parse_safekeeper_timeline( - subscription: &SkTimelineSubscriptionKind, - key_str: &str, - value_str: &str, -) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> { - let broker_prefix = subscription.broker_etcd_prefix.as_str(); - if !key_str.starts_with(broker_prefix) { - return Err(BrokerError::InvalidKey(format!( - "KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}" + if !key_str.starts_with(cluster_prefix) { + return Err(BrokerError::KeyNotParsed(format!( + "KV has unexpected key '{key_str}' that does not start with cluster prefix {cluster_prefix}" ))); } - let key_part = &key_str[broker_prefix.len()..]; - let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) { - Some(captures) => captures, - None => { - return Err(BrokerError::InvalidKey(format!( - "KV has unexpected key part '{key_part}' that does not match required regex {}", - SK_TIMELINE_KEY_REGEX.as_str() - ))); - } - }; - let info = serde_json::from_str(value_str).map_err(|e| { - BrokerError::ParsingError(format!( - "Failed to parse '{value_str}' as safekeeper timeline info: {e}" - )) + let key = SubscriptionFullKey::from_str(&key_str[cluster_prefix.len()..]).map_err(|e| { + BrokerError::KeyNotParsed(format!("Failed to parse KV key '{key_str}': {e}")) })?; - let zttid = ZTenantTimelineId::new( - parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, - parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?, - ); - let safekeeper_id = NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?); - - Ok(( - zttid, - SafekeeperTimeline { - safekeeper_id, - info, - }, - )) -} - -fn parse_capture(caps: &Captures, index: usize) -> Result -where - T: FromStr, - ::Err: Display, -{ - let capture_match = caps - .get(index) - .ok_or_else(|| format!("Failed to get capture match at index {index}"))? - .as_str(); - capture_match.parse().map_err(|e| { - format!( - "Failed to parse {} from {capture_match}: {e}", - std::any::type_name::() - ) - }) -} - -#[cfg(test)] -mod tests { - use utils::zid::ZTimelineId; - - use super::*; - - #[test] - fn typical_etcd_prefix_should_be_parsed() { - let prefix = "neon"; - let tenant_id = ZTenantId::generate(); - let timeline_id = ZTimelineId::generate(); - let all_subscription = SkTimelineSubscriptionKind { - broker_etcd_prefix: prefix.to_string(), - kind: SubscriptionKind::All, - }; - let tenant_subscription = SkTimelineSubscriptionKind { - broker_etcd_prefix: prefix.to_string(), - kind: SubscriptionKind::Tenant(tenant_id), - }; - let timeline_subscription = SkTimelineSubscriptionKind { - broker_etcd_prefix: prefix.to_string(), - kind: SubscriptionKind::Timeline(ZTenantTimelineId::new(tenant_id, timeline_id)), - }; - - let typical_etcd_kv_strs = [ - ( - format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/1"), - r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#, - ), - ( - format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/13"), - r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#, - ), - ]; - - for (key_string, value_str) in typical_etcd_kv_strs { - for subscription in [ - &all_subscription, - &tenant_subscription, - &timeline_subscription, - ] { - let (id, _timeline) = - parse_safekeeper_timeline(subscription, &key_string, value_str) - .unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}")); - assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id)); - } - } - } + Ok(value_parser(key, value_str).map(|value| (key, value))) } diff --git a/libs/etcd_broker/src/subscription_key.rs b/libs/etcd_broker/src/subscription_key.rs new file mode 100644 index 0000000000..8f8579f4e5 --- /dev/null +++ b/libs/etcd_broker/src/subscription_key.rs @@ -0,0 +1,310 @@ +//! Etcd broker keys, used in the project and shared between instances. +//! The keys are split into two categories: +//! +//! * [`SubscriptionFullKey`] full key format: `/////` +//! Always returned from etcd in this form, always start with the user key provided. +//! +//! * [`SubscriptionKey`] user input key format: always partial, since it's unknown which `node_id`'s are available. +//! Full key always starts with the user input one, due to etcd subscription properties. + +use std::{fmt::Display, str::FromStr}; + +use once_cell::sync::Lazy; +use regex::{Captures, Regex}; +use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId}; + +/// The subscription kind to the timeline updates from safekeeper. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SubscriptionKey { + /// Generic cluster prefix, allowing to use the same etcd instance by multiple logic groups. + pub cluster_prefix: String, + /// The subscription kind. + pub kind: SubscriptionKind, +} + +/// All currently possible key kinds of a etcd broker subscription. +/// Etcd works so, that every key that starts with the subbscription key given is considered matching and +/// returned as part of the subscrption. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SubscriptionKind { + /// Get every update in etcd. + All, + /// Get etcd updates for any timeiline of a certain tenant, affected by any operation from any node kind. + TenantTimelines(ZTenantId), + /// Get etcd updates for a certain timeline of a tenant, affected by any operation from any node kind. + Timeline(ZTenantTimelineId), + /// Get etcd timeline updates, specific to a certain node kind. + Node(ZTenantTimelineId, NodeKind), + /// Get etcd timeline updates for a certain operation on specific nodes. + Operation(ZTenantTimelineId, NodeKind, OperationKind), +} + +/// All kinds of nodes, able to write into etcd. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum NodeKind { + Safekeeper, + Pageserver, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum OperationKind { + Safekeeper(SkOperationKind), +} + +/// Current operations, running inside the safekeeper node. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SkOperationKind { + TimelineInfo, + WalBackup, +} + +static SUBSCRIPTION_FULL_KEY_REGEX: Lazy = Lazy::new(|| { + Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/([^/]+)/([^/]+)/([[:digit:]]+)$") + .expect("wrong subscription full etcd key regex") +}); + +/// Full key, received from etcd during any of the component's work. +/// No other etcd keys are considered during system's work. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct SubscriptionFullKey { + pub id: ZTenantTimelineId, + pub node_kind: NodeKind, + pub operation: OperationKind, + pub node_id: NodeId, +} + +impl SubscriptionKey { + /// Subscribes for all etcd updates. + pub fn all(cluster_prefix: String) -> Self { + SubscriptionKey { + cluster_prefix, + kind: SubscriptionKind::All, + } + } + + /// Subscribes to a given timeline info updates from safekeepers. + pub fn sk_timeline_info(cluster_prefix: String, timeline: ZTenantTimelineId) -> Self { + Self { + cluster_prefix, + kind: SubscriptionKind::Operation( + timeline, + NodeKind::Safekeeper, + OperationKind::Safekeeper(SkOperationKind::TimelineInfo), + ), + } + } + + /// Subscribes to all timeine updates during specific operations, running on the corresponding nodes. + pub fn operation( + cluster_prefix: String, + timeline: ZTenantTimelineId, + node_kind: NodeKind, + operation: OperationKind, + ) -> Self { + Self { + cluster_prefix, + kind: SubscriptionKind::Operation(timeline, node_kind, operation), + } + } + + /// Etcd key to use for watching a certain timeline updates from safekeepers. + pub fn watch_key(&self) -> String { + let cluster_prefix = &self.cluster_prefix; + match self.kind { + SubscriptionKind::All => cluster_prefix.to_string(), + SubscriptionKind::TenantTimelines(tenant_id) => { + format!("{cluster_prefix}/{tenant_id}") + } + SubscriptionKind::Timeline(id) => { + format!("{cluster_prefix}/{id}") + } + SubscriptionKind::Node(id, node_kind) => { + format!("{cluster_prefix}/{id}/{node_kind}") + } + SubscriptionKind::Operation(id, node_kind, operation_kind) => { + format!("{cluster_prefix}/{id}/{node_kind}/{operation_kind}") + } + } + } +} + +impl Display for OperationKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OperationKind::Safekeeper(o) => o.fmt(f), + } + } +} + +impl FromStr for OperationKind { + type Err = String; + + fn from_str(operation_kind_str: &str) -> Result { + match operation_kind_str { + "timeline_info" => Ok(OperationKind::Safekeeper(SkOperationKind::TimelineInfo)), + "wal_backup" => Ok(OperationKind::Safekeeper(SkOperationKind::WalBackup)), + _ => Err(format!("Unknown operation kind: {operation_kind_str}")), + } + } +} + +impl Display for SubscriptionFullKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + id, + node_kind, + operation, + node_id, + } = self; + write!(f, "{id}/{node_kind}/{operation}/{node_id}") + } +} + +impl FromStr for SubscriptionFullKey { + type Err = String; + + fn from_str(subscription_kind_str: &str) -> Result { + let key_captures = match SUBSCRIPTION_FULL_KEY_REGEX.captures(subscription_kind_str) { + Some(captures) => captures, + None => { + return Err(format!( + "Subscription kind str does not match a subscription full key regex {}", + SUBSCRIPTION_FULL_KEY_REGEX.as_str() + )); + } + }; + + Ok(Self { + id: ZTenantTimelineId::new( + parse_capture(&key_captures, 1)?, + parse_capture(&key_captures, 2)?, + ), + node_kind: parse_capture(&key_captures, 3)?, + operation: parse_capture(&key_captures, 4)?, + node_id: NodeId(parse_capture(&key_captures, 5)?), + }) + } +} + +fn parse_capture(caps: &Captures, index: usize) -> Result +where + T: FromStr, + ::Err: Display, +{ + let capture_match = caps + .get(index) + .ok_or_else(|| format!("Failed to get capture match at index {index}"))? + .as_str(); + capture_match.parse().map_err(|e| { + format!( + "Failed to parse {} from {capture_match}: {e}", + std::any::type_name::() + ) + }) +} + +impl Display for NodeKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Safekeeper => write!(f, "safekeeper"), + Self::Pageserver => write!(f, "pageserver"), + } + } +} + +impl FromStr for NodeKind { + type Err = String; + + fn from_str(node_kind_str: &str) -> Result { + match node_kind_str { + "safekeeper" => Ok(Self::Safekeeper), + "pageserver" => Ok(Self::Pageserver), + _ => Err(format!("Invalid node kind: {node_kind_str}")), + } + } +} + +impl Display for SkOperationKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TimelineInfo => write!(f, "timeline_info"), + Self::WalBackup => write!(f, "wal_backup"), + } + } +} + +impl FromStr for SkOperationKind { + type Err = String; + + fn from_str(operation_str: &str) -> Result { + match operation_str { + "timeline_info" => Ok(Self::TimelineInfo), + "wal_backup" => Ok(Self::WalBackup), + _ => Err(format!("Invalid operation: {operation_str}")), + } + } +} + +#[cfg(test)] +mod tests { + use utils::zid::ZTimelineId; + + use super::*; + + #[test] + fn full_cluster_key_parsing() { + let prefix = "neon"; + let node_kind = NodeKind::Safekeeper; + let operation_kind = OperationKind::Safekeeper(SkOperationKind::WalBackup); + let tenant_id = ZTenantId::generate(); + let timeline_id = ZTimelineId::generate(); + let id = ZTenantTimelineId::new(tenant_id, timeline_id); + let node_id = NodeId(1); + + let timeline_subscription_keys = [ + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::All, + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::TenantTimelines(tenant_id), + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::Timeline(id), + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::Node(id, node_kind), + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::Operation(id, node_kind, operation_kind), + }, + ]; + + let full_key_string = format!( + "{}/{node_id}", + timeline_subscription_keys.last().unwrap().watch_key() + ); + + for key in timeline_subscription_keys { + assert!(full_key_string.starts_with(&key.watch_key()), "Full key '{full_key_string}' should start with any of the keys, keys, but {key:?} did not match"); + } + + let full_key = SubscriptionFullKey::from_str(&full_key_string).unwrap_or_else(|e| { + panic!("Failed to parse {full_key_string} as a subscription full key: {e}") + }); + + assert_eq!( + full_key, + SubscriptionFullKey { + id, + node_kind, + operation: operation_kind, + node_id + } + ) + } +} diff --git a/libs/etcd_broker/src/subscription_value.rs b/libs/etcd_broker/src/subscription_value.rs new file mode 100644 index 0000000000..d3e2011761 --- /dev/null +++ b/libs/etcd_broker/src/subscription_value.rs @@ -0,0 +1,35 @@ +//! Module for the values to put into etcd. + +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; +use utils::lsn::Lsn; + +/// Data about safekeeper's timeline. Fields made optional for easy migrations. +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SkTimelineInfo { + /// Term of the last entry. + pub last_log_term: Option, + /// LSN of the last record. + #[serde_as(as = "Option")] + #[serde(default)] + pub flush_lsn: Option, + /// Up to which LSN safekeeper regards its WAL as committed. + #[serde_as(as = "Option")] + #[serde(default)] + pub commit_lsn: Option, + /// LSN up to which safekeeper has backed WAL. + #[serde_as(as = "Option")] + #[serde(default)] + pub backup_lsn: Option, + /// LSN of last checkpoint uploaded by pageserver. + #[serde_as(as = "Option")] + #[serde(default)] + pub remote_consistent_lsn: Option, + #[serde_as(as = "Option")] + #[serde(default)] + pub peer_horizon_lsn: Option, + /// A connection string to use for WAL receiving. + #[serde(default)] + pub safekeeper_connstr: Option, +} diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 202a13545d..32bd88cf7c 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -50,7 +50,10 @@ use crate::thread_mgr::ThreadKind; use crate::{thread_mgr, DatadirTimelineImpl}; use anyhow::{ensure, Context}; use chrono::{NaiveDateTime, Utc}; -use etcd_broker::{Client, SkTimelineInfo, SkTimelineSubscription, SkTimelineSubscriptionKind}; +use etcd_broker::{ + subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, + Client, +}; use itertools::Itertools; use once_cell::sync::Lazy; use std::cell::Cell; @@ -403,7 +406,7 @@ async fn timeline_wal_broker_loop_step( // Endlessly try to subscribe for broker updates for a given timeline. // If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly. // This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. - let mut broker_subscription: SkTimelineSubscription; + let mut broker_subscription: BrokerSubscription; let mut attempt = 0; loop { select! { @@ -420,9 +423,9 @@ async fn timeline_wal_broker_loop_step( info!("Broker subscription loop cancelled, shutting down"); return Ok(ControlFlow::Break(())); }, - new_subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( + new_subscription = etcd_broker::subscribe_for_json_values( etcd_client, - SkTimelineSubscriptionKind::timeline(broker_prefix.to_owned(), id), + SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id), ) .instrument(info_span!("etcd_subscription")) => match new_subscription { Ok(new_subscription) => { diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 3d75fec587..169b106aa9 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -4,9 +4,7 @@ use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use etcd_broker::Client; -use etcd_broker::PutOptions; -use etcd_broker::SkTimelineSubscriptionKind; +use etcd_broker::subscription_value::SkTimelineInfo; use std::time::Duration; use tokio::spawn; use tokio::task::JoinHandle; @@ -15,6 +13,10 @@ use tracing::*; use url::Url; use crate::{timeline::GlobalTimelines, SafeKeeperConf}; +use etcd_broker::{ + subscription_key::{OperationKind, SkOperationKind, SubscriptionKey}, + Client, PutOptions, +}; use utils::zid::{NodeId, ZTenantTimelineId}; const RETRY_INTERVAL_MSEC: u64 = 1000; @@ -43,7 +45,7 @@ fn timeline_safekeeper_path( ) -> String { format!( "{}/{sk_id}", - SkTimelineSubscriptionKind::timeline(broker_etcd_prefix, zttid).watch_key() + SubscriptionKey::sk_timeline_info(broker_etcd_prefix, zttid).watch_key() ) } @@ -148,14 +150,6 @@ async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { } } -pub fn get_campaign_name( - election_name: &str, - broker_prefix: &str, - id: ZTenantTimelineId, -) -> String { - format!("{broker_prefix}/{id}/{election_name}") -} - pub fn get_candiate_name(system_id: NodeId) -> String { format!("id_{system_id}") } @@ -209,9 +203,20 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { let mut client = Client::connect(&conf.broker_endpoints, None).await?; - let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( + let mut subscription = etcd_broker::subscribe_for_values( &mut client, - SkTimelineSubscriptionKind::all(conf.broker_etcd_prefix.clone()), + SubscriptionKey::all(conf.broker_etcd_prefix.clone()), + |full_key, value_str| { + if full_key.operation == OperationKind::Safekeeper(SkOperationKind::TimelineInfo) { + match serde_json::from_str::(value_str) { + Ok(new_info) => return Some(new_info), + Err(e) => { + error!("Failed to parse timeline info from value str '{value_str}': {e}") + } + } + } + None + }, ) .await .context("failed to subscribe for safekeeper info")?; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index b0197a9a2a..73b9024c7d 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,4 +1,3 @@ -use etcd_broker::SkTimelineInfo; use hyper::{Body, Request, Response, StatusCode}; use serde::Serialize; @@ -11,6 +10,7 @@ use crate::safekeeper::Term; use crate::safekeeper::TermHistory; use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult}; use crate::SafeKeeperConf; +use etcd_broker::subscription_value::SkTimelineInfo; use utils::{ http::{ endpoint, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 1c00af7043..eb6316dec2 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -4,7 +4,7 @@ use anyhow::{bail, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use etcd_broker::SkTimelineInfo; +use etcd_broker::subscription_value::SkTimelineInfo; use postgres_ffi::xlog_utils::TimeLineID; use postgres_ffi::xlog_utils::XLogSegNo; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 30c94f2543..a69dadb7bb 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result}; -use etcd_broker::SkTimelineInfo; +use etcd_broker::subscription_value::SkTimelineInfo; use lazy_static::lazy_static; use postgres_ffi::xlog_utils::XLogSegNo; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 1f2e9c303a..08e19f3f2f 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -1,4 +1,7 @@ use anyhow::{Context, Result}; +use etcd_broker::subscription_key::{ + NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind, +}; use tokio::task::JoinHandle; use std::cmp::min; @@ -26,8 +29,6 @@ use crate::{broker, SafeKeeperConf}; use once_cell::sync::OnceCell; -const BACKUP_ELECTION_NAME: &str = "WAL_BACKUP"; - const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000; const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; @@ -98,11 +99,15 @@ async fn wal_backup_launcher_main_loop( info!("starting WAL backup task for {}", zttid); // TODO: decide who should offload in launcher itself by simply checking current state - let election_name = broker::get_campaign_name( - BACKUP_ELECTION_NAME, - &conf.broker_etcd_prefix, - zttid, - ); + let election_name = SubscriptionKey { + cluster_prefix: conf.broker_etcd_prefix.clone(), + kind: SubscriptionKind::Operation( + zttid, + NodeKind::Safekeeper, + OperationKind::Safekeeper(SkOperationKind::WalBackup), + ), + } + .watch_key(); let my_candidate_name = broker::get_candiate_name(conf.my_id); let election = broker::Election::new( election_name,