diff --git a/Cargo.lock b/Cargo.lock index 3c38dc8150..ac40a2931f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,9 +48,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.53" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" +checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" dependencies = [ "backtrace", ] @@ -113,6 +113,49 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4af7447fc1214c1f3a1ace861d0216a6c8bb13965b64bbad9650f375b67689a" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa 1.0.1", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bdc19781b16e32f8a7200368a336fa4509d4b72ef15dd4e41df5290855ee1e6" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -320,6 +363,15 @@ dependencies = [ "textwrap 0.14.2", ] +[[package]] +name = "cmake" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +dependencies = [ + "cc", +] + [[package]] name = "combine" version = "4.6.3" @@ -730,9 +782,9 @@ dependencies = [ [[package]] name = "etcd-client" -version = "0.8.4" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585de5039d1ecce74773db49ba4e8107e42be7c2cd0b1a9e7fce27181db7b118" +checksum = "c434d2800b273a506b82397aad2f20971636f65e47b27c027f77d498530c5954" dependencies = [ "http", "prost", @@ -740,9 +792,26 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", + "tower", "tower-service", ] +[[package]] +name = "etcd_broker" +version = "0.1.0" +dependencies = [ + "etcd-client", + "regex", + "serde", + "serde_json", + "serde_with", + "thiserror", + "tokio", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "fail" version = "0.5.0" @@ -1027,6 +1096,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1092,6 +1167,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.6.0" @@ -1357,6 +1438,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "md-5" version = "0.9.1" @@ -1613,9 +1700,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "oorandom" @@ -1976,6 +2063,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9e07e3a46d0771a8a06b5f4441527802830b43e679ba12f44960f48dd4c6803" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -2007,9 +2104,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" +checksum = "a07b0857a71a8cb765763950499cae2413c3f9cede1133478c43600d9e146890" dependencies = [ "bytes", "prost-derive", @@ -2017,12 +2114,14 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" +checksum = "120fbe7988713f39d780a58cf1a7ef0d7ef66c6d87e5aa3438940c05357929f4" dependencies = [ "bytes", - "heck", + "cfg-if", + "cmake", + "heck 0.4.0", "itertools", "lazy_static", "log", @@ -2037,9 +2136,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe" +checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc" dependencies = [ "anyhow", "itertools", @@ -2050,9 +2149,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" +checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" dependencies = [ "bytes", "prost", @@ -2224,9 +2323,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -2501,7 +2600,7 @@ dependencies = [ "const_format", "crc32c", "daemonize", - "etcd-client", + "etcd_broker", "fs2", "hex", "humantime", @@ -2830,7 +2929,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "rustversion", @@ -2868,15 +2967,21 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.86" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" +checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52" dependencies = [ "proc-macro2", "quote", "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "tar" version = "0.4.38" @@ -3170,12 +3275,13 @@ dependencies = [ [[package]] name = "tonic" -version = "0.6.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a" +checksum = "30fb54bf1e446f44d870d260d99957e7d11fb9d0a0f5bd1a662ad1411cc103f9" dependencies = [ "async-stream", "async-trait", + "axum", "base64", "bytes", "futures-core", @@ -3191,7 +3297,7 @@ dependencies = [ "prost-derive", "tokio", "tokio-stream", - "tokio-util 0.6.9", + "tokio-util 0.7.0", "tower", "tower-layer", "tower-service", @@ -3201,10 +3307,11 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.6.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757" +checksum = "c03447cdc9eaf8feffb6412dcb27baf2db11669a6c4789f29da799aabfb99547" dependencies = [ + "prettyplease", "proc-macro2", "prost-build", "quote", @@ -3231,6 +3338,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e980386f06883cf4d0578d6c9178c81f68b45d77d00f2c2c1bc034b3439c2c56" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.1" @@ -3672,13 +3798,16 @@ dependencies = [ name = "workspace_hack" version = "0.1.0" dependencies = [ + "ahash", "anyhow", "bytes", "chrono", "clap 2.34.0", "either", + "fail", "hashbrown", "indexmap", + "itoa 0.4.8", "libc", "log", "memchr", @@ -3692,6 +3821,7 @@ dependencies = [ "serde", "syn", "tokio", + "tokio-util 0.7.0", "tracing", "tracing-core", ] diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 12ee88cdc9..5aeff505b6 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -63,6 +63,10 @@ pub struct LocalEnv { #[serde(default)] pub broker_endpoints: Option, + /// A prefix to all to any key when pushing/polling etcd from a node. + #[serde(default)] + pub broker_etcd_prefix: Option, + pub pageserver: PageServerConf, #[serde(default)] diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index b094016131..074ee72f69 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -77,6 +77,7 @@ pub struct SafekeeperNode { pub pageserver: Arc, broker_endpoints: Option, + broker_etcd_prefix: Option, } impl SafekeeperNode { @@ -94,6 +95,7 @@ impl SafekeeperNode { http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port), pageserver, broker_endpoints: env.broker_endpoints.clone(), + broker_etcd_prefix: env.broker_etcd_prefix.clone(), } } @@ -143,6 +145,9 @@ impl SafekeeperNode { if let Some(ref ep) = self.broker_endpoints { cmd.args(&["--broker-endpoints", ep]); } + if let Some(prefix) = self.broker_etcd_prefix.as_deref() { + cmd.args(&["--broker-etcd-prefix", prefix]); + } if !cmd.status()?.success() { bail!( diff --git a/libs/etcd_broker/Cargo.toml b/libs/etcd_broker/Cargo.toml new file mode 100644 index 0000000000..65bd406131 --- /dev/null +++ b/libs/etcd_broker/Cargo.toml @@ -0,0 +1,17 @@ +[package] + name = "etcd_broker" + version = "0.1.0" + edition = "2021" + + [dependencies] + etcd-client = "0.9.0" + regex = "1.4.5" + serde = { version = "1.0", features = ["derive"] } + serde_json = "1" + serde_with = "1.12.0" + + utils = { path = "../utils" } + workspace_hack = { version = "0.1", path = "../../workspace_hack" } + tokio = "1" + tracing = "0.1" + thiserror = "1" diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs new file mode 100644 index 0000000000..01cc0cf162 --- /dev/null +++ b/libs/etcd_broker/src/lib.rs @@ -0,0 +1,335 @@ +//! A set of primitives to access a shared data/updates, propagated via etcd broker (not persistent). +//! Intended to connect services to each other, not to store their data. +use std::{ + collections::{hash_map, HashMap}, + fmt::Display, + str::FromStr, +}; + +use regex::{Captures, Regex}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; + +pub use etcd_client::*; + +use tokio::{sync::mpsc, task::JoinHandle}; +use tracing::*; +use utils::{ + lsn::Lsn, + zid::{ZNodeId, ZTenantId, ZTenantTimelineId}, +}; + +#[derive(Debug, Deserialize, Serialize)] +struct SafekeeperTimeline { + safekeeper_id: ZNodeId, + info: SkTimelineInfo, +} + +/// Published data about safekeeper's timeline. Fields made optional for easy migrations. +#[serde_as] +#[derive(Debug, Deserialize, Serialize)] +pub struct SkTimelineInfo { + /// Term of the last entry. + pub last_log_term: Option, + /// LSN of the last record. + #[serde_as(as = "Option")] + #[serde(default)] + pub flush_lsn: Option, + /// Up to which LSN safekeeper regards its WAL as committed. + #[serde_as(as = "Option")] + #[serde(default)] + pub commit_lsn: Option, + /// LSN up to which safekeeper offloaded WAL to s3. + #[serde_as(as = "Option")] + #[serde(default)] + pub s3_wal_lsn: Option, + /// LSN of last checkpoint uploaded by pageserver. + #[serde_as(as = "Option")] + #[serde(default)] + pub remote_consistent_lsn: Option, + #[serde_as(as = "Option")] + #[serde(default)] + pub peer_horizon_lsn: Option, + #[serde(default)] + pub wal_stream_connection_string: Option, +} + +#[derive(Debug, thiserror::Error)] +pub enum BrokerError { + #[error("Etcd client error: {0}. Context: {1}")] + EtcdClient(etcd_client::Error, String), + #[error("Error during parsing etcd data: {0}")] + ParsingError(String), + #[error("Internal error: {0}")] + InternalError(String), +} + +/// A way to control the data retrieval from a certain subscription. +pub struct SkTimelineSubscription { + safekeeper_timeline_updates: + mpsc::UnboundedReceiver>>, + kind: SkTimelineSubscriptionKind, + watcher_handle: JoinHandle>, + watcher: Watcher, +} + +impl SkTimelineSubscription { + /// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet. + pub async fn fetch_data( + &mut self, + ) -> Option>> { + self.safekeeper_timeline_updates.recv().await + } + + /// Cancels the subscription, stopping the data poller and waiting for it to shut down. + pub async fn cancel(mut self) -> Result<(), BrokerError> { + self.watcher.cancel().await.map_err(|e| { + BrokerError::EtcdClient( + e, + format!( + "Failed to cancel timeline subscription, kind: {:?}", + self.kind + ), + ) + })?; + self.watcher_handle.await.map_err(|e| { + BrokerError::InternalError(format!( + "Failed to join the timeline updates task, kind: {:?}, error: {e}", + self.kind + )) + })? + } +} + +/// The subscription kind to the timeline updates from safekeeper. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SkTimelineSubscriptionKind { + broker_prefix: String, + kind: SubscriptionKind, +} + +impl SkTimelineSubscriptionKind { + pub fn all(broker_prefix: String) -> Self { + Self { + broker_prefix, + kind: SubscriptionKind::All, + } + } + + pub fn tenant(broker_prefix: String, tenant: ZTenantId) -> Self { + Self { + broker_prefix, + kind: SubscriptionKind::Tenant(tenant), + } + } + + pub fn timeline(broker_prefix: String, timeline: ZTenantTimelineId) -> Self { + Self { + broker_prefix, + kind: SubscriptionKind::Timeline(timeline), + } + } + + fn watch_regex(&self) -> Regex { + match self.kind { + SubscriptionKind::All => Regex::new(&format!( + r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$", + self.broker_prefix + )) + .expect("wrong regex for 'everything' subscription"), + SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!( + r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]])$", + self.broker_prefix + )) + .expect("wrong regex for 'tenant' subscription"), + SubscriptionKind::Timeline(ZTenantTimelineId { + tenant_id, + timeline_id, + }) => Regex::new(&format!( + r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]])$", + self.broker_prefix + )) + .expect("wrong regex for 'timeline' subscription"), + } + } + + /// Etcd key to use for watching a certain timeline updates from safekeepers. + pub fn watch_key(&self) -> String { + match self.kind { + SubscriptionKind::All => self.broker_prefix.to_string(), + SubscriptionKind::Tenant(tenant_id) => { + format!("{}/{tenant_id}/safekeeper", self.broker_prefix) + } + SubscriptionKind::Timeline(ZTenantTimelineId { + tenant_id, + timeline_id, + }) => format!( + "{}/{tenant_id}/{timeline_id}/safekeeper", + self.broker_prefix + ), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum SubscriptionKind { + /// Get every timeline update. + All, + /// Get certain tenant timelines' updates. + Tenant(ZTenantId), + /// Get certain timeline updates. + Timeline(ZTenantTimelineId), +} + +/// Creates a background task to poll etcd for timeline updates from safekeepers. +/// Stops and returns `Err` on any error during etcd communication. +/// Watches the key changes until either the watcher is cancelled via etcd or the subscription cancellation handle, +/// exiting normally in such cases. +pub async fn subscribe_to_safekeeper_timeline_updates( + client: &mut Client, + subscription: SkTimelineSubscriptionKind, +) -> Result { + info!("Subscribing to timeline updates, subscription kind: {subscription:?}"); + + let (watcher, mut stream) = client + .watch( + subscription.watch_key(), + Some(WatchOptions::new().with_prefix()), + ) + .await + .map_err(|e| { + BrokerError::EtcdClient( + e, + format!("Failed to init the watch for subscription {subscription:?}"), + ) + })?; + + let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel(); + + let subscription_kind = subscription.kind; + let regex = subscription.watch_regex(); + let watcher_handle = tokio::spawn(async move { + while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!( + "Failed to get messages from the subscription stream, kind: {subscription_kind:?}, error: {e}" + )))? { + if resp.canceled() { + info!("Watch for timeline updates subscription was canceled, exiting"); + break; + } + + let mut timeline_updates: HashMap> = + HashMap::new(); + + let events = resp.events(); + debug!("Processing {} events", events.len()); + + for event in events { + if EventType::Put == event.event_type() { + if let Some(kv) = event.kv() { + match parse_etcd_key_value(subscription_kind, ®ex, kv) { + Ok(Some((zttid, timeline))) => { + match timeline_updates + .entry(zttid) + .or_default() + .entry(timeline.safekeeper_id) + { + hash_map::Entry::Occupied(mut o) => { + if o.get().flush_lsn < timeline.info.flush_lsn { + o.insert(timeline.info); + } + } + hash_map::Entry::Vacant(v) => { + v.insert(timeline.info); + } + } + } + Ok(None) => {} + Err(e) => error!("Failed to parse timeline update: {e}"), + }; + } + } + } + + if let Err(e) = timeline_updates_sender.send(timeline_updates) { + info!("Timeline updates sender got dropped, exiting: {e}"); + break; + } + } + + Ok(()) + }); + + Ok(SkTimelineSubscription { + kind: subscription, + safekeeper_timeline_updates, + watcher_handle, + watcher, + }) +} + +fn parse_etcd_key_value( + subscription_kind: SubscriptionKind, + regex: &Regex, + kv: &KeyValue, +) -> Result, BrokerError> { + let caps = if let Some(caps) = regex.captures(kv.key_str().map_err(|e| { + BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as key str")) + })?) { + caps + } else { + return Ok(None); + }; + + let (zttid, safekeeper_id) = match subscription_kind { + SubscriptionKind::All => ( + ZTenantTimelineId::new( + parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?, + parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?, + ), + ZNodeId(parse_capture(&caps, 3).map_err(BrokerError::ParsingError)?), + ), + SubscriptionKind::Tenant(tenant_id) => ( + ZTenantTimelineId::new( + tenant_id, + parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?, + ), + ZNodeId(parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?), + ), + SubscriptionKind::Timeline(zttid) => ( + zttid, + ZNodeId(parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?), + ), + }; + + let info_str = kv.value_str().map_err(|e| { + BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as value str")) + })?; + Ok(Some(( + zttid, + SafekeeperTimeline { + safekeeper_id, + info: serde_json::from_str(info_str).map_err(|e| { + BrokerError::ParsingError(format!( + "Failed to parse '{info_str}' as safekeeper timeline info: {e}" + )) + })?, + }, + ))) +} + +fn parse_capture(caps: &Captures, index: usize) -> Result +where + T: FromStr, + ::Err: Display, +{ + let capture_match = caps + .get(index) + .ok_or_else(|| format!("Failed to get capture match at index {index}"))? + .as_str(); + capture_match.parse().map_err(|e| { + format!( + "Failed to parse {} from {capture_match}: {e}", + std::any::type_name::() + ) + }) +} diff --git a/libs/utils/src/zid.rs b/libs/utils/src/zid.rs index fce5ed97c1..44d81cda50 100644 --- a/libs/utils/src/zid.rs +++ b/libs/utils/src/zid.rs @@ -224,7 +224,7 @@ impl fmt::Display for ZTenantTimelineId { // Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued // by the console. -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)] #[serde(transparent)] pub struct ZNodeId(pub u64); diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 158e43f68f..8b54054080 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -517,7 +517,7 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { .collect() } -fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> { +fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> { let pageserver = PageServerNode::from_env(env); match tenant_match.subcommand() { Some(("list", _)) => { @@ -550,17 +550,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re pageserver .tenant_config(tenant_id, tenant_conf) - .unwrap_or_else(|e| { - anyhow!( - "Tenant config failed for tenant with id {} : {}", - tenant_id, - e - ); - }); - println!( - "tenant {} successfully configured on the pageserver", - tenant_id - ); + .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?; + println!("tenant {tenant_id} successfully configured on the pageserver"); } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 8a31311b8f..44587dd384 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -24,11 +24,10 @@ walkdir = "2" url = "2.2.2" signal-hook = "0.3.10" serde = { version = "1.0", features = ["derive"] } -serde_with = {version = "1.12.0"} +serde_with = "1.12.0" hex = "0.4.3" const_format = "0.2.21" tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -etcd-client = "0.8.3" tokio-util = { version = "0.7", features = ["io"] } rusoto_core = "0.47" rusoto_s3 = "0.47" @@ -36,6 +35,7 @@ rusoto_s3 = "0.47" postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } +etcd_broker = { path = "../libs/etcd_broker" } workspace_hack = { version = "0.1", path = "../workspace_hack" } [dev-dependencies] diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 3fea3581a8..7e979840c2 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -109,6 +109,12 @@ fn main() -> Result<()> { .takes_value(true) .help("a comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'"), ) + .arg( + Arg::new("broker-etcd-prefix") + .long("broker-etcd-prefix") + .takes_value(true) + .help("a prefix to always use when polling/pusing data in etcd from this safekeeper"), + ) .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { @@ -118,7 +124,7 @@ fn main() -> Result<()> { return Ok(()); } - let mut conf: SafeKeeperConf = Default::default(); + let mut conf = SafeKeeperConf::default(); if let Some(dir) = arg_matches.value_of("datadir") { // change into the data directory. @@ -162,6 +168,9 @@ fn main() -> Result<()> { let collected_ep: Result, ParseError> = addr.split(',').map(Url::parse).collect(); conf.broker_endpoints = Some(collected_ep?); } + if let Some(prefix) = arg_matches.value_of("broker-etcd-prefix") { + conf.broker_etcd_prefix = prefix.to_string(); + } start_safekeeper(conf, given_id, arg_matches.is_present("init")) } diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 8ce7bdf0e5..c9ae1a8d98 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -1,61 +1,22 @@ //! Communication with etcd, providing safekeeper peers and pageserver coordination. -use anyhow::bail; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use etcd_client::Client; -use etcd_client::EventType; -use etcd_client::PutOptions; -use etcd_client::WatchOptions; -use lazy_static::lazy_static; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; -use std::str::FromStr; +use etcd_broker::Client; +use etcd_broker::PutOptions; +use etcd_broker::SkTimelineSubscriptionKind; use std::time::Duration; use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; -use crate::{safekeeper::Term, timeline::GlobalTimelines, SafeKeeperConf}; -use utils::{ - lsn::Lsn, - zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}, -}; +use crate::{timeline::GlobalTimelines, SafeKeeperConf}; +use utils::zid::{ZNodeId, ZTenantTimelineId}; const RETRY_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000; const LEASE_TTL_SEC: i64 = 5; -// TODO: add global zenith installation ID. -const ZENITH_PREFIX: &str = "zenith"; - -/// Published data about safekeeper. Fields made optional for easy migrations. -#[serde_as] -#[derive(Debug, Deserialize, Serialize)] -pub struct SafekeeperInfo { - /// Term of the last entry. - pub last_log_term: Option, - /// LSN of the last record. - #[serde_as(as = "Option")] - #[serde(default)] - pub flush_lsn: Option, - /// Up to which LSN safekeeper regards its WAL as committed. - #[serde_as(as = "Option")] - #[serde(default)] - pub commit_lsn: Option, - /// LSN up to which safekeeper offloaded WAL to s3. - #[serde_as(as = "Option")] - #[serde(default)] - pub s3_wal_lsn: Option, - /// LSN of last checkpoint uploaded by pageserver. - #[serde_as(as = "Option")] - #[serde(default)] - pub remote_consistent_lsn: Option, - #[serde_as(as = "Option")] - #[serde(default)] - pub peer_horizon_lsn: Option, -} pub fn thread_main(conf: SafeKeeperConf) { let runtime = runtime::Builder::new_current_thread() @@ -71,22 +32,21 @@ pub fn thread_main(conf: SafeKeeperConf) { }); } -/// Prefix to timeline related data. -fn timeline_path(zttid: &ZTenantTimelineId) -> String { +/// Key to per timeline per safekeeper data. +fn timeline_safekeeper_path( + broker_prefix: String, + zttid: ZTenantTimelineId, + sk_id: ZNodeId, +) -> String { format!( - "{}/{}/{}", - ZENITH_PREFIX, zttid.tenant_id, zttid.timeline_id + "{}/{sk_id}", + SkTimelineSubscriptionKind::timeline(broker_prefix, zttid).watch_key() ) } -/// Key to per timeline per safekeeper data. -fn timeline_safekeeper_path(zttid: &ZTenantTimelineId, sk_id: ZNodeId) -> String { - format!("{}/safekeeper/{}", timeline_path(zttid), sk_id) -} - /// Push once in a while data about all active timelines to the broker. -async fn push_loop(conf: SafeKeeperConf) -> Result<()> { - let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?; +async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { + let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?; // Get and maintain lease to automatically delete obsolete data let lease = client.lease_grant(LEASE_TTL_SEC, None).await?; @@ -98,14 +58,17 @@ async fn push_loop(conf: SafeKeeperConf) -> Result<()> { // is under plain mutex. That's ok, all this code is not performance // sensitive and there is no risk of deadlock as we don't await while // lock is held. - let active_tlis = GlobalTimelines::get_active_timelines(); - for zttid in &active_tlis { - if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) { - let sk_info = tli.get_public_info(); + for zttid in GlobalTimelines::get_active_timelines() { + if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { + let sk_info = tli.get_public_info()?; let put_opts = PutOptions::new().with_lease(lease.id()); client .put( - timeline_safekeeper_path(zttid, conf.my_id), + timeline_safekeeper_path( + conf.broker_etcd_prefix.clone(), + zttid, + conf.my_id, + ), serde_json::to_string(&sk_info)?, Some(put_opts), ) @@ -128,45 +91,31 @@ async fn push_loop(conf: SafeKeeperConf) -> Result<()> { /// Subscribe and fetch all the interesting data from the broker. async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { - lazy_static! { - static ref TIMELINE_SAFEKEEPER_RE: Regex = - Regex::new(r"^zenith/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$") - .unwrap(); - } - let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?; - loop { - let wo = WatchOptions::new().with_prefix(); - // TODO: subscribe only to my timelines - let (_, mut stream) = client.watch(ZENITH_PREFIX, Some(wo)).await?; - while let Some(resp) = stream.message().await? { - if resp.canceled() { - bail!("watch canceled"); - } + let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?; - for event in resp.events() { - if EventType::Put == event.event_type() { - if let Some(kv) = event.kv() { - if let Some(caps) = TIMELINE_SAFEKEEPER_RE.captures(kv.key_str()?) { - let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; - let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - let zttid = ZTenantTimelineId::new(tenant_id, timeline_id); - let safekeeper_id = ZNodeId(caps.get(3).unwrap().as_str().parse()?); - let value_str = kv.value_str()?; - match serde_json::from_str::(value_str) { - Ok(safekeeper_info) => { - if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { - tli.record_safekeeper_info(&safekeeper_info, safekeeper_id)? - } - } - Err(err) => warn!( - "failed to deserialize safekeeper info {}: {}", - value_str, err - ), - } + let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( + &mut client, + SkTimelineSubscriptionKind::all(conf.broker_etcd_prefix.clone()), + ) + .await + .context("failed to subscribe for safekeeper info")?; + + loop { + match subscription.fetch_data().await { + Some(new_info) => { + for (zttid, sk_info) in new_info { + // note: there are blocking operations below, but it's considered fine for now + if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { + for (safekeeper_id, info) in sk_info { + tli.record_safekeeper_info(&info, safekeeper_id)? } } } } + None => { + debug!("timeline updates sender closed, aborting the pull loop"); + return Ok(()); + } } } } diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index d7cbcb094e..e731db5617 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,3 +1,4 @@ +use etcd_broker::SkTimelineInfo; use hyper::{Body, Request, Response, StatusCode}; use serde::Serialize; @@ -5,7 +6,6 @@ use serde::Serializer; use std::fmt::Display; use std::sync::Arc; -use crate::broker::SafekeeperInfo; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; use crate::timeline::GlobalTimelines; @@ -136,7 +136,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result>, + pub broker_etcd_prefix: String, } impl SafeKeeperConf { @@ -76,6 +78,7 @@ impl Default for SafeKeeperConf { recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: ZNodeId(0), broker_endpoints: None, + broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(), } } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 68361fd672..b9264565dc 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use etcd_broker::SkTimelineInfo; use postgres_ffi::xlog_utils::TimeLineID; use postgres_ffi::xlog_utils::XLogSegNo; @@ -16,7 +17,6 @@ use tracing::*; use lazy_static::lazy_static; -use crate::broker::SafekeeperInfo; use crate::control_file; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; @@ -886,7 +886,7 @@ where } /// Update timeline state with peer safekeeper data. - pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperInfo) -> Result<()> { + pub fn record_safekeeper_info(&mut self, sk_info: &SkTimelineInfo) -> Result<()> { let mut sync_control_file = false; if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term) { diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 47137091da..140d6660ac 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -3,6 +3,7 @@ use anyhow::{bail, Context, Result}; +use etcd_broker::SkTimelineInfo; use lazy_static::lazy_static; use postgres_ffi::xlog_utils::XLogSegNo; @@ -21,7 +22,6 @@ use utils::{ zid::{ZNodeId, ZTenantTimelineId}, }; -use crate::broker::SafekeeperInfo; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use crate::control_file; @@ -89,6 +89,7 @@ struct SharedState { active: bool, num_computes: u32, pageserver_connstr: Option, + listen_pg_addr: String, last_removed_segno: XLogSegNo, } @@ -111,6 +112,7 @@ impl SharedState { active: false, num_computes: 0, pageserver_connstr: None, + listen_pg_addr: conf.listen_pg_addr.clone(), last_removed_segno: 0, }) } @@ -130,6 +132,7 @@ impl SharedState { active: false, num_computes: 0, pageserver_connstr: None, + listen_pg_addr: conf.listen_pg_addr.clone(), last_removed_segno: 0, }) } @@ -418,9 +421,9 @@ impl Timeline { } /// Prepare public safekeeper info for reporting. - pub fn get_public_info(&self) -> SafekeeperInfo { + pub fn get_public_info(&self) -> anyhow::Result { let shared_state = self.mutex.lock().unwrap(); - SafekeeperInfo { + Ok(SkTimelineInfo { last_log_term: Some(shared_state.sk.get_epoch()), flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), // note: this value is not flushed to control file yet and can be lost @@ -432,11 +435,23 @@ impl Timeline { shared_state.sk.inmem.remote_consistent_lsn, )), peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - } + wal_stream_connection_string: shared_state + .pageserver_connstr + .as_deref() + .map(|pageserver_connstr| { + wal_stream_connection_string( + self.zttid, + &shared_state.listen_pg_addr, + pageserver_connstr, + ) + }) + .transpose() + .context("Failed to get the pageserver callmemaybe connstr")?, + }) } /// Update timeline state with peer safekeeper data. - pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> { + pub fn record_safekeeper_info(&self, sk_info: &SkTimelineInfo, _sk_id: ZNodeId) -> Result<()> { let mut shared_state = self.mutex.lock().unwrap(); shared_state.sk.record_safekeeper_info(sk_info)?; self.notify_wal_senders(&mut shared_state); @@ -489,6 +504,29 @@ impl Timeline { } } +// pageserver connstr is needed to be able to distinguish between different pageservers +// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved +// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105 +fn wal_stream_connection_string( + ZTenantTimelineId { + tenant_id, + timeline_id, + }: ZTenantTimelineId, + listen_pg_addr_str: &str, + pageserver_connstr: &str, +) -> anyhow::Result { + let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str); + let me_conf = me_connstr + .parse::() + .with_context(|| { + format!("Failed to parse pageserver connection string '{me_connstr}' as a postgres one") + })?; + let (host, port) = utils::connstring::connection_host_port(&me_conf); + Ok(format!( + "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'", + )) +} + // Utilities needed by various Connection-like objects pub trait TimelineTools { fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>; diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index f178b5b766..2bb22f2d3b 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -14,29 +14,34 @@ publish = false ### BEGIN HAKARI SECTION [dependencies] +ahash = { version = "0.7", features = ["std"] } anyhow = { version = "1", features = ["backtrace", "std"] } bytes = { version = "1", features = ["serde", "std"] } chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] } clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] } either = { version = "1", features = ["use_std"] } +fail = { version = "0.5", default-features = false, features = ["failpoints"] } hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] } indexmap = { version = "1", default-features = false, features = ["std"] } +itoa = { version = "0.4", features = ["i128", "std"] } libc = { version = "0.2", features = ["extra_traits", "std"] } log = { version = "0.4", default-features = false, features = ["serde", "std"] } memchr = { version = "2", features = ["std", "use_std"] } num-integer = { version = "0.1", default-features = false, features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "std"] } -prost = { version = "0.9", features = ["prost-derive", "std"] } +prost = { version = "0.10", features = ["prost-derive", "std"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } scopeguard = { version = "1", features = ["use_std"] } serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] } tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] } +tokio-util = { version = "0.7", features = ["codec", "io"] } tracing = { version = "0.1", features = ["attributes", "log", "std", "tracing-attributes"] } tracing-core = { version = "0.1", features = ["lazy_static", "std"] } [build-dependencies] +ahash = { version = "0.7", features = ["std"] } anyhow = { version = "1", features = ["backtrace", "std"] } bytes = { version = "1", features = ["serde", "std"] } clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] } @@ -46,7 +51,7 @@ indexmap = { version = "1", default-features = false, features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] } log = { version = "0.4", default-features = false, features = ["serde", "std"] } memchr = { version = "2", features = ["std", "use_std"] } -prost = { version = "0.9", features = ["prost-derive", "std"] } +prost = { version = "0.10", features = ["prost-derive", "std"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }