Librarify common etcd timeline logic

This commit is contained in:
Kirill Bulatov
2022-05-04 18:28:46 +03:00
committed by Kirill Bulatov
parent dd6dca9072
commit d4e155aaa3
15 changed files with 633 additions and 147 deletions

182
Cargo.lock generated
View File

@@ -48,9 +48,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.53"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
dependencies = [
"backtrace",
]
@@ -113,6 +113,49 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4af7447fc1214c1f3a1ace861d0216a6c8bb13965b64bbad9650f375b67689a"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa 1.0.1",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"sync_wrapper",
"tokio",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bdc19781b16e32f8a7200368a336fa4509d4b72ef15dd4e41df5290855ee1e6"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
]
[[package]]
name = "backtrace"
version = "0.3.64"
@@ -320,6 +363,15 @@ dependencies = [
"textwrap 0.14.2",
]
[[package]]
name = "cmake"
version = "0.1.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a"
dependencies = [
"cc",
]
[[package]]
name = "combine"
version = "4.6.3"
@@ -730,9 +782,9 @@ dependencies = [
[[package]]
name = "etcd-client"
version = "0.8.4"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "585de5039d1ecce74773db49ba4e8107e42be7c2cd0b1a9e7fce27181db7b118"
checksum = "c434d2800b273a506b82397aad2f20971636f65e47b27c027f77d498530c5954"
dependencies = [
"http",
"prost",
@@ -740,9 +792,26 @@ dependencies = [
"tokio-stream",
"tonic",
"tonic-build",
"tower",
"tower-service",
]
[[package]]
name = "etcd_broker"
version = "0.1.0"
dependencies = [
"etcd-client",
"regex",
"serde",
"serde_json",
"serde_with",
"thiserror",
"tokio",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "fail"
version = "0.5.0"
@@ -1027,6 +1096,12 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1092,6 +1167,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]]
name = "httparse"
version = "1.6.0"
@@ -1357,6 +1438,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "matchit"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "md-5"
version = "0.9.1"
@@ -1613,9 +1700,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "oorandom"
@@ -1976,6 +2063,16 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "prettyplease"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9e07e3a46d0771a8a06b5f4441527802830b43e679ba12f44960f48dd4c6803"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -2007,9 +2104,9 @@ dependencies = [
[[package]]
name = "prost"
version = "0.9.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
checksum = "a07b0857a71a8cb765763950499cae2413c3f9cede1133478c43600d9e146890"
dependencies = [
"bytes",
"prost-derive",
@@ -2017,12 +2114,14 @@ dependencies = [
[[package]]
name = "prost-build"
version = "0.9.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
checksum = "120fbe7988713f39d780a58cf1a7ef0d7ef66c6d87e5aa3438940c05357929f4"
dependencies = [
"bytes",
"heck",
"cfg-if",
"cmake",
"heck 0.4.0",
"itertools",
"lazy_static",
"log",
@@ -2037,9 +2136,9 @@ dependencies = [
[[package]]
name = "prost-derive"
version = "0.9.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc"
dependencies = [
"anyhow",
"itertools",
@@ -2050,9 +2149,9 @@ dependencies = [
[[package]]
name = "prost-types"
version = "0.9.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
dependencies = [
"bytes",
"prost",
@@ -2224,9 +2323,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.5.4"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"aho-corasick",
"memchr",
@@ -2501,7 +2600,7 @@ dependencies = [
"const_format",
"crc32c",
"daemonize",
"etcd-client",
"etcd_broker",
"fs2",
"hex",
"humantime",
@@ -2830,7 +2929,7 @@ version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38"
dependencies = [
"heck",
"heck 0.3.3",
"proc-macro2",
"quote",
"rustversion",
@@ -2868,15 +2967,21 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.86"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b"
checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "tar"
version = "0.4.38"
@@ -3170,12 +3275,13 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.6.2"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
checksum = "30fb54bf1e446f44d870d260d99957e7d11fb9d0a0f5bd1a662ad1411cc103f9"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
@@ -3191,7 +3297,7 @@ dependencies = [
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util 0.6.9",
"tokio-util 0.7.0",
"tower",
"tower-layer",
"tower-service",
@@ -3201,10 +3307,11 @@ dependencies = [
[[package]]
name = "tonic-build"
version = "0.6.2"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
checksum = "c03447cdc9eaf8feffb6412dcb27baf2db11669a6c4789f29da799aabfb99547"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
@@ -3231,6 +3338,25 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e980386f06883cf4d0578d6c9178c81f68b45d77d00f2c2c1bc034b3439c2c56"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
@@ -3672,13 +3798,16 @@ dependencies = [
name = "workspace_hack"
version = "0.1.0"
dependencies = [
"ahash",
"anyhow",
"bytes",
"chrono",
"clap 2.34.0",
"either",
"fail",
"hashbrown",
"indexmap",
"itoa 0.4.8",
"libc",
"log",
"memchr",
@@ -3692,6 +3821,7 @@ dependencies = [
"serde",
"syn",
"tokio",
"tokio-util 0.7.0",
"tracing",
"tracing-core",
]

View File

@@ -63,6 +63,10 @@ pub struct LocalEnv {
#[serde(default)]
pub broker_endpoints: Option<String>,
/// A prefix to all to any key when pushing/polling etcd from a node.
#[serde(default)]
pub broker_etcd_prefix: Option<String>,
pub pageserver: PageServerConf,
#[serde(default)]

View File

@@ -77,6 +77,7 @@ pub struct SafekeeperNode {
pub pageserver: Arc<PageServerNode>,
broker_endpoints: Option<String>,
broker_etcd_prefix: Option<String>,
}
impl SafekeeperNode {
@@ -94,6 +95,7 @@ impl SafekeeperNode {
http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port),
pageserver,
broker_endpoints: env.broker_endpoints.clone(),
broker_etcd_prefix: env.broker_etcd_prefix.clone(),
}
}
@@ -143,6 +145,9 @@ impl SafekeeperNode {
if let Some(ref ep) = self.broker_endpoints {
cmd.args(&["--broker-endpoints", ep]);
}
if let Some(prefix) = self.broker_etcd_prefix.as_deref() {
cmd.args(&["--broker-etcd-prefix", prefix]);
}
if !cmd.status()?.success() {
bail!(

View File

@@ -0,0 +1,17 @@
[package]
name = "etcd_broker"
version = "0.1.0"
edition = "2021"
[dependencies]
etcd-client = "0.9.0"
regex = "1.4.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.12.0"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
tokio = "1"
tracing = "0.1"
thiserror = "1"

335
libs/etcd_broker/src/lib.rs Normal file
View File

@@ -0,0 +1,335 @@
//! A set of primitives to access a shared data/updates, propagated via etcd broker (not persistent).
//! Intended to connect services to each other, not to store their data.
use std::{
collections::{hash_map, HashMap},
fmt::Display,
str::FromStr,
};
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
pub use etcd_client::*;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use utils::{
lsn::Lsn,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
};
#[derive(Debug, Deserialize, Serialize)]
struct SafekeeperTimeline {
safekeeper_id: ZNodeId,
info: SkTimelineInfo,
}
/// Published data about safekeeper's timeline. Fields made optional for easy migrations.
#[serde_as]
#[derive(Debug, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper offloaded WAL to s3.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub s3_wal_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub wal_stream_connection_string: Option<String>,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
EtcdClient(etcd_client::Error, String),
#[error("Error during parsing etcd data: {0}")]
ParsingError(String),
#[error("Internal error: {0}")]
InternalError(String),
}
/// A way to control the data retrieval from a certain subscription.
pub struct SkTimelineSubscription {
safekeeper_timeline_updates:
mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>>>,
kind: SkTimelineSubscriptionKind,
watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
}
impl SkTimelineSubscription {
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
pub async fn fetch_data(
&mut self,
) -> Option<HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>>> {
self.safekeeper_timeline_updates.recv().await
}
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
pub async fn cancel(mut self) -> Result<(), BrokerError> {
self.watcher.cancel().await.map_err(|e| {
BrokerError::EtcdClient(
e,
format!(
"Failed to cancel timeline subscription, kind: {:?}",
self.kind
),
)
})?;
self.watcher_handle.await.map_err(|e| {
BrokerError::InternalError(format!(
"Failed to join the timeline updates task, kind: {:?}, error: {e}",
self.kind
))
})?
}
}
/// The subscription kind to the timeline updates from safekeeper.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SkTimelineSubscriptionKind {
broker_prefix: String,
kind: SubscriptionKind,
}
impl SkTimelineSubscriptionKind {
pub fn all(broker_prefix: String) -> Self {
Self {
broker_prefix,
kind: SubscriptionKind::All,
}
}
pub fn tenant(broker_prefix: String, tenant: ZTenantId) -> Self {
Self {
broker_prefix,
kind: SubscriptionKind::Tenant(tenant),
}
}
pub fn timeline(broker_prefix: String, timeline: ZTenantTimelineId) -> Self {
Self {
broker_prefix,
kind: SubscriptionKind::Timeline(timeline),
}
}
fn watch_regex(&self) -> Regex {
match self.kind {
SubscriptionKind::All => Regex::new(&format!(
r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$",
self.broker_prefix
))
.expect("wrong regex for 'everything' subscription"),
SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!(
r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]])$",
self.broker_prefix
))
.expect("wrong regex for 'tenant' subscription"),
SubscriptionKind::Timeline(ZTenantTimelineId {
tenant_id,
timeline_id,
}) => Regex::new(&format!(
r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]])$",
self.broker_prefix
))
.expect("wrong regex for 'timeline' subscription"),
}
}
/// Etcd key to use for watching a certain timeline updates from safekeepers.
pub fn watch_key(&self) -> String {
match self.kind {
SubscriptionKind::All => self.broker_prefix.to_string(),
SubscriptionKind::Tenant(tenant_id) => {
format!("{}/{tenant_id}/safekeeper", self.broker_prefix)
}
SubscriptionKind::Timeline(ZTenantTimelineId {
tenant_id,
timeline_id,
}) => format!(
"{}/{tenant_id}/{timeline_id}/safekeeper",
self.broker_prefix
),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum SubscriptionKind {
/// Get every timeline update.
All,
/// Get certain tenant timelines' updates.
Tenant(ZTenantId),
/// Get certain timeline updates.
Timeline(ZTenantTimelineId),
}
/// Creates a background task to poll etcd for timeline updates from safekeepers.
/// Stops and returns `Err` on any error during etcd communication.
/// Watches the key changes until either the watcher is cancelled via etcd or the subscription cancellation handle,
/// exiting normally in such cases.
pub async fn subscribe_to_safekeeper_timeline_updates(
client: &mut Client,
subscription: SkTimelineSubscriptionKind,
) -> Result<SkTimelineSubscription, BrokerError> {
info!("Subscribing to timeline updates, subscription kind: {subscription:?}");
let (watcher, mut stream) = client
.watch(
subscription.watch_key(),
Some(WatchOptions::new().with_prefix()),
)
.await
.map_err(|e| {
BrokerError::EtcdClient(
e,
format!("Failed to init the watch for subscription {subscription:?}"),
)
})?;
let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel();
let subscription_kind = subscription.kind;
let regex = subscription.watch_regex();
let watcher_handle = tokio::spawn(async move {
while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!(
"Failed to get messages from the subscription stream, kind: {subscription_kind:?}, error: {e}"
)))? {
if resp.canceled() {
info!("Watch for timeline updates subscription was canceled, exiting");
break;
}
let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<ZNodeId, SkTimelineInfo>> =
HashMap::new();
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(kv) = event.kv() {
match parse_etcd_key_value(subscription_kind, &regex, kv) {
Ok(Some((zttid, timeline))) => {
match timeline_updates
.entry(zttid)
.or_default()
.entry(timeline.safekeeper_id)
{
hash_map::Entry::Occupied(mut o) => {
if o.get().flush_lsn < timeline.info.flush_lsn {
o.insert(timeline.info);
}
}
hash_map::Entry::Vacant(v) => {
v.insert(timeline.info);
}
}
}
Ok(None) => {}
Err(e) => error!("Failed to parse timeline update: {e}"),
};
}
}
}
if let Err(e) = timeline_updates_sender.send(timeline_updates) {
info!("Timeline updates sender got dropped, exiting: {e}");
break;
}
}
Ok(())
});
Ok(SkTimelineSubscription {
kind: subscription,
safekeeper_timeline_updates,
watcher_handle,
watcher,
})
}
fn parse_etcd_key_value(
subscription_kind: SubscriptionKind,
regex: &Regex,
kv: &KeyValue,
) -> Result<Option<(ZTenantTimelineId, SafekeeperTimeline)>, BrokerError> {
let caps = if let Some(caps) = regex.captures(kv.key_str().map_err(|e| {
BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as key str"))
})?) {
caps
} else {
return Ok(None);
};
let (zttid, safekeeper_id) = match subscription_kind {
SubscriptionKind::All => (
ZTenantTimelineId::new(
parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?,
),
ZNodeId(parse_capture(&caps, 3).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Tenant(tenant_id) => (
ZTenantTimelineId::new(
tenant_id,
parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?,
),
ZNodeId(parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Timeline(zttid) => (
zttid,
ZNodeId(parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?),
),
};
let info_str = kv.value_str().map_err(|e| {
BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as value str"))
})?;
Ok(Some((
zttid,
SafekeeperTimeline {
safekeeper_id,
info: serde_json::from_str(info_str).map_err(|e| {
BrokerError::ParsingError(format!(
"Failed to parse '{info_str}' as safekeeper timeline info: {e}"
))
})?,
},
)))
}
fn parse_capture<T>(caps: &Captures, index: usize) -> Result<T, String>
where
T: FromStr,
<T as FromStr>::Err: Display,
{
let capture_match = caps
.get(index)
.ok_or_else(|| format!("Failed to get capture match at index {index}"))?
.as_str();
capture_match.parse().map_err(|e| {
format!(
"Failed to parse {} from {capture_match}: {e}",
std::any::type_name::<T>()
)
})
}

View File

@@ -224,7 +224,7 @@ impl fmt::Display for ZTenantTimelineId {
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ZNodeId(pub u64);

View File

@@ -517,7 +517,7 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
.collect()
}
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
let pageserver = PageServerNode::from_env(env);
match tenant_match.subcommand() {
Some(("list", _)) => {
@@ -550,17 +550,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
pageserver
.tenant_config(tenant_id, tenant_conf)
.unwrap_or_else(|e| {
anyhow!(
"Tenant config failed for tenant with id {} : {}",
tenant_id,
e
);
});
println!(
"tenant {} successfully configured on the pageserver",
tenant_id
);
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
println!("tenant {tenant_id} successfully configured on the pageserver");
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
None => bail!("no tenant subcommand provided"),

View File

@@ -24,11 +24,10 @@ walkdir = "2"
url = "2.2.2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
serde_with = {version = "1.12.0"}
serde_with = "1.12.0"
hex = "0.4.3"
const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
etcd-client = "0.8.3"
tokio-util = { version = "0.7", features = ["io"] }
rusoto_core = "0.47"
rusoto_s3 = "0.47"
@@ -36,6 +35,7 @@ rusoto_s3 = "0.47"
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
etcd_broker = { path = "../libs/etcd_broker" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]

View File

@@ -109,6 +109,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("a comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'"),
)
.arg(
Arg::new("broker-etcd-prefix")
.long("broker-etcd-prefix")
.takes_value(true)
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -118,7 +124,7 @@ fn main() -> Result<()> {
return Ok(());
}
let mut conf: SafeKeeperConf = Default::default();
let mut conf = SafeKeeperConf::default();
if let Some(dir) = arg_matches.value_of("datadir") {
// change into the data directory.
@@ -162,6 +168,9 @@ fn main() -> Result<()> {
let collected_ep: Result<Vec<Url>, ParseError> = addr.split(',').map(Url::parse).collect();
conf.broker_endpoints = Some(collected_ep?);
}
if let Some(prefix) = arg_matches.value_of("broker-etcd-prefix") {
conf.broker_etcd_prefix = prefix.to_string();
}
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
}

View File

@@ -1,61 +1,22 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::bail;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use etcd_client::Client;
use etcd_client::EventType;
use etcd_client::PutOptions;
use etcd_client::WatchOptions;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::str::FromStr;
use etcd_broker::Client;
use etcd_broker::PutOptions;
use etcd_broker::SkTimelineSubscriptionKind;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::{safekeeper::Term, timeline::GlobalTimelines, SafeKeeperConf};
use utils::{
lsn::Lsn,
zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use utils::zid::{ZNodeId, ZTenantTimelineId};
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
const LEASE_TTL_SEC: i64 = 5;
// TODO: add global zenith installation ID.
const ZENITH_PREFIX: &str = "zenith";
/// Published data about safekeeper. Fields made optional for easy migrations.
#[serde_as]
#[derive(Debug, Deserialize, Serialize)]
pub struct SafekeeperInfo {
/// Term of the last entry.
pub last_log_term: Option<Term>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper offloaded WAL to s3.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub s3_wal_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
}
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
@@ -71,22 +32,21 @@ pub fn thread_main(conf: SafeKeeperConf) {
});
}
/// Prefix to timeline related data.
fn timeline_path(zttid: &ZTenantTimelineId) -> String {
/// Key to per timeline per safekeeper data.
fn timeline_safekeeper_path(
broker_prefix: String,
zttid: ZTenantTimelineId,
sk_id: ZNodeId,
) -> String {
format!(
"{}/{}/{}",
ZENITH_PREFIX, zttid.tenant_id, zttid.timeline_id
"{}/{sk_id}",
SkTimelineSubscriptionKind::timeline(broker_prefix, zttid).watch_key()
)
}
/// Key to per timeline per safekeeper data.
fn timeline_safekeeper_path(zttid: &ZTenantTimelineId, sk_id: ZNodeId) -> String {
format!("{}/safekeeper/{}", timeline_path(zttid), sk_id)
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> Result<()> {
let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?;
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?;
// Get and maintain lease to automatically delete obsolete data
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
@@ -98,14 +58,17 @@ async fn push_loop(conf: SafeKeeperConf) -> Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
let sk_info = tli.get_public_info();
for zttid in GlobalTimelines::get_active_timelines() {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
let sk_info = tli.get_public_info()?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
.put(
timeline_safekeeper_path(zttid, conf.my_id),
timeline_safekeeper_path(
conf.broker_etcd_prefix.clone(),
zttid,
conf.my_id,
),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
@@ -128,45 +91,31 @@ async fn push_loop(conf: SafeKeeperConf) -> Result<()> {
/// Subscribe and fetch all the interesting data from the broker.
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
lazy_static! {
static ref TIMELINE_SAFEKEEPER_RE: Regex =
Regex::new(r"^zenith/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$")
.unwrap();
}
let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?;
loop {
let wo = WatchOptions::new().with_prefix();
// TODO: subscribe only to my timelines
let (_, mut stream) = client.watch(ZENITH_PREFIX, Some(wo)).await?;
while let Some(resp) = stream.message().await? {
if resp.canceled() {
bail!("watch canceled");
}
let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?;
for event in resp.events() {
if EventType::Put == event.event_type() {
if let Some(kv) = event.kv() {
if let Some(caps) = TIMELINE_SAFEKEEPER_RE.captures(kv.key_str()?) {
let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let zttid = ZTenantTimelineId::new(tenant_id, timeline_id);
let safekeeper_id = ZNodeId(caps.get(3).unwrap().as_str().parse()?);
let value_str = kv.value_str()?;
match serde_json::from_str::<SafekeeperInfo>(value_str) {
Ok(safekeeper_info) => {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
tli.record_safekeeper_info(&safekeeper_info, safekeeper_id)?
}
}
Err(err) => warn!(
"failed to deserialize safekeeper info {}: {}",
value_str, err
),
}
let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates(
&mut client,
SkTimelineSubscriptionKind::all(conf.broker_etcd_prefix.clone()),
)
.await
.context("failed to subscribe for safekeeper info")?;
loop {
match subscription.fetch_data().await {
Some(new_info) => {
for (zttid, sk_info) in new_info {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
for (safekeeper_id, info) in sk_info {
tli.record_safekeeper_info(&info, safekeeper_id)?
}
}
}
}
None => {
debug!("timeline updates sender closed, aborting the pull loop");
return Ok(());
}
}
}
}

View File

@@ -1,3 +1,4 @@
use etcd_broker::SkTimelineInfo;
use hyper::{Body, Request, Response, StatusCode};
use serde::Serialize;
@@ -5,7 +6,6 @@ use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;
use crate::broker::SafekeeperInfo;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::GlobalTimelines;
@@ -136,7 +136,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let safekeeper_info: SafekeeperInfo = json_request(&mut request).await?;
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
tli.record_safekeeper_info(&safekeeper_info, ZNodeId(1))?;

View File

@@ -27,6 +27,7 @@ pub mod defaults {
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_NEON_BROKER_PREFIX: &str = "neon";
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
@@ -51,6 +52,7 @@ pub struct SafeKeeperConf {
pub recall_period: Duration,
pub my_id: ZNodeId,
pub broker_endpoints: Option<Vec<Url>>,
pub broker_etcd_prefix: String,
}
impl SafeKeeperConf {
@@ -76,6 +78,7 @@ impl Default for SafeKeeperConf {
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
broker_endpoints: None,
broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(),
}
}
}

View File

@@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use etcd_broker::SkTimelineInfo;
use postgres_ffi::xlog_utils::TimeLineID;
use postgres_ffi::xlog_utils::XLogSegNo;
@@ -16,7 +17,6 @@ use tracing::*;
use lazy_static::lazy_static;
use crate::broker::SafekeeperInfo;
use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
@@ -886,7 +886,7 @@ where
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperInfo) -> Result<()> {
pub fn record_safekeeper_info(&mut self, sk_info: &SkTimelineInfo) -> Result<()> {
let mut sync_control_file = false;
if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
{

View File

@@ -3,6 +3,7 @@
use anyhow::{bail, Context, Result};
use etcd_broker::SkTimelineInfo;
use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;
@@ -21,7 +22,6 @@ use utils::{
zid::{ZNodeId, ZTenantTimelineId},
};
use crate::broker::SafekeeperInfo;
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::control_file;
@@ -89,6 +89,7 @@ struct SharedState {
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
listen_pg_addr: String,
last_removed_segno: XLogSegNo,
}
@@ -111,6 +112,7 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
listen_pg_addr: conf.listen_pg_addr.clone(),
last_removed_segno: 0,
})
}
@@ -130,6 +132,7 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
listen_pg_addr: conf.listen_pg_addr.clone(),
last_removed_segno: 0,
})
}
@@ -418,9 +421,9 @@ impl Timeline {
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self) -> SafekeeperInfo {
pub fn get_public_info(&self) -> anyhow::Result<SkTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
SafekeeperInfo {
Ok(SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
@@ -432,11 +435,23 @@ impl Timeline {
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
}
wal_stream_connection_string: shared_state
.pageserver_connstr
.as_deref()
.map(|pageserver_connstr| {
wal_stream_connection_string(
self.zttid,
&shared_state.listen_pg_addr,
pageserver_connstr,
)
})
.transpose()
.context("Failed to get the pageserver callmemaybe connstr")?,
})
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> {
pub fn record_safekeeper_info(&self, sk_info: &SkTimelineInfo, _sk_id: ZNodeId) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
@@ -489,6 +504,29 @@ impl Timeline {
}
}
// pageserver connstr is needed to be able to distinguish between different pageservers
// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved
// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105
fn wal_stream_connection_string(
ZTenantTimelineId {
tenant_id,
timeline_id,
}: ZTenantTimelineId,
listen_pg_addr_str: &str,
pageserver_connstr: &str,
) -> anyhow::Result<String> {
let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str);
let me_conf = me_connstr
.parse::<postgres::config::Config>()
.with_context(|| {
format!("Failed to parse pageserver connection string '{me_connstr}' as a postgres one")
})?;
let (host, port) = utils::connstring::connection_host_port(&me_conf);
Ok(format!(
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
))
}
// Utilities needed by various Connection-like objects
pub trait TimelineTools {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>;

View File

@@ -14,29 +14,34 @@ publish = false
### BEGIN HAKARI SECTION
[dependencies]
ahash = { version = "0.7", features = ["std"] }
anyhow = { version = "1", features = ["backtrace", "std"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itoa = { version = "0.4", features = ["i128", "std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std", "use_std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "std"] }
prost = { version = "0.9", features = ["prost-derive", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
scopeguard = { version = "1", features = ["use_std"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }
tokio-util = { version = "0.7", features = ["codec", "io"] }
tracing = { version = "0.1", features = ["attributes", "log", "std", "tracing-attributes"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
[build-dependencies]
ahash = { version = "0.7", features = ["std"] }
anyhow = { version = "1", features = ["backtrace", "std"] }
bytes = { version = "1", features = ["serde", "std"] }
clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] }
@@ -46,7 +51,7 @@ indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std", "use_std"] }
prost = { version = "0.9", features = ["prost-derive", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }