Compare commits

...

2 Commits

Author SHA1 Message Date
Arseny Sher
a40e1edbf4 Tonic based broker for SkTimelineInfo. 2022-10-19 17:44:51 +04:00
Arseny Sher
40a56a302a Determine safekeeper for offloading WAL without etcd election API.
This API is rather pointless, as sane choice anyway requires knowledge of peers
status and leaders lifetime in any case can intersect, which is fine for us --
so manual elections are straightforward. Here, we deterministically choose among
the reasonably caught up safekeepers, shifting by timeline id to spread the
load.

A step towards custom broker https://github.com/neondatabase/neon/issues/2394
2022-10-16 15:51:21 +04:00
19 changed files with 2465 additions and 345 deletions

207
Cargo.lock generated
View File

@@ -452,13 +452,28 @@ checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"once_cell",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap_derive"
version = "3.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
@@ -789,9 +804,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f39818dcfc97d45b03953c1292efc4e80954e1583c4aa770bac1383e2310a4"
checksum = "3f83d0ebf42c6eafb8d7c52f7e5f2d3003b89c7aa4fd2b79229209459a849af8"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -801,9 +816,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e580d70777c116df50c390d1211993f62d40302881e54d4b79727acb83d0199"
checksum = "07d050484b55975889284352b0ffc2ecbda25c0c55978017c132b29ba0818a86"
dependencies = [
"cc",
"codespan-reporting",
@@ -816,15 +831,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56a46460b88d1cec95112c8c363f0e2c39afdb237f60583b0b36343bf627ea9c"
checksum = "99d2199b00553eda8012dfec8d3b1c75fce747cf27c169a270b3b99e3448ab78"
[[package]]
name = "cxxbridge-macro"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "747b608fecf06b0d72d440f27acc99288207324b793be2c17991839f3d4995ea"
checksum = "dcb67a6de1f602736dd7eaead0080cf3435df806c61b24b13328db128c58868f"
dependencies = [
"proc-macro2",
"quote",
@@ -1002,11 +1017,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8664f6ea68aba5503d42dd1be786b0f1bd9b7972e7f40208c83ef74db91bf"
dependencies = [
"http",
"prost",
"prost 0.10.4",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tonic 0.7.2",
"tonic-build 0.7.2",
"tower",
"tower-service",
]
@@ -1516,9 +1531,9 @@ dependencies = [
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde6edd6cef363e9359ed3c98ba64590ba9eecba2293eb5a723ab32aee8926aa"
checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca"
dependencies = [
"cxx",
"cxx-build",
@@ -1644,9 +1659,9 @@ dependencies = [
[[package]]
name = "kqueue"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98"
dependencies = [
"kqueue-sys",
"libc",
@@ -1872,6 +1887,22 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae"
[[package]]
name = "neon_broker"
version = "0.1.0"
dependencies = [
"async-stream",
"clap",
"futures-core",
"futures-util",
"prost 0.11.0",
"tokio",
"tokio-stream",
"tonic 0.8.2",
"tonic-build 0.8.2",
"utils",
]
[[package]]
name = "nix"
version = "0.23.1"
@@ -2416,14 +2447,38 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "prettyplease"
version = "0.1.20"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fead41e178796ef8274dc612a7d8ce4c7e10ca35cd2c5b5ad24cac63aeb6c0"
checksum = "c142c0e46b57171fe0c528bee8c5b7569e80f0c17e377cd0e30ea57dbc11bb51"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -2432,9 +2487,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.46"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b"
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
dependencies = [
"unicode-ident",
]
@@ -2475,7 +2530,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.10.1",
]
[[package]]
name = "prost"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7"
dependencies = [
"bytes",
"prost-derive 0.11.0",
]
[[package]]
@@ -2493,8 +2558,28 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"prost 0.10.4",
"prost-types 0.10.1",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-build"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost 0.11.0",
"prost-types 0.11.1",
"regex",
"tempfile",
"which",
@@ -2513,6 +2598,19 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-derive"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.10.1"
@@ -2520,7 +2618,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
dependencies = [
"bytes",
"prost",
"prost 0.10.4",
]
[[package]]
name = "prost-types"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e"
dependencies = [
"bytes",
"prost 0.11.0",
]
[[package]]
@@ -3755,8 +3863,40 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"prost 0.10.4",
"prost-derive 0.10.1",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.11.0",
"prost-derive 0.11.0",
"tokio",
"tokio-stream",
"tokio-util",
@@ -3775,7 +3915,20 @@ checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-build 0.10.4",
"quote",
"syn",
]
[[package]]
name = "tonic-build"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build 0.11.1",
"quote",
"syn",
]
@@ -4311,7 +4464,7 @@ dependencies = [
"num-bigint",
"num-integer",
"num-traits",
"prost",
"prost 0.10.4",
"rand",
"regex",
"regex-syntax",

View File

@@ -11,6 +11,7 @@ cargo-features = ["named-profiles"]
[workspace]
members = [
"broker",
"compute_tools",
"control_plane",
"pageserver",

1120
broker/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

35
broker/Cargo.toml Normal file
View File

@@ -0,0 +1,35 @@
[package]
name = "neon_broker"
version = "0.1.0"
edition = "2021"
[features]
bench = []
[[bin]]
name = "neon_broker"
path = "src/broker.rs"
[[bin]]
name = "neon_broker_bench"
path = "src/bench.rs"
# build benchmarking binary only if explicitly requested with '--feature bench'
# required-features = ["bench"]
[dependencies]
async-stream = "0.3"
futures-core = "0.3"
futures-util = "0.3"
tonic = "0.8"
prost = "0.11"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
# for exploring with tokio-console
# tokio = { version = "1", features = ["full", "tracing"] }
# console-subscriber = "0.1.8"
tokio-stream = "0.1"
clap = { version = "3.2.17", features = ["derive"] }
utils = { path = "../libs/utils" }
[build-dependencies]
tonic-build = "0.8"

4
broker/build.rs Normal file
View File

@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/broker.proto")?;
Ok(())
}

38
broker/proto/broker.proto Normal file
View File

@@ -0,0 +1,38 @@
syntax = "proto3";
package neon_broker;
service NeonBroker {
// Subscribe to safekeeper updates.
rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {};
// Publish safekeeper updates.
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (Empty) {};
}
message SubscribeSafekeeperInfoRequest {
oneof subscription_key {
Empty all = 1; // subscribe to everything
TenantTimelineId tenant_timeline_id = 2; // subscribe to specific timeline
}
}
message SafekeeperTimelineInfo {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
uint64 last_log_term = 3;
uint64 flush_lsn = 4;
uint64 commit_lsn = 5;
uint64 backup_lsn = 6;
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
string safekeeper_connstr = 9;
}
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}
message Empty {
}

4
broker/readme.md Normal file
View File

@@ -0,0 +1,4 @@
```
cargo build -r -p neon_broker --features bench && target/release/neon_broker
target/release/neon_broker_bench -s 1 -p 1
```

179
broker/src/bench.rs Normal file
View File

@@ -0,0 +1,179 @@
pub mod neon_broker {
tonic::include_proto!("neon_broker");
}
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use clap::Parser;
use neon_broker::neon_broker_client::NeonBrokerClient;
use neon_broker::subscribe_safekeeper_info_request::SubscriptionKey;
use neon_broker::TenantTimelineId as ProtoTenantTimelineId;
use neon_broker::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use tokio::time::{self, sleep};
use tonic::transport::Channel;
use tonic::Request;
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// Number of publishers
#[clap(short = 'p', long, value_parser, default_value_t = 1)]
num_pubs: u64,
/// Number of subscribers
#[clap(short = 's', long, value_parser, default_value_t = 1)]
num_subs: u64,
}
async fn progress_reporter(counters: Vec<Arc<AtomicU64>>) {
let mut interval = time::interval(Duration::from_millis(1000));
let mut c_old = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum();
let mut c_min_old = counters
.iter()
.map(|c| c.load(Ordering::Relaxed))
.min()
.unwrap_or(0);
let mut started_at = None;
let mut skipped: u64 = 0;
loop {
interval.tick().await;
// print!(
// "cnts are {:?}",
// counters
// .iter()
// .map(|c| c.load(Ordering::Relaxed))
// .collect::<Vec<_>>()
// );
let c_new = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum();
let c_min_new = counters
.iter()
.map(|c| c.load(Ordering::Relaxed))
.min()
.unwrap_or(0);
if c_new > 0 && started_at.is_none() {
started_at = Some(Instant::now());
skipped = c_new;
}
let avg_rps = started_at.map(|s| {
let dur = s.elapsed();
let dur_secs = dur.as_secs() as f64 + (dur.subsec_millis() as f64) / 1000.0;
let avg_rps = (c_new - skipped) as f64 / dur_secs;
(dur, avg_rps)
});
println!(
"sum rps {}, min rps {} total {}, total min {}, duration, avg sum rps {:?}",
c_new - c_old,
c_min_new - c_min_old,
c_new,
c_min_new,
avg_rps
);
c_old = c_new;
c_min_old = c_min_new;
}
}
fn tli_from_u64(i: u64) -> Vec<u8> {
let mut timeline_id = vec![0xFF; 8];
timeline_id.extend_from_slice(&i.to_be_bytes());
timeline_id
}
async fn subscribe(client: Option<NeonBrokerClient<Channel>>, counter: Arc<AtomicU64>, i: u64) {
let mut client = match client {
Some(c) => c,
None => NeonBrokerClient::connect("http://[::1]:50051")
.await
.unwrap(),
};
// let key = SubscriptionKey::All(Empty {});
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
tenant_id: vec![0xFF; 16],
timeline_id: tli_from_u64(i),
});
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(key),
};
let mut stream = client
.subscribe_safekeeper_info(request)
.await
.unwrap()
.into_inner();
while let Some(_feature) = stream.message().await.unwrap() {
counter.fetch_add(1, Ordering::Relaxed);
// println!("info = {:?}, client {}", _feature, i);
}
}
async fn publish(client: Option<NeonBrokerClient<Channel>>, n_keys: u64) {
let mut client = match client {
Some(c) => c,
None => NeonBrokerClient::connect("http://[::1]:50051")
.await
.unwrap(),
};
let mut counter: u64 = 0;
// create stream producing new values
let outbound = async_stream::stream! {
loop {
let info = SafekeeperTimelineInfo {
safekeeper_id: 1,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: vec![0xFF; 16],
timeline_id: tli_from_u64(counter % n_keys),
}),
last_log_term: 0,
flush_lsn: counter,
commit_lsn: 2,
backup_lsn: 3,
remote_consistent_lsn: 4,
peer_horizon_lsn: 5,
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
};
counter += 1;
// println!("sending info = {:?}", info);
// if counter >= 1000 {
// break;
// }
yield info;
// sleep(Duration::from_millis(100)).await;
}
};
let _response = client
.publish_safekeeper_info(Request::new(outbound))
.await
.unwrap();
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let mut counters = Vec::with_capacity(args.num_subs as usize);
for _ in 0..args.num_subs {
counters.push(Arc::new(AtomicU64::new(0)));
}
let h = tokio::spawn(progress_reporter(counters.clone()));
let c = NeonBrokerClient::connect("http://[::1]:50051")
.await
.unwrap();
for i in 0..args.num_subs {
let c = Some(c.clone());
// let c = None;
tokio::spawn(subscribe(c, counters[i as usize].clone(), i));
}
for _i in 0..args.num_pubs {
// let c = Some(c.clone());
tokio::spawn(publish(None, args.num_subs as u64));
}
h.await?;
Ok(())
}

526
broker/src/broker.rs Normal file
View File

@@ -0,0 +1,526 @@
//! Simple pub-sub based on grpc (tonic) and Tokio mpsc for storage nodes
//! messaging. The main design goal is to avoid central synchronization during
//! normal flow, resorting to it only when pub/sub change happens. Each
//! subscriber holds mpsc for messages it sits on; tx end is sent to existing
//! publishers and saved in shared state for new ones. Publishers maintain
//! locally set of subscribers they stream messages to.
//!
//! Subscriptions to 1) single timeline 2) everything are possible. We could add
//! subscription to set of timelines to save grpc streams, but testing shows
//! many individual streams is also ok.
//!
//! Message is dropped if subscriber can't consume it, not affecting other
//! subscribers.
//!
//! Only safekeeper message is supported, but it is not hard to add something
//! else with templating.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use futures_core::Stream;
use futures_util::StreamExt;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::{select, time};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Code;
use tonic::{transport::Server, Request, Response, Status};
use neon_broker_proto::neon_broker_server::{NeonBroker, NeonBrokerServer};
use neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId;
use neon_broker_proto::{Empty, SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
pub mod neon_broker_proto {
// The string specified here must match the proto package name.
// If you want to have a look at the generated code, it is at path similar to
// target/debug/build/neon_broker-0fde81d03bedc3b2/out/neon_broker.rs
tonic::include_proto!("neon_broker");
}
// Max size of the queue to the subscriber.
const CHAN_SIZE: usize = 256;
type PubId = u64; // id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps
#[derive(Copy, Clone)]
enum SubscriptionKey {
All,
Timeline(TenantTimelineId),
}
impl SubscriptionKey {
// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
match key {
ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
}
}
}
}
// Subscriber id + tx end of the channel for messages to it.
#[derive(Clone)]
struct SubSender(SubId, Sender<SafekeeperTimelineInfo>);
impl fmt::Debug for SubSender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Subscription id {}", self.0)
}
}
// Announcements subscriber sends to publisher(s) asking it to stream to the
// provided channel, or forget about it, releasing memory.
#[derive(Clone)]
enum SubAnnounce {
AddAll(Sender<SafekeeperTimelineInfo>), // add subscription to all timelines
AddTimeline(TenantTimelineId, SubSender), // add subsciption to the specific timeline
RemoveTimeline(TenantTimelineId, SubId), // remove subscription to the specific timeline
// RemoveAll is not needed as publisher will notice closed channel while
// trying to send the next message.
}
struct SharedState {
// Registered publishers. They sit on the rx end of these channels and
// receive through it tx handles of chans to subscribers.
//
// Note: publishers don't identify which keys they publish, so each
// publisher will receive channels to all subs and filter them before sending.
pub_txs: HashMap<PubId, Sender<SubAnnounce>>,
next_pub_id: PubId,
// Registered subscribers -- when publisher joins it walks over them,
// collecting txs to send messages.
subs_to_all: HashMap<SubId, Sender<SafekeeperTimelineInfo>>,
subs_to_timelines: HashMap<TenantTimelineId, Vec<SubSender>>,
next_sub_id: SubId,
}
// Utility func to remove subscription from the map
fn remove_sub(
subs_to_timelines: &mut HashMap<TenantTimelineId, Vec<SubSender>>,
ttid: &TenantTimelineId,
sub_id: SubId,
) {
if let Some(subsenders) = subs_to_timelines.get_mut(&ttid) {
subsenders.retain(|ss| ss.0 != sub_id);
if subsenders.len() == 0 {
subs_to_timelines.remove(&ttid);
}
}
// Note that subscription might be not here if subscriber task was aborted
// earlier than it managed to notify publisher about itself.
}
impl SharedState {
// Register new publisher.
pub fn register_publisher(&mut self, announce_tx: Sender<SubAnnounce>) -> PubId {
let pub_id = self.next_pub_id;
self.next_pub_id += 1;
assert!(!self.pub_txs.contains_key(&pub_id));
self.pub_txs.insert(pub_id, announce_tx);
pub_id
}
pub fn unregister_publisher(&mut self, pub_id: PubId) {
assert!(self.pub_txs.contains_key(&pub_id));
self.pub_txs.remove(&pub_id);
}
// Register new subscriber.
// Returns list of channels through which existing publishers must be notified
// about new subscriber; we can't do it here due to risk of deadlock.
pub fn register_subscriber(
&mut self,
sub_key: SubscriptionKey,
sub_tx: Sender<SafekeeperTimelineInfo>,
) -> (SubId, Vec<Sender<SubAnnounce>>, SubAnnounce) {
let sub_id = self.next_sub_id;
self.next_sub_id += 1;
let announce = match sub_key {
SubscriptionKey::All => {
assert!(!self.subs_to_all.contains_key(&sub_id));
self.subs_to_all.insert(sub_id, sub_tx.clone());
SubAnnounce::AddAll(sub_tx)
}
SubscriptionKey::Timeline(ttid) => {
match self.subs_to_timelines.entry(ttid) {
Entry::Occupied(mut o) => {
let subsenders = o.get_mut();
subsenders.push(SubSender(sub_id, sub_tx.clone()));
}
Entry::Vacant(v) => {
v.insert(vec![SubSender(sub_id, sub_tx.clone())]);
}
}
SubAnnounce::AddTimeline(ttid, SubSender(sub_id, sub_tx))
}
};
// Collect existing publishers to notify them after lock is released;
// TODO: the probability of channels being full here is tiny (publisher
// always blocks listening chan), we can try sending first and resort to
// cloning if needed.
//
// Deadlock is possible only if publisher tries to access shared state
// during its lifetime, i.e. we add maintenance of set of published
// tlis. Otherwise we can just await here (but lock must be replaced
// with Tokio one).
//
// We could also just error out if some chan is full, but that needs
// cleanup of incompleted job, and notifying publishers when unregistering
// is mandatory anyway.
(sub_id, self.pub_txs.values().cloned().collect(), announce)
}
// Unregister the subscriber. Similar to register_subscriber, returns list
// of channels through which publishers must be notified about the removal.
pub fn unregister_subscriber(
&mut self,
sub_id: SubId,
sub_key: SubscriptionKey,
) -> Option<(Vec<Sender<SubAnnounce>>, SubAnnounce)> {
// We need to notify existing publishers only about per timeline
// subscriptions, 'all' kind is detected on its own through closed
// channels.
let announce = match sub_key {
SubscriptionKey::All => {
assert!(self.subs_to_all.contains_key(&sub_id));
self.subs_to_all.remove(&sub_id);
None
}
SubscriptionKey::Timeline(ref ttid) => {
remove_sub(&mut self.subs_to_timelines, ttid, sub_id);
Some(SubAnnounce::RemoveTimeline(*ttid, sub_id))
}
};
announce.map(|a| (self.pub_txs.values().cloned().collect(), a))
}
pub fn report(&mut self) {
println!(
"registered {} publishers, {} subs to all, {} subs to timelines",
self.pub_txs.len(),
self.subs_to_all.len(),
self.subs_to_timelines.len(),
);
}
}
// SharedState wrapper for post-locking operations (sending to pub_tx chans).
#[derive(Clone)]
struct Registry {
shared_state: Arc<Mutex<SharedState>>,
}
impl Registry {
// Register new publisher in shared state.
pub fn register_publisher(&self) -> Publisher {
let (announce_tx, announce_rx) = mpsc::channel(128);
let mut ss = self.shared_state.lock().unwrap();
let id = ss.register_publisher(announce_tx);
let (subs_to_all, subs_to_timelines) = (
ss.subs_to_all.values().cloned().collect(),
ss.subs_to_timelines.clone(),
);
drop(ss);
// println!("registered publisher {}", id);
Publisher {
id,
announce_rx: announce_rx.into(),
subs_to_all,
subs_to_timelines,
registry: self.clone(),
}
}
pub fn unregister_publisher(&self, publisher: &Publisher) {
self.shared_state
.lock()
.unwrap()
.unregister_publisher(publisher.id);
// println!("unregistered publisher {}", publisher.id);
}
// Register new subscriber in shared state.
pub async fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber {
let (tx, rx) = mpsc::channel(CHAN_SIZE);
let id;
let mut pub_txs;
let announce;
{
let mut ss = self.shared_state.lock().unwrap();
(id, pub_txs, announce) = ss.register_subscriber(sub_key, tx);
}
// Note: it is important to create Subscriber before .await. If client
// disconnects during await, which would terminate the Future we still
// need to run Subscriber's drop() which will unregister it from the
// shared state.
let subscriber = Subscriber {
id,
key: sub_key,
sub_rx: rx,
registry: self.clone(),
};
// Notify existing publishers about new subscriber.
for pub_tx in pub_txs.iter_mut() {
// Closed channel is fine; it means publisher has gone.
pub_tx.send(announce.clone()).await.ok();
}
// println!("registered subscriber {}", id);
subscriber
}
// Unregister the subscriber
pub fn unregister_subscriber(&self, sub: &Subscriber) {
let mut ss = self.shared_state.lock().unwrap();
let announce_pack = ss.unregister_subscriber(sub.id, sub.key);
drop(ss);
// Notify publishers about the removal. Apart from wanting to do it
// outside lock, here we also spin a task as Drop impl can't be async.
if let Some((mut pub_txs, announce)) = announce_pack {
tokio::spawn(async move {
for pub_tx in pub_txs.iter_mut() {
// Closed channel is fine; it means publisher has gone.
pub_tx.send(announce.clone()).await.ok();
}
});
}
// println!("unregistered subscriber {}", sub.id);
}
pub async fn report(&self) {
let mut interval = time::interval(Duration::from_millis(1000));
loop {
interval.tick().await;
self.shared_state.lock().unwrap().report();
}
}
}
// Private subscriber state.
struct Subscriber {
id: SubId,
key: SubscriptionKey,
// Subscriber receives messages from publishers here.
sub_rx: Receiver<SafekeeperTimelineInfo>,
// to unregister itself from shared state in Drop
registry: Registry,
}
impl Drop for Subscriber {
fn drop(&mut self) {
self.registry.unregister_subscriber(self);
}
}
// Private publisher state
struct Publisher {
id: PubId,
// new subscribers request to send (or stop sending) msgs them here.
// It could be just Receiver, but weirdly it doesn't implement futures_core Stream directly.
announce_rx: ReceiverStream<SubAnnounce>,
subs_to_all: Vec<Sender<SafekeeperTimelineInfo>>,
subs_to_timelines: HashMap<TenantTimelineId, Vec<SubSender>>,
// to unregister itself from shared state in Drop
registry: Registry,
}
impl Publisher {
// Send msg to relevant subscribers.
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
// send message to subscribers for everything
let mut cleanup_subs_to_all = false;
for sub in self.subs_to_all.iter() {
match sub.try_send(msg.clone()) {
Err(TrySendError::Full(_)) => {
// println!("dropping message, channel is full");
}
Err(TrySendError::Closed(_)) => {
cleanup_subs_to_all = true;
}
_ => (),
}
}
// some channels got closed (subscriber gone), remove them
if cleanup_subs_to_all {
self.subs_to_all.retain(|tx| !tx.is_closed());
}
// send message to per timeline subscribers
let ttid = parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or(Status::new(
Code::InvalidArgument,
"missing tenant_timeline_id",
))?)?;
if let Some(subs) = self.subs_to_timelines.get(&ttid) {
for tx in subs.iter().map(|sub_sender| &sub_sender.1) {
if let Err(TrySendError::Full(_)) = tx.try_send(msg.clone()) {
// println!("dropping message, channel is full");
}
// closed channel is ignored here; we will be notified and remove it soon
}
}
Ok(())
}
// Add/remove subscriber according to sub_announce.
pub fn update_sub(&mut self, sub_announce: SubAnnounce) {
match sub_announce {
SubAnnounce::AddAll(tx) => self.subs_to_all.push(tx),
SubAnnounce::AddTimeline(ttid, sub_sender) => {
match self.subs_to_timelines.entry(ttid) {
Entry::Occupied(mut o) => {
let subsenders = o.get_mut();
subsenders.push(sub_sender);
}
Entry::Vacant(v) => {
v.insert(vec![sub_sender]);
}
}
}
SubAnnounce::RemoveTimeline(ref ttid, sub_id) => {
remove_sub(&mut self.subs_to_timelines, ttid, sub_id);
}
}
}
}
impl Drop for Publisher {
fn drop(&mut self) {
self.registry.unregister_publisher(self);
}
}
struct NeonBrokerService {
registry: Registry,
}
#[tonic::async_trait]
impl NeonBroker for NeonBrokerService {
async fn publish_safekeeper_info(
&self,
request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
) -> Result<Response<Empty>, Status> {
let mut publisher = self.registry.register_publisher();
let mut stream = request.into_inner();
loop {
select! {
msg = stream.next() => {
match msg {
Some(Ok(msg)) => {publisher.send_msg(&msg)?;},
Some(Err(e)) => {return Err(e);}, // grpc error from the stream
None => {break;} // closed stream
}
}
Some(announce) = publisher.announce_rx.next() => {
publisher.update_sub(announce);
}
}
}
Ok(Response::new(Empty {}))
}
type SubscribeSafekeeperInfoStream =
Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
async fn subscribe_safekeeper_info(
&self,
request: Request<SubscribeSafekeeperInfoRequest>,
) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
let proto_key = request.into_inner().subscription_key.ok_or(Status::new(
Code::InvalidArgument,
"missing subscription key",
))?;
let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
let mut subscriber = self.registry.register_subscriber(sub_key).await;
// transform rx into stream with item = Result, as method result demands
let output = async_stream::try_stream! {
while let Some(info) = subscriber.sub_rx.recv().await {
yield info
}
// internal generator
// let _ = subscriber.sub_rx.try_recv().ok();
// let mut counter = 0;
// loop {
// let info = SafekeeperTimelineInfo {
// safekeeper_id: 1,
// tenant_timeline_id: Some(ProtoTenantTimelineId {
// tenant_id: vec![0xFF; 16],
// timeline_id: vec![0xFF; 16],
// // timeline_id: tli_from_u64(counter),
// }),
// last_log_term: 0,
// flush_lsn: counter,
// commit_lsn: 2,
// backup_lsn: 3,
// remote_consistent_lsn: 4,
// peer_horizon_lsn: 5,
// safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
// };
// counter += 1;
// yield info;
// }
};
Ok(Response::new(
Box::pin(output) as Self::SubscribeSafekeeperInfoStream
))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// console_subscriber::init();
let addr = "[::1]:50051".parse()?;
let registry = Registry {
shared_state: Arc::new(Mutex::new(SharedState {
pub_txs: HashMap::new(),
next_pub_id: 0,
subs_to_all: HashMap::new(),
subs_to_timelines: HashMap::new(),
next_sub_id: 0,
})),
};
let neon_broker_service = NeonBrokerService {
registry: registry.clone(),
};
tokio::spawn(async move { registry.report().await });
Server::builder()
.http2_keepalive_interval(Some(Duration::from_millis(5000)))
.add_service(NeonBrokerServer::new(neon_broker_service))
.serve(addr)
.await?;
Ok(())
}
// parse variable length bytes from protobuf
fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
let tenant_id = TenantId::from_vec(&proto_ttid.tenant_id)
.map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
let timeline_id = TimelineId::from_vec(&proto_ttid.timeline_id).map_err(|e| {
Status::new(
Code::InvalidArgument,
format!("malformed timeline_id: {}", e),
)
})?;
Ok(TenantTimelineId {
tenant_id,
timeline_id,
})
}

View File

@@ -29,6 +29,9 @@ pub struct SkTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub local_start_lsn: Option<Lsn>,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,

View File

@@ -3,6 +3,13 @@ use std::{fmt, str::FromStr};
use hex::FromHex;
use rand::Rng;
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum IdError {
#[error("invalid id length {0}")]
VecParseError(usize),
}
/// Neon ID is a 128-bit random ID.
/// Used to represent various identifiers. Provides handy utility methods and impls.
@@ -22,6 +29,15 @@ impl Id {
Id::from(arr)
}
pub fn from_vec(src: &Vec<u8>) -> Result<Id, IdError> {
if src.len() != 16 {
return Err(IdError::VecParseError(src.len()));
}
let mut zid_slice = [0u8; 16];
zid_slice.copy_from_slice(&src);
Ok(zid_slice.into())
}
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
@@ -75,6 +91,12 @@ impl From<[u8; 16]> for Id {
}
}
impl From<Id> for u128 {
fn from(id: Id) -> Self {
u128::from_le_bytes(id.0)
}
}
impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.hex_encode())
@@ -94,6 +116,10 @@ macro_rules! id_newtype {
$t(Id::get_from_buf(buf))
}
pub fn from_vec(src: &Vec<u8>) -> Result<$t, IdError> {
Ok($t(Id::from_vec(src)?))
}
pub fn as_arr(&self) -> [u8; 16] {
self.0.as_arr()
}
@@ -136,6 +162,12 @@ macro_rules! id_newtype {
}
}
impl From<$t> for u128 {
fn from(id: $t) -> Self {
u128::from(id.0)
}
}
impl fmt::Display for $t {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)

View File

@@ -802,6 +802,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -818,6 +819,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -834,6 +837,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -850,6 +854,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -909,6 +914,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -925,6 +932,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -941,6 +950,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -975,6 +986,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1007,6 +1020,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
},
etcd_version: 0,
@@ -1023,6 +1038,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1039,6 +1056,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -1084,6 +1103,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1100,6 +1121,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1169,6 +1192,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1185,6 +1210,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
},
etcd_version: 0,
@@ -1256,6 +1283,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1327,6 +1356,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,

View File

@@ -21,7 +21,8 @@ use metrics::set_build_info_metric;
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG,
DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use safekeeper::http;
use safekeeper::remove_wal;
@@ -79,12 +80,6 @@ fn main() -> anyhow::Result<()> {
.long("pageserver")
.takes_value(true),
)
.arg(
Arg::new("recall")
.long("recall")
.takes_value(true)
.help("Period for requestion pageserver to call for replication"),
)
.arg(
Arg::new("daemonize")
.short('d')
@@ -119,6 +114,12 @@ fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("heartbeat-timeout")
.long("heartbeat-timeout")
.takes_value(true)
.help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs()))
)
.arg(
Arg::new("wal-backup-threads").long("backup-threads").takes_value(true).help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
).arg(
@@ -127,6 +128,12 @@ fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
)
.arg(
Arg::new("max-offloader-lag")
.long("max-offloader-lag")
.takes_value(true)
.help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG / (1 << 20)))
)
.arg(
Arg::new("enable-wal-backup")
.long("enable-wal-backup")
@@ -173,10 +180,6 @@ fn main() -> anyhow::Result<()> {
conf.listen_http_addr = addr.to_owned();
}
if let Some(recall) = arg_matches.value_of("recall") {
conf.recall_period = humantime::parse_duration(recall)?;
}
let mut given_id = None;
if let Some(given_id_str) = arg_matches.value_of("id") {
given_id = Some(NodeId(
@@ -194,6 +197,16 @@ fn main() -> anyhow::Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
if let Some(heartbeat_timeout_str) = arg_matches.value_of("heartbeat-timeout") {
conf.heartbeat_timeout =
humantime::parse_duration(heartbeat_timeout_str).with_context(|| {
format!(
"failed to parse heartbeat-timeout {}",
heartbeat_timeout_str
)
})?;
}
if let Some(backup_threads) = arg_matches.value_of("wal-backup-threads") {
conf.backup_runtime_threads = backup_threads
.parse()
@@ -206,6 +219,14 @@ fn main() -> anyhow::Result<()> {
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
}
if let Some(max_offloader_lag_str) = arg_matches.value_of("max-offloader-lag") {
conf.max_offloader_lag = max_offloader_lag_str.parse().with_context(|| {
format!(
"failed to parse max offloader lag {}",
max_offloader_lag_str
)
})?;
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.wal_backup_enabled = arg_matches
.value_of("enable-wal-backup")

View File

@@ -1,6 +1,5 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
@@ -12,11 +11,9 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -56,113 +53,6 @@ fn timeline_safekeeper_path(
)
}
pub struct Election {
pub election_name: String,
pub candidate_name: String,
pub broker_endpoints: Vec<Url>,
}
impl Election {
pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self {
Self {
election_name,
candidate_name,
broker_endpoints,
}
}
}
pub struct ElectionLeader {
client: Client,
keep_alive: JoinHandle<Result<()>>,
}
impl ElectionLeader {
pub async fn check_am_i(
&mut self,
election_name: String,
candidate_name: String,
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp
.kv()
.ok_or_else(|| anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)
}
pub async fn give_up(self) {
self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
}
}
pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
let lease = client
.lease_grant(LEASE_TTL_SEC, None)
.await
.context("Could not acquire a lease");
let lease_id = lease.map(|l| l.id()).unwrap();
// kill previous keepalive, if any
if let Some(l) = leader.take() {
l.give_up().await;
}
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
// immediately save handle to kill task if we get canceled below
*leader = Some(ElectionLeader {
client: client.clone(),
keep_alive,
});
client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await?;
Ok(())
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(lease_id)
.await
.context("failed to create keepalive stream")?;
loop {
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
async fn push_sk_info(
ttid: TenantTimelineId,
mut client: Client,
@@ -236,7 +126,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let handles = active_tlis
.iter()
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let sk_info = tli.get_safekeer_info(&conf);
let key =
timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id);
let lease = leases.remove(&tli.ttid).unwrap();
@@ -282,6 +172,9 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}

View File

@@ -1,6 +1,7 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
TermSwitchEntry,
};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
@@ -134,7 +135,7 @@ pub struct SafeKeeperStateV4 {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
pub peers: PersistedPeers,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
@@ -165,7 +166,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to hexing some ids
} else if version == 2 {
@@ -188,7 +189,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
} else if version == 3 {
@@ -211,7 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to having timeline_start_lsn
} else if version == 4 {
@@ -234,7 +235,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);

View File

@@ -1,4 +1,6 @@
use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS;
use defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
@@ -36,6 +38,8 @@ pub mod defaults {
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
pub const DEFAULT_MAX_OFFLOADER_LAG: u64 = 128 * (1 << 20);
}
#[derive(Debug, Clone)]
@@ -60,6 +64,8 @@ pub struct SafeKeeperConf {
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub auth_validation_public_key_path: Option<PathBuf>,
pub heartbeat_timeout: Duration,
pub max_offloader_lag: u64,
}
impl SafeKeeperConf {
@@ -92,6 +98,8 @@ impl Default for SafeKeeperConf {
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
auth_validation_public_key_path: None,
heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT,
max_offloader_lag: DEFAULT_MAX_OFFLOADER_LAG,
}
}
}

View File

@@ -11,6 +11,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use tracing::*;
use crate::control_file;
@@ -132,9 +133,8 @@ pub struct ServerInfo {
pub wal_seg_size: u32,
}
/// Data published by safekeeper to the peers
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub struct PersistedPeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
backup_lsn: Lsn,
/// Term of the last entry.
@@ -145,7 +145,7 @@ pub struct PeerInfo {
commit_lsn: Lsn,
}
impl PeerInfo {
impl PersistedPeerInfo {
fn new() -> Self {
Self {
backup_lsn: Lsn::INVALID,
@@ -156,10 +156,8 @@ impl PeerInfo {
}
}
// vector-based node id -> peer state map with very limited functionality we
// need/
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Peers(pub Vec<(NodeId, PeerInfo)>);
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
@@ -203,7 +201,7 @@ pub struct SafeKeeperState {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
pub peers: PersistedPeers,
}
#[derive(Debug, Clone)]
@@ -240,7 +238,12 @@ impl SafeKeeperState {
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
peers: PersistedPeers(
peers
.iter()
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
}
}

View File

@@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::XLogSegNo;
use tokio::sync::watch;
use tokio::{sync::watch, time::Instant};
use std::cmp::{max, min};
@@ -26,7 +26,7 @@ use utils::{
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo,
SafekeeperMemState, ServerInfo, Term,
};
use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -36,6 +36,53 @@ use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
_flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
pub local_start_lsn: Lsn,
/// When info was received.
ts: Instant,
}
impl PeerInfo {
fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id,
_last_log_term: sk_info.last_log_term.unwrap_or(0),
_flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID),
commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID),
local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID),
ts,
}
}
}
// vector-based node id -> peer state map with very limited functionality we
// need.
#[derive(Debug, Clone, Default)]
pub struct PeersInfo(pub Vec<PeerInfo>);
impl PeersInfo {
fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
self.0.iter_mut().find(|p| p.sk_id == id)
}
fn upsert(&mut self, p: &PeerInfo) {
match self.get(p.sk_id) {
Some(rp) => *rp = p.clone(),
None => self.0.push(p.clone()),
}
}
}
/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
@@ -74,6 +121,8 @@ impl ReplicaState {
pub struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// State of peers as we know it.
peers_info: PeersInfo,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
@@ -123,7 +172,8 @@ impl SharedState {
Ok(Self {
sk,
replicas: Vec::new(),
peers_info: PeersInfo(vec![]),
replicas: vec![],
wal_backup_active: false,
active: false,
num_computes: 0,
@@ -142,6 +192,7 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
replicas: Vec::new(),
wal_backup_active: false,
active: false,
@@ -268,6 +319,24 @@ impl SharedState {
self.replicas.push(Some(state));
pos
}
fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
SkTimelineInfo {
last_log_term: Some(self.sk.get_epoch()),
flush_lsn: Some(self.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(self.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
self.get_replicas_state().remote_consistent_lsn,
self.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(self.sk.inmem.backup_lsn),
local_start_lsn: Some(self.sk.state.local_start_lsn),
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -632,36 +701,25 @@ impl Timeline {
Ok(())
}
/// Return public safekeeper info for broadcasting to broker and other peers.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
/// Get safekeeper info for broadcasting to broker and other peers.
pub fn get_safekeer_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.write_shared_state();
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
}
shared_state.get_safekeeper_info(conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
_sk_id: NodeId,
sk_id: NodeId,
) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
@@ -673,6 +731,22 @@ impl Timeline {
Ok(())
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state();
let now = Instant::now();
shared_state
.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
.cloned()
.collect()
}
/// Add send_wal replica to the in-memory vector of replicas.
pub fn add_replica(&self, state: ReplicaState) -> usize {
self.write_shared_state().add_replica(state)

View File

@@ -1,8 +1,7 @@
use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::task::JoinHandle;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashMap;
@@ -26,14 +25,11 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::Timeline;
use crate::{broker, GlobalTimelines, SafeKeeperConf};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
@@ -70,47 +66,100 @@ struct WalBackupTimelineEntry {
handle: Option<WalBackupTaskHandle>,
}
/// Start per timeline task, if it makes sense for this safekeeper to offload.
fn consider_start_task(
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
if let Some(wb_handle) = entry.handle.take() {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
}
/// The goal is to ensure that normally only one safekeepers offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several ones as they PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing seriously lagging safekeeper is undesirable;
/// So we deterministically choose among the reasonably caught up candidates.
fn determine_offloader(
alive_peers: &[PeerInfo],
wal_backup_lsn: Lsn,
ttid: TenantTimelineId,
conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
let capable_peers = alive_peers
.iter()
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
match alive_peers.iter().map(|p| p.commit_lsn).max() {
None => (None, "no connected peers to elect from".to_string()),
Some(max_commit_lsn) => {
let threshold = max_commit_lsn
.checked_sub(conf.max_offloader_lag)
.unwrap_or(Lsn(0));
let mut caughtup_peers = capable_peers
.clone()
.filter(|p| p.commit_lsn >= threshold)
.collect::<Vec<_>>();
caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
// To distribute the load, shift by timeline_id.
let offloader = caughtup_peers
[(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
.sk_id;
(
Some(offloader),
format!(
"elected {} among {:?} peers, with {} of them being caughtup",
offloader,
capable_peers
.map(|p| (p.sk_id, p.commit_lsn))
.collect::<Vec<_>>(),
caughtup_peers.len()
),
)
}
}
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
task: &mut WalBackupTimelineEntry,
entry: &mut WalBackupTimelineEntry,
) {
if !task.timeline.can_wal_backup() {
return;
let alive_peers = entry.timeline.get_peers(conf);
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
if elected_me != (entry.handle.is_some()) {
if elected_me {
info!("elected for backup {}: {}", ttid, election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
entry.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
info!("stepping down from backup {}: {}", ttid, election_dbg_str);
shut_down_task(ttid, entry).await;
}
}
info!("starting WAL backup task for {}", ttid);
// TODO: decide who should offload right here by simply checking current
// state instead of running elections in offloading task.
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
ttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
task.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
@@ -158,27 +207,20 @@ async fn wal_backup_launcher_main_loop(
timeline,
handle: None,
});
consider_start_task(&conf, ttid, entry);
update_task(&conf, ttid, entry).await;
} else {
// need to stop the task
info!("stopping WAL backup task for {}", ttid);
let entry = tasks.remove(&ttid).unwrap();
if let Some(wb_handle) = entry.handle {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
let mut entry = tasks.remove(&ttid).unwrap();
shut_down_task(ttid, &mut entry).await;
}
}
}
// Start known tasks, if needed and possible.
// For each timeline needing offloading, check if this safekeeper
// should do the job and start/stop the task accordingly.
_ = ticker.tick() => {
for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) {
consider_start_task(&conf, *ttid, entry);
for (ttid, entry) in tasks.iter_mut() {
update_task(&conf, *ttid, entry).await;
}
}
}
@@ -190,17 +232,13 @@ struct WalBackupTask {
timeline_dir: PathBuf,
wal_seg_size: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
leader: Option<ElectionLeader>,
election: Election,
}
/// Offload single timeline. Called only after we checked that backup
/// is required (wal_backup_attend) and possible (can_wal_backup).
/// Offload single timeline.
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
election: Election,
) {
info!("started");
let res = GlobalTimelines::get(ttid);
@@ -215,8 +253,6 @@ async fn backup_task_main(
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
leader: None,
election,
};
// task is spinned up only when wal_seg_size already initialized
@@ -229,9 +265,6 @@ async fn backup_task_main(
canceled = true;
}
}
if let Some(l) = wb.leader {
l.give_up().await;
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
@@ -239,107 +272,68 @@ impl WalBackupTask {
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
// election loop
let mut retry_attempt = 0u32;
// offload loop
loop {
let mut retry_attempt = 0u32;
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
info!("acquiring leadership");
if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
info!("acquired leadership");
// offload loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) =
UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
if let Some(l) = self.leader.as_mut() {
// Optimization idea for later:
// Avoid checking election leader every time by returning current lease grant expiration time
// Re-check leadership only after expiration time,
// such approach would reduce overhead on write-intensive workloads
match l
.check_am_i(
self.election.election_name.clone(),
self.election.candidate_name.clone(),
)
.await
{
Ok(leader) => {
if !leader {
info!("lost leadership");
break;
}
}
Err(e) => {
warn!("error validating leader, {:?}", e);
break;
}
}
}
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = min(retry_attempt + 1, u32::MAX);
}
retry_attempt = min(retry_attempt + 1, u32::MAX);
}
}
}