Compare commits

...

42 Commits

Author SHA1 Message Date
Christian Schwarz
a80790e0c4 WIP 2025-06-12 06:41:07 -07:00
Christian Schwarz
906a963351 WIP advertisement sending 2025-06-08 20:10:30 -07:00
Christian Schwarz
9e7556bef2 WIP 2025-06-06 19:03:23 -07:00
Christian Schwarz
4f4214eea3 WIP integrate 2025-06-06 18:36:16 -07:00
Christian Schwarz
3dffbda428 rip out the old wal_advertiser::advmap impl, stubbing out with todo!()s 2025-06-06 17:40:59 -07:00
Christian Schwarz
04fb256e0f leave some TODOs on the lib 2025-06-06 16:51:04 -07:00
Christian Schwarz
02fe35831c fix compile warninign in benchmark 2025-06-06 16:50:14 -07:00
Christian Schwarz
5f7bc3ce60 left equi join for remote_consistent_lsn retrieval 2025-06-06 14:28:20 -07:00
Christian Schwarz
e40b1c79fa carginality estimation for collected hashmap; maintain nodes and nodes_timelines; allocations not showing in flamegraph anymore, but replaced with btreemap, but replaced with btreemap, but replaced with btreemap, but replaced with btreemap::get 2025-06-06 13:50:03 -07:00
Christian Schwarz
2f416267dc finish implementing auto-quiescing; needs more tests 2025-06-05 02:41:56 +02:00
Christian Schwarz
d52d560f16 log int est 2025-06-05 02:08:13 +02:00
Christian Schwarz
de908a7b0d WIP auto-quiesce 2025-06-05 01:56:55 +02:00
Christian Schwarz
687fb25d41 WIP 2025-06-05 01:28:55 +02:00
Christian Schwarz
eaa91291ae add test case for initial advertisement and fix everything by switching to btrees and proper merge equi join 2025-06-05 01:09:33 +02:00
Christian Schwarz
fc9f38dd2d continue 2025-06-04 22:48:21 +02:00
Christian Schwarz
c689110ad6 continue 2025-06-04 22:34:01 +02:00
Christian Schwarz
ba6abe203d WIP promising quiescing mechanism 2025-06-04 22:07:48 +02:00
Christian Schwarz
d16e024d49 treat storage more likea n actor itself; dead end also 2025-06-04 20:46:48 +02:00
Christian Schwarz
a4b9335b73 dabble around with effect-style system 2025-06-04 20:38:55 +02:00
Christian Schwarz
2cea7a7838 sketch offloading 2025-06-04 19:19:20 +02:00
Christian Schwarz
e1c1aa74fe get benchmark to work 2025-06-04 18:28:53 +02:00
Christian Schwarz
ee775a24a0 WIP 2025-06-04 16:46:43 +02:00
Christian Schwarz
428f532f08 naive implementation of advertisement generator 2025-06-04 15:29:37 +02:00
Christian Schwarz
5efb0d8072 WIP 2025-06-03 19:45:19 +02:00
Christian Schwarz
36ba2b8e44 WIP proto 2025-06-03 19:14:48 +02:00
Christian Schwarz
1de0f41403 WIP 2025-06-03 18:42:54 +02:00
Christian Schwarz
4d2f27a33f WIP 2025-06-03 14:55:08 +02:00
Christian Schwarz
88a3c9e7fd WIP 2025-06-03 14:00:56 +02:00
Christian Schwarz
df36b9aa62 WIP(ctd): plumbing to feed commit_lsn to wal_advertiser 2025-06-02 12:39:42 +02:00
Christian Schwarz
18a43eeab3 undo the remote_consistent_lsn feedback channel brought in by the PoC merge (includes undo of funneling pageserver_connection field via connection options) 2025-06-02 12:04:35 +02:00
Christian Schwarz
39039d1be7 WIP: plumbing to feed commit_lsn to wal_advertiser 2025-06-02 12:03:30 +02:00
Christian Schwarz
9ee75ceee6 merge fixups; storcon and safekeeper compile again 2025-06-02 11:26:14 +02:00
Christian Schwarz
f5210a367d git merge --squash problame/broker-spof/poc
Squashed commit of the following:

commit 4a1b52c12e
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon May 5 12:45:45 2025 +0200

    WIP

commit 257693e4f2
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon May 5 10:59:45 2025 +0200

    WIP

commit 7aa9beaefd
Author: Christian Schwarz <christian@neon.tech>
Date:   Sun May 4 17:06:46 2025 +0200

    make sk compile

commit 35dbbbaf60
Author: Christian Schwarz <christian@neon.tech>
Date:   Sun May 4 16:50:23 2025 +0200

    move discovery request mechanism into that type as well

    Can't move the policy when we send disovery mechanism because that's
    tied to connection_manager loop state.

commit 6380c9674c
Author: Christian Schwarz <christian@neon.tech>
Date:   Sun May 4 16:22:46 2025 +0200

    move subscription code into new client struct

commit 1f53688189
Author: Christian Schwarz <christian@neon.tech>
Date:   Sun May 4 14:40:44 2025 +0200

    Revert "rip out broker binary target & launch of it in cplane & mention of it in docs"

    This reverts commit 8f201b1580.

commit 8f201b1580
Author: Christian Schwarz <christian@neon.tech>
Date:   Sun May 4 14:38:52 2025 +0200

    rip out broker binary target & launch of it in cplane & mention of it in docs
2025-06-02 11:19:37 +02:00
Christian Schwarz
f36520eb94 stub api impl 2025-06-02 11:19:28 +02:00
Christian Schwarz
afa35eea87 trigger now only does insertions; app loop will do cleanup; prepare API for cleanup 2025-05-30 20:36:50 +02:00
Christian Schwarz
8eb853b731 finish the stub implementation of storcon side, it now PUTs to SKs and gets back 404s 2025-05-28 19:29:32 +02:00
Christian Schwarz
a95015d967 triggers for timelines table and ps/sk row deletion 2025-05-28 13:14:37 +02:00
Christian Schwarz
3836ee8539 finish prototyping event changes via triggers 2025-05-27 18:28:22 +02:00
Christian Schwarz
a6bd4a3be6 Revert "abandoned prototype how it would be if we do what triggers do but in the app"
This reverts commit 24d96e4372.
2025-05-27 14:13:10 +02:00
Christian Schwarz
24d96e4372 abandoned prototype how it would be if we do what triggers do but in the app 2025-05-27 14:12:48 +02:00
Christian Schwarz
29ea89b61d trigger-based thing 2025-05-27 14:12:34 +02:00
Christian Schwarz
322e742e4c schema 2025-05-27 13:44:25 +02:00
51 changed files with 2727 additions and 234 deletions

75
Cargo.lock generated
View File

@@ -6064,8 +6064,10 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"sk_ps_discovery",
"smallvec",
"storage_broker",
"storage_controller_client",
"strum",
"strum_macros",
"thiserror 1.0.69",
@@ -6077,6 +6079,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"tonic",
"tracing",
"tracing-subscriber",
"url",
@@ -6572,6 +6575,76 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sk_ps_discovery"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"byteorder",
"bytes",
"camino",
"camino-tempfile",
"chrono",
"clap",
"crc32c",
"criterion",
"desim",
"env_logger",
"fail",
"futures",
"hex",
"http 1.1.0",
"http-utils",
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"pem",
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
"remote_storage",
"reqwest",
"rustls 0.23.18",
"safekeeper_api",
"safekeeper_client",
"scopeguard",
"sd-notify",
"serde",
"serde_json",
"sha2",
"smallvec",
"storage_broker",
"strum",
"strum_macros",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-tar",
"tokio-util",
"tonic",
"tracing",
"tracing-subscriber",
"url",
"utils",
"wal_decoder",
"walproposer",
"workspace_hack",
]
[[package]]
name = "slab"
version = "0.4.8"
@@ -6678,6 +6751,7 @@ dependencies = [
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.0",
"tokio-util",
"tonic",
"tonic-build",
"tracing",
@@ -6690,6 +6764,7 @@ name = "storage_controller"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"bytes",
"camino",
"chrono",

View File

@@ -43,7 +43,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"endpoint_storage", "libs/sk_ps_discovery",
]
[workspace.package]
@@ -262,6 +262,7 @@ pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
sk_ps_discovery = { path = "./libs/sk_ps_discovery" }
desim = { version = "0.1", path = "./libs/desim" }
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_controller_client = { path = "./storage_controller/client" }

View File

@@ -6,9 +6,11 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::shard::ShardIndex;
use crate::membership::Configuration;
use crate::{ServerInfo, Term};
@@ -309,3 +311,29 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "action")]
pub enum TenantShardPageserverAttachmentChange {
Attach(TenantShardPageserverAttachment),
Detach(TenantShardPageserverAttachment),
}
impl TenantShardPageserverAttachmentChange {
pub fn attachment(&self) -> &TenantShardPageserverAttachment {
match self {
TenantShardPageserverAttachmentChange::Attach(a) => a,
TenantShardPageserverAttachmentChange::Detach(a) => a,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TenantShardPageserverAttachment {
pub shard_id: ShardIndex,
pub generation: Generation,
pub ps_id: NodeId,
// TODO: avoid transmitting this with every request.
// How nice things could be if there were simple DNS records for ps-$node_id.$cell.$region.$cloud.neon.tech
pub ps_hostname: String, // TODO: some type safety
}

View File

@@ -0,0 +1,81 @@
[package]
name = "sk_ps_discovery"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
workspace_hack.workspace = true
async-stream.workspace = true
anyhow.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
crc32c.workspace = true
fail.workspace = true
hex.workspace = true
humantime.workspace = true
http.workspace = true
hyper0.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
futures.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
pageserver_api.workspace = true
postgres-protocol.workspace = true
pprof.workspace = true
rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
rustls.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-tar.workspace = true
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing.workspace = true
url.workspace = true
metrics.workspace = true
pem.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
sha2.workspace = true
sd-notify.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true
http-utils.workspace = true
utils.workspace = true
wal_decoder.workspace = true
env_logger.workspace = true
[dev-dependencies]
criterion.workspace = true
itertools.workspace = true
walproposer.workspace = true
rand.workspace = true
desim.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
[[bench]]
name = "bench"
harness = false

View File

@@ -0,0 +1,97 @@
//! WAL ingestion benchmarks.
use std::time::Instant;
use criterion::{Criterion, criterion_group, criterion_main};
use pprof::criterion::{Output, PProfProfiler};
use sk_ps_discovery::{
AttachmentUpdate, RemoteConsistentLsnAdv, TenantShardAttachmentId, TimelineAttachmentId,
};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::ShardIndex,
};
/// Use jemalloc and enable profiling, to mirror bin/safekeeper.rs.
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "malloc_conf")]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_simple,
);
criterion_main!(benches);
fn bench_simple(c: &mut Criterion) {
let mut g = c.benchmark_group("simple");
// setup
let mut world = sk_ps_discovery::World::default();
// Simplified view: lots of unsharded tenants with one timeline each
let n_pageservers = 20;
let n_tenant_shards_per_pageserver = 2000;
for ps_id in 1..=n_pageservers {
for _ in ..n_tenant_shards_per_pageserver {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
for generation in 10..=11 {
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(generation),
};
let timeline_attachment = TimelineAttachmentId {
tenant_timeline_id: TenantTimelineId {
tenant_id,
timeline_id,
},
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(generation),
};
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: sk_ps_discovery::AttachmentUpdateAction::Attach {
ps_id: NodeId(ps_id),
},
});
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
remote_consistent_lsn: Lsn(23),
attachment: timeline_attachment,
});
}
world.handle_commit_lsn_advancement(
TenantTimelineId {
tenant_id,
timeline_id,
},
Lsn(42),
);
}
}
// setup done
let world = world;
g.bench_function("get_commit_lsn_advertisements", |bencher| {
bencher.iter_custom(|iters| {
let started = Instant::now();
for _ in 0..iters {
criterion::black_box(world.get_commit_lsn_advertisements());
}
let elapsed = started.elapsed();
elapsed
});
});
g.finish();
}

View File

@@ -0,0 +1,515 @@
#[cfg(test)]
mod tests;
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
ops::RangeInclusive,
};
use tracing::{info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
merge_join,
shard::ShardIndex,
};
#[derive(Debug, Default)]
pub struct World {
attachments: BTreeMap<TenantShardAttachmentId, NodeId>,
attachment_count: HashMap<TenantId, u16>,
nodes_timelines: HashMap<NodeId, HashMap<TenantTimelineId, u16>>, // u16 is a refcount from each timeline attachment id
// continously maintained aggregate for efficient decisionmaking on quiescing;
// quiesced timelines are always caught up
// can quiesce one == attachment_count (TODO: this requires enforcing foreign key relationship between attachments and remote_consistent_lsn)
caught_up_count: HashMap<TenantTimelineId, u16>,
// BEGIN quiescing/active split
quiesced_timelines: BTreeMap<TenantTimelineId, Lsn>,
// ^
// either a timeline is in quiesced_timelines
// or it is below
// v
commit_lsns: BTreeMap<TenantTimelineId, Lsn>,
remote_consistent_lsns: BTreeMap<TimelineAttachmentId, Lsn>,
// END quiescing/active split
// other fields
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub struct TenantShardAttachmentId {
pub tenant_id: TenantId,
pub shard_id: ShardIndex,
pub generation: Generation,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub struct TimelineAttachmentId {
pub tenant_timeline_id: TenantTimelineId,
pub shard_id: ShardIndex,
pub generation: Generation,
}
pub struct AttachmentUpdate {
pub tenant_shard_attachment_id: TenantShardAttachmentId,
pub action: AttachmentUpdateAction,
}
pub enum AttachmentUpdateAction {
Attach { ps_id: NodeId },
Detach,
}
pub struct RemoteConsistentLsnAdv {
pub attachment: TimelineAttachmentId,
pub remote_consistent_lsn: Lsn,
}
impl World {
fn check_invariants(&self) {
if !cfg!(debug_assertions) {
return;
}
// caught_up_count maintenance
{
for (tenant_timeline_id, caught_up_count) in
self.caught_up_count.iter().map(|(k, v)| (*k, *v))
{
let attachment_count = *self
.attachment_count
.get(&tenant_timeline_id.tenant_id)
.unwrap();
assert!(caught_up_count <= attachment_count);
if caught_up_count == attachment_count {
self.quiesced_timelines.contains_key(&tenant_timeline_id);
// remote_consistent_lsn and commit_lsns is empty, checked by "quiescing XOR ..." below
} else {
let commit_lsn = self.commit_lsns[&&tenant_timeline_id];
let mut validate_caught_up = 0;
let mut validate_not_caught_up = 0;
for (_, r_c_lsn) in self
.remote_consistent_lsns
.range(TimelineAttachmentId::timeline_range(tenant_timeline_id))
.map(|(k, v)| (*k, *v))
{
if r_c_lsn == commit_lsn {
validate_caught_up += 1;
} else {
assert!(r_c_lsn < commit_lsn);
validate_not_caught_up += 1;
}
}
assert_eq!(validate_caught_up, caught_up_count);
assert_eq!(
validate_caught_up + validate_not_caught_up,
attachment_count
);
}
}
}
// quiescing XOR ...
{
let quiesced_timelines: HashSet<TenantTimelineId> =
self.quiesced_timelines.keys().cloned().collect();
let commit_lsn_timelines: HashSet<TenantTimelineId> =
self.commit_lsns.keys().cloned().collect();
let remote_consistent_lsn_timelines: HashSet<TenantTimelineId> = self
.remote_consistent_lsns
.keys()
.map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id)
.collect();
#[rustfmt::skip]
assert_eq!(0, quiesced_timelines.intersection(&commit_lsn_timelines).count());
#[rustfmt::skip]
assert_eq!(0, quiesced_timelines.intersection(&remote_consistent_lsn_timelines).count());
}
// nodes_timelines maintenance
{
let mut expect: HashMap<NodeId, HashMap<TenantTimelineId, u16>> = HashMap::new();
let all_ttids: BTreeSet<TenantTimelineId> = self
.quiesced_timelines
.keys()
.cloned()
.chain(
self.remote_consistent_lsns
.keys()
.cloned()
.map(|tlaid| tlaid.tenant_timeline_id),
)
.collect();
for ttid in all_ttids {
for (_, node_id) in self
.attachments
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
.map(|(k, v)| (*k, *v))
{
let expect = expect.entry(node_id).or_default();
let refcount = expect.entry(ttid).or_default();
*refcount += 1;
}
}
assert_eq!(expect, self.nodes_timelines);
}
}
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
self.check_invariants();
use AttachmentUpdateAction::*;
use btree_map::Entry::*;
let AttachmentUpdate {
tenant_shard_attachment_id,
action,
} = upd;
match (action, self.attachments.entry(tenant_shard_attachment_id)) {
(Attach { ps_id }, Occupied(e)) if *e.get() == ps_id => {
info!("attachment is already known")
}
(Attach { ps_id }, Occupied(e)) => {
warn!(current_node=%e.get(), proposed_node=%ps_id, "ignoring update that moves attachment to a different pageserver");
}
(Attach { ps_id }, Vacant(e)) => {
e.insert(ps_id);
// Keep attachmount_count up to date
let attachment_count = self
.attachment_count
.entry(tenant_shard_attachment_id.tenant_id)
.or_default();
*attachment_count += attachment_count.checked_add(1).unwrap();
// Keep nodes_timelines up to date
let nodes_timelines = self.nodes_timelines.entry(ps_id).or_default();
for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range(
tenant_shard_attachment_id.tenant_id,
)) {
let refcount = nodes_timelines.entry(*ttid).or_default();
*refcount = refcount.checked_add(1).unwrap();
}
if nodes_timelines.is_empty() {
self.nodes_timelines.remove(&ps_id);
}
// New shards may start at an older LSN than where we quiesced => activate all quiesced timelines.
let activate_range =
TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
let activate: HashSet<TenantTimelineId> = self
.quiesced_timelines
.range(activate_range)
.map(|(ttid, _quiesced_lsn)| *ttid)
.collect();
for tenant_timeline_id in activate {
self.activate_timeline(tenant_timeline_id);
}
}
(Detach, Occupied(e)) => {
let ps_id = e.remove();
// Keep attachment count up to date
let attachment_count = self
.attachment_count
.get_mut(&tenant_shard_attachment_id.tenant_id)
.expect("attachment action initializes the hasmap entry");
*attachment_count = attachment_count.checked_sub(1).unwrap();
// Keep nodes_timelines up to date
let nodes_timelines = self
.nodes_timelines
.get_mut(&ps_id)
.expect("attachment action initializes hashmap entry");
for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range(
tenant_shard_attachment_id.tenant_id,
)) {
let refcount = nodes_timelines.entry(*ttid).or_default();
*refcount = refcount.checked_sub(1).unwrap();
}
}
(Detach, Vacant(_)) => {
info!("detachment is already known");
}
}
self.check_invariants();
}
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
self.check_invariants();
let RemoteConsistentLsnAdv {
attachment,
remote_consistent_lsn,
} = adv;
match self.remote_consistent_lsns.entry(attachment) {
btree_map::Entry::Occupied(mut occupied_entry) => {
let current = occupied_entry.get_mut();
use std::cmp::Ordering::*;
match (*current).cmp(&remote_consistent_lsn) {
Less => {
*current = remote_consistent_lsn;
let caught_up_count = self
.caught_up_count
.get_mut(&attachment.tenant_timeline_id)
.unwrap();
*caught_up_count = caught_up_count.checked_add(1).unwrap();
if *caught_up_count
== self.attachment_count[&attachment.tenant_timeline_id.tenant_id]
{
self.quiesce_timeline(attachment.tenant_timeline_id);
}
}
Equal => {
info!("ignoring no-op update, likely duplicate delivery");
}
Greater => {
warn!(
"ignoring advertisement because remote_consistent_lsn is moving backwards"
);
}
}
}
btree_map::Entry::Vacant(_) => {
let ttid = attachment.tenant_timeline_id;
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => {
info!("ignoring no-op update for quiesced timeline");
}
Some(_) => {
self.activate_timeline(ttid);
// recurse one level, guarnateed to hit `Occupied` case above
self.handle_remote_consistent_lsn_advertisement(adv);
}
None => {
info!("ignoring advertisement because timeline is not known");
}
}
}
}
self.check_invariants();
}
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, update: Lsn) {
self.check_invariants();
match self.commit_lsns.entry(ttid) {
btree_map::Entry::Occupied(mut entry) => {
let current = entry.get_mut();
use std::cmp::Ordering::*;
match (*current).cmp(&update) {
Less => {
*current = update;
// We never allow remote_consistent_lsn to be ahead of commit_lsn.
// Therefore, it is safe to say nothing is caught up anymore.
let caught_up_count = self.caught_up_count.get_mut(&ttid).unwrap();
*caught_up_count = 0;
}
Equal => {
// This code runs in safekeeper impl, no reason why there would be duplicate delivery.
warn!("ignoring no-op update; why is this happening?");
}
Greater => {
panic!(
"proposed commit_lsn would move it backwards: current={} update={}",
current, update
);
}
}
}
btree_map::Entry::Vacant(entry) => {
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == update => {
info!("ignoring no-op update for quiesced timeline");
}
Some(_) => {
self.activate_timeline(ttid);
// recurse one level, guarnateed to hit `Occupied` case above
self.handle_commit_lsn_advancement(ttid, update);
}
None => {
info!("first time hearing about this timeline, initializing");
entry.insert(update);
let replaced = self.caught_up_count.insert(ttid, 0);
// only commit_lsn advancement makes timelines known to world
assert_eq!(None, replaced);
for (attachment, node_id) in self
.attachments
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
{
let replaced = self.remote_consistent_lsns.insert(
attachment.timeline_attachment_id(ttid.timeline_id),
Lsn(0),
);
// only commit_lsn advancement makes timelines known to World
assert_eq!(None, replaced);
let nodes_timelines = self.nodes_timelines.entry(*node_id).or_default();
let refcount = nodes_timelines.entry(ttid).or_default();
*refcount = refcount.checked_add(1).unwrap();
}
}
}
}
}
self.check_invariants();
}
pub fn get_commit_lsn_advertisements(&self) -> HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> {
let mut commit_lsn_advertisements_by_node: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> =
HashMap::with_capacity(self.nodes_timelines.len());
let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v));
let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v));
let remote_consistent_lsns_iter = self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v));
let join = merge_join::inner_equi_join_with_merge_strategy(
commit_lsns_iter,
attachments_iter,
|(tenant_timeline_id, _)| tenant_timeline_id.tenant_id,
|(shard_attachment_id, _)| shard_attachment_id.tenant_id,
);
let join = merge_join::left_equi_join_with_merge_strategy(
join,
remote_consistent_lsns_iter,
|((ttid, _), _)| ttid.tenant_id,
|(tlaid, _)| tlaid.tenant_timeline_id.tenant_id,
);
for ((c, a), r) in join {
let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = c;
let (_, node_id): (TenantShardAttachmentId, NodeId) = a;
match r {
// TODO: can > ever happen?
Some((_, remote_consistent_lsn)) if remote_consistent_lsn >= commit_lsn => {
// this timeline shard attachment is already caught up
continue;
}
Some(_) | None => {
// need to advertise
// -> fallthrough
}
};
// DISTINCT node_id, array_agg(DISTINCT tenant_shard_id )
let for_node = commit_lsn_advertisements_by_node
.entry(node_id)
.or_insert_with(|| HashMap::with_capacity(self.nodes_timelines[&node_id].len()));
match for_node.entry(tenant_timeline_id) {
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(commit_lsn);
}
hash_map::Entry::Occupied(occupied_entry) => {
assert_eq!(*occupied_entry.get(), commit_lsn);
}
}
}
commit_lsn_advertisements_by_node
}
fn activate_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
let quiesced_lsn = self
.quiesced_timelines
.remove(&tenant_timeline_id)
.expect("must call this function only on quiesced tenant_timeline_id");
let replaced = self.commit_lsns.insert(tenant_timeline_id, quiesced_lsn);
assert_eq!(None, replaced);
let reconstruct_remote_consistent_lsn_entries = self
.attachments
.range(TenantShardAttachmentId::tenant_range(
tenant_timeline_id.tenant_id,
))
.map(|(k, _)| *k)
.map(|tenant_shard_attachment_id| {
(
tenant_shard_attachment_id
.timeline_attachment_id(tenant_timeline_id.timeline_id),
quiesced_lsn,
)
});
for (key, value) in reconstruct_remote_consistent_lsn_entries {
let replaced = self.remote_consistent_lsns.insert(key, value);
assert_eq!(None, replaced);
}
}
fn quiesce_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
self.check_invariants();
if self.quiesced_timelines.contains_key(&tenant_timeline_id) {
panic!("only call this function on active timelines");
}
let quiesced_lsn = self
.commit_lsns
.remove(&tenant_timeline_id)
.expect("inconsistent: we checked it's not in quiesced_timelines, so, must be active");
let caught_up_count = self
.caught_up_count
.remove(&tenant_timeline_id)
.expect("inconsistent: we checked it's not in quiesced_timleines, so, must be active");
let mut remove_remote_consistent_lsns = Vec::new();
for (k, remote_consistent_lsn) in self
.remote_consistent_lsns
.range(TimelineAttachmentId::timeline_range(tenant_timeline_id))
{
assert_eq!(*remote_consistent_lsn, quiesced_lsn);
remove_remote_consistent_lsns.push(*k);
}
assert_eq!(
caught_up_count,
u16::try_from(remove_remote_consistent_lsns.len()).unwrap()
);
for k in remove_remote_consistent_lsns {
let removed = self.remote_consistent_lsns.remove(&k);
assert!(removed.is_some(), "we just added");
}
let replaced = self
.quiesced_timelines
.insert(tenant_timeline_id, quiesced_lsn);
assert_eq!(None, replaced); // we checked at function entry
self.check_invariants();
}
}
impl TimelineAttachmentId {
pub fn timeline_range(ttid: TenantTimelineId) -> RangeInclusive<Self> {
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
let generation_range: RangeInclusive<_> = Generation::RANGE;
RangeInclusive::new(
TimelineAttachmentId {
tenant_timeline_id: ttid,
shard_id: *shard_index_range.start(),
generation: *generation_range.start(),
},
TimelineAttachmentId {
tenant_timeline_id: ttid,
shard_id: *shard_index_range.end(),
generation: *generation_range.end(),
},
)
}
pub fn tenant_shard_attachment_id(self) -> TenantShardAttachmentId {
TenantShardAttachmentId {
tenant_id: self.tenant_timeline_id.tenant_id,
shard_id: self.shard_id,
generation: self.generation,
}
}
}
impl TenantShardAttachmentId {
pub fn timeline_attachment_id(self, timeline_id: TimelineId) -> TimelineAttachmentId {
TimelineAttachmentId {
tenant_timeline_id: TenantTimelineId {
tenant_id: self.tenant_id,
timeline_id,
},
shard_id: self.shard_id,
generation: self.generation,
}
}
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
let generation_range: RangeInclusive<_> = Generation::RANGE;
RangeInclusive::new(
Self {
tenant_id,
shard_id: *shard_index_range.start(),
generation: *generation_range.start(),
},
Self {
tenant_id,
shard_id: *shard_index_range.end(),
generation: *generation_range.end(),
},
)
}
}

View File

@@ -0,0 +1,224 @@
use utils::{id::TenantId, logging};
use super::*;
use crate::World;
#[track_caller]
fn validate_advertisements(
actual: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>>,
expect: Vec<(NodeId, Vec<(TenantTimelineId, Lsn)>)>,
) {
let expect: HashMap<_, _> = expect
.into_iter()
.map(|(node_id, innermap)| (node_id, innermap.into_iter().collect()))
.collect();
assert_eq!(actual, expect);
}
#[test]
fn basic() {
let mut world = World::default();
let tenant_id = TenantId::from_array([0xff; 16]);
let timeline_id = TimelineId::from_array([1; 16]);
let timeline2 = TimelineId::from_array([2; 16]);
let attachment1 = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let attachment2 = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(3),
};
let ps1 = NodeId(0x100);
// Out of order; in happy path, commit_lsn advances first, but let's test the
// case where safekeeper doesn't know about the attachments yet first, before
// we extend the case to the happy path.
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
attachment: attachment1.timeline_attachment_id(timeline_id),
remote_consistent_lsn: Lsn(0x23),
});
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
attachment: attachment2.timeline_attachment_id(timeline_id),
remote_consistent_lsn: Lsn(0x42),
});
// SK authoritative info on which advertisements ought exist is still empty
assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id: attachment1,
action: AttachmentUpdateAction::Attach { ps_id: ps1 },
});
// We have not inserted any commit_lsn info yet, so, still no advs expected
assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());
// insert commit_lsn info for different timeline
world.handle_commit_lsn_advancement(
TenantTimelineId {
tenant_id,
timeline_id: timeline2,
},
Lsn(0x66),
);
// Advs should still be empty
validate_advertisements(
world.get_commit_lsn_advertisements(),
vec![(
ps1,
vec![(
TenantTimelineId {
tenant_id,
timeline_id: timeline2,
},
Lsn(0x66),
)],
)],
);
// Ok, out of order part tested. Now Safekeeper learns about the attachments.
// insert commit_lsn info for the timeline we have remote_consistent_lsn info for
world.handle_commit_lsn_advancement(
TenantTimelineId {
tenant_id,
timeline_id,
},
Lsn(0x55),
);
dbg!(&world);
// Now advertisements to attachment1 will be sent out, but attachment2 is still not known, so, no advertisements to it.
validate_advertisements(
world.get_commit_lsn_advertisements(),
vec![(
ps1,
vec![(
TenantTimelineId {
tenant_id,
timeline_id,
},
Lsn(0x55),
)],
)],
);
}
#[test]
fn advertisement_for_new_timeline() {
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let ttid = TenantTimelineId {
tenant_id,
timeline_id,
};
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let ps_id = NodeId(0x100);
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: AttachmentUpdateAction::Attach { ps_id },
});
world.handle_commit_lsn_advancement(ttid, Lsn(23));
let advs = world.get_commit_lsn_advertisements();
validate_advertisements(advs, vec![(ps_id, vec![(ttid, Lsn(23))])]);
}
#[test]
fn quiescing_timeline_catchup() {
let _guard = logging::init(
logging::LogFormat::Test,
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.unwrap();
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let ttid = TenantTimelineId {
tenant_id,
timeline_id,
};
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let ps_id = NodeId(0x100);
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: AttachmentUpdateAction::Attach { ps_id },
});
world.handle_commit_lsn_advancement(ttid, Lsn(23));
assert!(world.quiesced_timelines.is_empty());
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
attachment: tenant_shard_attachment_id.timeline_attachment_id(timeline_id),
remote_consistent_lsn: Lsn(23),
});
assert!(world.quiesced_timelines.contains_key(&ttid));
}
#[test]
fn nodes_timelines() {
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::from_array([0x1; 16]);
let ttid = TenantTimelineId {
tenant_id,
timeline_id,
};
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let ps_id = NodeId(0x100);
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: AttachmentUpdateAction::Attach { ps_id },
});
assert!(world.nodes_timelines.get(&ps_id).is_none());
world.handle_commit_lsn_advancement(ttid, Lsn(0x23));
assert_eq!(world.nodes_timelines[&ps_id].len(), 1);
let timeline2 = TimelineId::from_array([0x2; 16]);
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
attachment: TimelineAttachmentId {
tenant_timeline_id: TenantTimelineId {
tenant_id,
timeline_id: timeline2,
},
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
},
remote_consistent_lsn: Lsn(0x42),
});
}
// TODO: need more tests, esp for the removal path

View File

@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, ops::RangeInclusive};
use serde::{Deserialize, Serialize};
@@ -25,7 +25,9 @@ pub enum Generation {
/// scenarios where pageservers might otherwise issue conflicting writes to
/// remote storage
impl Generation {
pub const MIN: Self = Self::None;
pub const MAX: Self = Self::Valid(u32::MAX);
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
/// Create a new Generation that represents a legacy key format with
/// no generation suffix

View File

@@ -1,5 +1,6 @@
use std::fmt;
use std::num::ParseIntError;
use std::ops::RangeInclusive;
use std::str::FromStr;
use anyhow::Context;
@@ -320,6 +321,19 @@ impl TenantTimelineId {
pub fn empty() -> Self {
Self::new(TenantId::from([0u8; 16]), TimelineId::from([0u8; 16]))
}
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
timeline_id: TimelineId::from_array([u8::MIN; 16]),
},
Self {
tenant_id,
timeline_id: TimelineId::from_array([u8::MAX; 16]),
},
)
}
}
impl fmt::Display for TenantTimelineId {

View File

@@ -95,6 +95,9 @@ pub mod guard_arc_swap;
pub mod elapsed_accum;
pub mod merge_join;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;

View File

@@ -0,0 +1,164 @@
pub fn inner_equi_join_with_merge_strategy<L, LI, R, RI, K, FL, FR>(
l: L,
r: R,
key_l: FL,
key_r: FR,
) -> impl Iterator<Item = (LI, RI)>
where
L: Iterator<Item = LI>, // + Sorted
R: Iterator<Item = RI>, // + Sorted
FL: 'static + Fn(&LI) -> K,
FR: 'static + Fn(&RI) -> K,
LI: Copy,
RI: Copy,
K: PartialEq + Eq + Ord,
{
let mut l = l.map(move |i| (i, key_l(&i))).peekable();
let mut r = r.map(move |i| (i, key_r(&i))).peekable();
std::iter::from_fn(move || {
loop {
match (l.peek(), r.peek()) {
(Some((_, lk)), Some((_, rk))) if lk < rk => {
drop(l.next());
continue;
}
(Some((_, lk)), Some((_, rk))) if lk > rk => {
drop(r.next());
continue;
}
(Some((lv, lk)), Some((_, rk))) => {
assert!(lk == rk);
let (rv, _) = r.next().unwrap();
return Some((lv.clone(), rv));
}
(None, None) | (None, Some(_)) | (Some(_), None) => return None,
}
}
})
}
pub fn left_equi_join_with_merge_strategy<L, LI, R, RI, K, FL, FR>(
l: L,
r: R,
key_l: FL,
key_r: FR,
) -> impl Iterator<Item = (LI, Option<RI>)>
where
L: Iterator<Item = LI>, // + Sorted
R: Iterator<Item = RI>, // + Sorted
FL: 'static + Fn(&LI) -> K,
FR: 'static + Fn(&RI) -> K,
LI: Copy,
RI: Copy,
K: PartialEq + Eq + Ord,
{
let mut l = l.map(move |i| (i, key_l(&i))).peekable();
let mut r = r.map(move |i| (i, key_r(&i))).peekable();
let mut l_had_match = false;
std::iter::from_fn(move || {
loop {
match (l.peek(), r.peek()) {
(Some((_, lk)), Some((_, rk))) if lk < rk => {
let (lv, _) = l.next().unwrap();
if l_had_match {
l_had_match = false;
continue;
} else {
return Some((lv, None));
}
}
(Some((_, _)), None) => {
let (lv, _) = l.next().unwrap();
if l_had_match {
l_had_match = false;
continue;
} else {
return Some((lv, None));
}
}
(Some((_, lk)), Some((_, rk))) if lk > rk => {
drop(r.next());
continue;
}
(Some((lv, lk)), Some((_, rk))) => {
l_had_match = true;
assert!(lk == rk);
let (rv, _) = r.next().unwrap();
return Some((lv.clone(), Some(rv)));
}
(None, None) | (None, Some(_)) => return None,
}
}
})
}
#[cfg(test)]
mod tests {
#[test]
fn inner_equi_basic() {
let l = vec![b"a", b"c"];
let r = vec![b"aa", b"ad", b"ba", b"bb", b"ca", b"cb", b"cd", b"dd"];
let res: Vec<_> = super::inner_equi_join_with_merge_strategy(
l.into_iter(),
r.into_iter(),
|l| &l[0..1],
|r| &r[0..1],
)
.collect();
assert_eq!(
res,
vec![
(b"a", b"aa"),
(b"a", b"ad"),
(b"c", b"ca"),
(b"c", b"cb"),
(b"c", b"cd"),
]
);
}
#[test]
fn left_equi_basic() {
/*
create table aleft (id text, aleft text);
create table aright (id text, aright text);
insert into aleft values ('a', 'a'), ('b', 'b');
insert into aright values ('a', 'aa'), ('a', 'ab'), ('c', 'cd');
select * from aleft left join aright using ("id");
*/
let l = vec![b"a", b"b"];
let r = vec![b"aa", b"ab", b"cd"];
let res: Vec<_> = super::left_equi_join_with_merge_strategy(
l.into_iter(),
r.into_iter(),
|l| &l[0..1],
|r| &r[0..1],
)
.collect();
assert_eq!(
res,
vec![(b"a", Some(b"aa")), (b"a", Some(b"ab")), (b"b", None)]
);
}
#[test]
fn left_equi_basic_2() {
let l = vec![b"b"];
let r = vec![b"aa", b"ab", b"bb"];
let res: Vec<_> = super::left_equi_join_with_merge_strategy(
l.into_iter(),
r.into_iter(),
|l| &l[0..1],
|r| &r[0..1],
)
.collect();
assert_eq!(res, vec![(b"b", Some(b"bb"))])
}
}

View File

@@ -52,6 +52,7 @@ pub struct TenantShardId {
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
pub const MIN: Self = Self(0);
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
@@ -85,7 +86,9 @@ impl ShardCount {
}
impl ShardNumber {
pub const MIN: Self = Self(0);
pub const MAX: Self = Self(u8::MAX);
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
}
impl TenantShardId {
@@ -100,16 +103,17 @@ impl TenantShardId {
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
shard_number: shard_index_range.start().shard_number,
shard_count: shard_index_range.start().shard_count,
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
shard_number: shard_index_range.end().shard_number,
shard_count: shard_index_range.end().shard_count,
},
)
}
@@ -241,6 +245,16 @@ impl From<[u8; 18]> for TenantShardId {
}
impl ShardIndex {
pub const MIN: Self = ShardIndex {
shard_number: ShardNumber::MIN,
shard_count: ShardCount::MIN,
};
pub const MAX: Self = ShardIndex {
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
};
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,

View File

@@ -4,3 +4,5 @@ pub mod duplex;
pub mod gate;
pub mod spsc_fold;
pub mod spsc_watch;

View File

@@ -56,7 +56,7 @@ impl<T: Send> Sender<T> {
/// # Panics
///
/// If `try_fold` panics, any subsequent call to `send` panic.
pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), SendError>
pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), (T, SendError)>
where
F: Fn(&mut T, T) -> Result<(), T>,
{
@@ -104,7 +104,9 @@ impl<T: Send> Sender<T> {
}
Poll::Pending
}
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
State::ReceiverGone => {
Poll::Ready(Err((value.take().unwrap(), SendError::ReceiverGone)))
}
State::SenderGone(_)
| State::AllGone
| State::SenderDropping

View File

@@ -0,0 +1,55 @@
//! watch is probably not the right word, because we do take out
use tokio_util::sync::CancellationToken;
use crate::sync::spsc_fold;
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = spsc_fold::channel();
let cancel = CancellationToken::new();
(
Sender {
tx,
_cancel: cancel.clone().drop_guard(),
},
Receiver { rx, cancel },
)
}
pub struct Sender<T> {
tx: spsc_fold::Sender<T>,
_cancel: tokio_util::sync::DropGuard,
}
pub struct Receiver<T> {
rx: spsc_fold::Receiver<T>,
cancel: CancellationToken,
}
impl<T: Send> Sender<T> {
pub fn send_replace(&mut self, value: T) -> Result<(), (T, spsc_fold::SendError)> {
poll_ready(self.tx.send(value, |old, new| {
*old = new;
Ok(())
}))
}
}
impl<T: Send> Receiver<T> {
pub async fn recv(&mut self) -> Result<T, spsc_fold::RecvError> {
self.rx.recv().await
}
pub async fn cancelled(&mut self) {
self.cancel.cancelled().await
}
}
fn poll_ready<F: Future<Output = O>, O>(f: F) -> O {
futures::executor::block_on(async move {
let f = std::pin::pin!(f);
match futures::poll!(f) {
std::task::Poll::Ready(r) => r,
std::task::Poll::Pending => unreachable!("expecting future to always return Ready"),
}
})
}

View File

@@ -423,11 +423,14 @@ fn start_pageserver(
.map(storage_broker::Certificate::from_pem),
);
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(
let service_client = storage_broker::connect(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
tls_config,
)
)?;
anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(
service_client,
))
})
.with_context(|| {
format!(

View File

@@ -100,7 +100,7 @@ pub struct State {
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: &'static [&'static str],
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
@@ -114,7 +114,7 @@ impl State {
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,

View File

@@ -48,7 +48,6 @@ use remote_timeline_client::{
download_tenant_manifest,
};
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::import_pgdata::ImportingTimeline;
use timeline::offload::{OffloadError, offload_timeline};
@@ -153,7 +152,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub broker_client: storage_broker::TimelineUpdatesSubscriber,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
@@ -2107,7 +2106,7 @@ impl TenantShard {
async fn unoffload_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: RequestContext,
) -> Result<Arc<Timeline>, TimelineArchivalError> {
info!("unoffloading timeline");
@@ -2242,7 +2241,7 @@ impl TenantShard {
self: &Arc<Self>,
timeline_id: TimelineId,
new_state: TimelineArchivalState,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
@@ -2571,7 +2570,7 @@ impl TenantShard {
pub(crate) async fn create_timeline(
self: &Arc<TenantShard>,
params: CreateTimelineParams,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
if !self.is_active() {
@@ -3299,7 +3298,7 @@ impl TenantShard {
/// to delay background jobs. Background jobs can be started right away when None is given.
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {

View File

@@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Notify, oneshot, watch};
@@ -2080,7 +2079,7 @@ impl Timeline {
pub(crate) fn activate(
self: &Arc<Self>,
parent: Arc<crate::tenant::TenantShard>,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
@@ -3114,7 +3113,7 @@ impl Timeline {
fn launch_wal_receiver(
self: &Arc<Self>,
ctx: &RequestContext,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
) {
info!(
"launching WAL receiver for timeline {} of tenant {}",

View File

@@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> {
tenant: Arc<TenantShard>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
self.write(|raw_timeline| async move {

View File

@@ -28,7 +28,6 @@ use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -70,7 +69,7 @@ impl WalReceiver {
pub fn start(
timeline: Arc<Timeline>,
conf: WalReceiverConf,
mut broker_client: BrokerClientChannel,
mut broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> Self {
let tenant_shard_id = timeline.tenant_shard_id;

View File

@@ -17,19 +17,12 @@ use std::time::Duration;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use futures::StreamExt;
use pageserver_api::models::TimelineState;
use postgres_connection::PgConnectionConfig;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
TypedMessage,
};
use storage_broker::{BrokerClientChannel, Code, Streaming};
use storage_broker::proto::SafekeeperDiscoveryResponse;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::{
@@ -56,7 +49,7 @@ pub(crate) struct Cancelled;
///
/// Not cancellation-safe. Use `cancel` token to request cancellation.
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
broker_client: &mut storage_broker::TimelineUpdatesSubscriber,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
cancel: &CancellationToken,
@@ -81,11 +74,6 @@ pub(super) async fn connection_manager_loop_step(
WALRECEIVER_ACTIVE_MANAGERS.dec();
}
let id = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
};
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
@@ -101,7 +89,12 @@ pub(super) async fn connection_manager_loop_step(
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
let (timeline_updates, mut discovery_requester) = broker_client.subscribe(
connection_manager_state.timeline.tenant_shard_id,
connection_manager_state.timeline.timeline_id,
cancel,
);
let mut timeline_updates = Box::pin(timeline_updates);
debug!("Subscribed for broker timeline updates");
loop {
@@ -155,29 +148,10 @@ pub(super) async fn connection_manager_loop_step(
}
},
// Got a new update from the broker
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
match broker_update {
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
warn!("broker subscription failed: {status}");
}
}
return Ok(());
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
return Ok(());
}
}
// Got a new update from the broker.
// The stream ends with None if and only if `cancel` is cancelled.
Some(timeline_update) = timeline_updates.next() => {
connection_manager_state.register_timeline_update(timeline_update)
},
new_event = async {
@@ -258,32 +232,11 @@ pub(super) async fn connection_manager_loop_step(
tokio::time::sleep(next_discovery_ts - now).await;
}
let tenant_timeline_id = Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
});
let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
let msg = TypedMessage {
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
info!("No active connection and no candidates, sending discovery request to the broker");
discovery_requester.request().await;
last_discovery_ts = Some(std::time::Instant::now());
info!("No active connection and no candidates, sending discovery request to the broker");
// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because
// we just want to receive broker updates and discovery is not important if we already
// receive updates.
//
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
// This is totally fine because of the reason above.
// This is a fire-and-forget request, we don't care about the response
let _ = broker_client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
None
} => {}
}
@@ -298,63 +251,6 @@ pub(super) async fn connection_manager_loop_step(
}
}
/// Endlessly try to subscribe for broker updates for a given timeline.
async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,
id: TenantTimelineId,
cancel: &CancellationToken,
) -> Result<Streaming<TypedMessage>, Cancelled> {
let mut attempt = 0;
loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempt += 1;
// subscribe to the specific timeline
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
}),
}),
};
match {
tokio::select! {
r = broker_client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { return Err(Cancelled); }
}
} {
Ok(resp) => {
return Ok(resp.into_inner());
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
info!(
"Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
);
continue;
}
}
}
}
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
@@ -695,44 +591,14 @@ impl ConnectionManagerState {
}
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
return;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
return;
}
}
}
_ => {
// unexpected message
return;
}
};
fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) {
WALRECEIVER_BROKER_UPDATES.inc();
let storage_broker::TimelineShardUpdate {
is_discovery,
inner: timeline_update,
} = timeline_update;
trace!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon
@@ -1013,7 +879,7 @@ impl ConnectionManagerState {
shard_stripe_size,
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
availability_zone: self.conf.availability_zone.as_deref(),
};
match wal_stream_connection_config(connection_conf_args) {

View File

@@ -52,6 +52,7 @@ tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-tar.workspace = true
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing.workspace = true
url.workspace = true
metrics.workspace = true
@@ -62,9 +63,11 @@ pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
sk_ps_discovery.workspace = true
sha2.workspace = true
sd-notify.workspace = true
storage_broker.workspace = true
storage_controller_client.workspace = true
tokio-stream.workspace = true
http-utils.workspace = true
utils.workspace = true

View File

@@ -8,11 +8,12 @@ use std::error::Error as _;
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
TimelineStatus,
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
TenantShardPageserverAttachmentChange, TimelineCreateRequest, TimelineStatus,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::shard::TenantShardId;
#[derive(Debug, Clone)]
pub struct Client {
@@ -189,6 +190,20 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn post_tenant_shard_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
body: TenantShardPageserverAttachmentChange,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/pageserver_attachments",
tenant_shard_id.tenant_id,
self.mgmt_api_endpoint
);
let resp = self.post(uri, body).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
async fn post<B: serde::Serialize, U: IntoUrl>(
&self,
uri: U,

View File

@@ -25,7 +25,7 @@ use safekeeper::defaults::{
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};
@@ -626,6 +626,30 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));
let ps_connectivity_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| HTTP_RUNTIME.handle())
.spawn(
global_timelines
.get_pageserver_connectivity()
.task_main()
.instrument(info_span!("pageserver_connectivity")),
)
.map(|res| ("pageserver connectivity".to_owned(), res));
tasks_handles.push(Box::pin(ps_connectivity_handle));
let wal_advertiser_task_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_ADVERTISER_RUNTIME.handle())
.spawn(
global_timelines
.get_wal_advertiser()
.task_main()
.instrument(info_span!("wal_advertiser_main")),
)
.map(|res| ("wal advertiser task handle".to_owned(), res));
tasks_handles.push(Box::pin(wal_advertiser_task_handle));
set_build_info_metric(GIT_VERSION, BUILD_TAG);
// TODO: update tokio-stream, convert to real async Stream with

View File

@@ -50,7 +50,8 @@ async fn push_loop(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
make_tls_config(&conf),
)?;
)?
.into_raw_grpc_client();
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
let outbound = async_stream::stream! {
@@ -97,7 +98,8 @@ async fn pull_loop(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
make_tls_config(&conf),
)?;
)?
.into_raw_grpc_client();
// TODO: subscribe only to local timelines instead of all
let request = SubscribeSafekeeperInfoRequest {
@@ -153,7 +155,8 @@ async fn discover_loop(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
make_tls_config(&conf),
)?;
)?
.into_raw_grpc_client();
let request = SubscribeByFilterRequest {
types: vec![TypeSubscription {

View File

@@ -67,6 +67,19 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
})
}
async fn post_tenant_pageserver_attachments(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let body: models::TenantShardPageserverAttachmentChange = json_request(&mut request).await?;
let global_timelines = get_global_timelines(&request);
let wal_advertiser = global_timelines.get_wal_advertiser();
wal_advertiser
.update_pageserver_attachments(tenant_id, body)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Deactivates all timelines for the tenant and removes its data directory.
/// See `timeline_delete_handler`.
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -718,6 +731,9 @@ pub fn make_router(
})
})
.get("/v1/utilization", |r| request_span(r, utilization_handler))
.post("/v1/tenant/:tenant_id/pageserver_attachments", |r| {
request_span(r, post_tenant_pageserver_attachments)
})
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})

View File

@@ -38,11 +38,13 @@ pub mod timeline_eviction;
pub mod timeline_guard;
pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_advertiser;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
pub(crate) mod pageserver_connectivity;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;
@@ -123,6 +125,7 @@ pub struct SafeKeeperConf {
pub ssl_ca_certs: Vec<Pem>,
pub use_https_safekeeper_api: bool,
pub enable_tls_wal_service_api: bool,
pub storage_controller_api: Option<Uri>,
}
impl SafeKeeperConf {
@@ -168,6 +171,7 @@ impl SafeKeeperConf {
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
storage_controller_api: None,
}
}
}
@@ -198,6 +202,14 @@ pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.expect("Failed to create broker runtime")
});
pub static WAL_ADVERTISER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("wal advertiser worker")
.enable_all()
.build()
.expect("Failed to create broker runtime")
});
pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("WAL backup worker")

View File

@@ -0,0 +1,117 @@
use desim::world::Node;
use hyper::Uri;
use pageserver_api::controller_api;
use utils::id::TenantId;
use crate::timeline::Timeline;
use std::{
collections::{HashMap, hash_map},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use anyhow::Context;
use tracing::{Instrument, error, info, info_span, warn};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
sync::{spsc_fold, spsc_watch},
};
use crate::{GlobalTimelines, SafeKeeperConf};
type Advs = HashMap<TenantTimelineId, Lsn>;
#[derive(Default)]
pub struct GlobalState {
inner: once_cell::sync::OnceCell<tokio::sync::mpsc::Sender<Message>>,
}
enum Message {
Resolve {
ps_id: NodeId,
reply: tokio::sync::oneshot::Sender<tokio::sync::watch::Receiver<hyper::Uri>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("cancelled")]
Cancelled,
}
impl GlobalState {
pub fn task_main(&self) -> impl 'static + Future<Output = anyhow::Result<()>> + Send {
let mut ret = None;
self.inner.get_or_init(|| {
let (tx, task_fut) = MainTask::prepare_run();
ret = Some(task_fut);
tx
});
ret.expect("must only call this method once")
}
}
struct MainTask {
rx: tokio::sync::mpsc::Receiver<Message>,
}
impl MainTask {
fn prepare_run() -> (
tokio::sync::mpsc::Sender<Message>,
impl Future<Output = anyhow::Result<()>> + Send,
) {
let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */);
let task = MainTask { rx };
(tx, task.task())
}
async fn task(mut self) -> anyhow::Result<()> {
// TODO: persistence
let storcon_client = todo!();
let mut resolution: HashMap<NodeId, tokio::sync::watch::Sender<hyper::Uri>> =
HashMap::new();
while let Some(rx) = self.rx.recv().await {
match rx {
Message::Resolve { ps_id, reply } => match resolution.entry(ps_id) {
hash_map::Entry::Occupied(e) => {}
hash_map::Entry::Vacant(e) => {
tokio::spawn(
ResolutionTask { ps_id, storcon_client }.run()
)
},
},
}
}
}
}
struct ResolutionTask {
ps_id: NodeId,
storcon_client: storage_controller_client::control_api::Client,
}
impl ResolutionTask {
pub async fn run(self) -> Result<Uri, Error> {
loop {
// XXX: well-defined upcall API?
let res = self
.storcon_client
.dispatch(
reqwest::Method::GET,
format!("control/v1/node/{}", self.node_id),
None,
)
.await;
let node: NodeDescribeResponse = match res {
Ok(res) => res,
Err(err) => {
warn!("storcon upcall failed")
}
};
}
}
}

View File

@@ -117,6 +117,7 @@ impl Env {
Arc::new(TimelinesSet::default()), // ignored for now
RateLimiter::new(0, 0),
wal_backup,
todo!(),
);
Ok(timeline)
}

View File

@@ -39,7 +39,9 @@ use crate::wal_backup;
use crate::wal_backup::{WalBackup, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
use crate::{
SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage,
};
fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
@@ -547,6 +549,7 @@ impl Timeline {
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::GlobalState>,
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();
@@ -570,6 +573,7 @@ impl Timeline {
rx,
partial_backup_rate_limiter,
wal_backup,
wal_advertiser,
)
.await
}

View File

@@ -42,6 +42,7 @@ impl Manager {
&& next_event.is_none()
&& self.access_service.is_empty()
&& !self.tli_broker_active.get()
&& self.wal_advertiser.ready_for_eviction()
// Partial segment of current flush_lsn is uploaded up to this flush_lsn.
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
// And it is the next one after the last removed. Given that local

View File

@@ -22,7 +22,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use utils::lsn::Lsn;
use crate::SafeKeeperConf;
use crate::control_file::{FileStorage, Storage};
use crate::metrics::{
MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS, NUM_EVICTED_TIMELINES,
@@ -37,6 +36,7 @@ use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle};
use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
use crate::{SafeKeeperConf, wal_advertiser};
pub(crate) struct StateSnapshot {
// inmem values
@@ -201,6 +201,7 @@ pub(crate) struct Manager {
pub(crate) wal_seg_size: usize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,
pub(crate) wal_advertiser: wal_advertiser::SafekeeperTimelineHandle,
// current state
pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
@@ -240,6 +241,7 @@ pub async fn main_task(
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::GlobalState>,
) {
tli.set_status(Status::Started);
@@ -259,6 +261,7 @@ pub async fn main_task(
manager_tx,
global_rate_limiter,
wal_backup,
wal_advertiser,
)
.await;
@@ -287,7 +290,8 @@ pub async fn main_task(
mgr.set_status(Status::UpdateBackup);
let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
mgr.update_broker_active(is_wal_backup_required, num_computes, &state_snapshot);
mgr.set_status(Status::UpdateControlFile);
mgr.update_control_file_save(&state_snapshot, &mut next_event)
@@ -419,6 +423,7 @@ impl Manager {
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
wal_advertiser: Arc<wal_advertiser::GlobalState>,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
@@ -428,6 +433,7 @@ impl Manager {
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
wal_advertiser: wal_advertiser.new_timeline(tli.clone()).await.unwrap(),
last_removed_segno: 0,
is_offloaded,
backup_task: None,
@@ -494,8 +500,8 @@ impl Manager {
is_wal_backup_required
}
/// Update is_active flag and returns its value.
fn update_is_active(
/// Update broker is_active flag and returns its value.
fn update_broker_active(
&mut self,
is_wal_backup_required: bool,
num_computes: usize,
@@ -505,6 +511,7 @@ impl Manager {
|| num_computes > 0
|| state.remote_consistent_lsn < state.commit_lsn;
// update the broker timeline set
if self.tli_broker_active.set(is_active) {
// write log if state has changed

View File

@@ -27,7 +27,7 @@ use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_t
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::WalBackup;
use crate::wal_storage::Storage;
use crate::{SafeKeeperConf, control_file, wal_storage};
use crate::{SafeKeeperConf, control_file, pageserver_connectivity, wal_advertiser, wal_storage};
// Timeline entry in the global map: either a ready timeline, or mark that it is
// being created.
@@ -47,6 +47,8 @@ struct GlobalTimelinesState {
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
wal_advertisement: Arc<wal_advertiser::GlobalState>,
pageserver_connectivity: Arc<pageserver_connectivity::GlobalState>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
}
@@ -60,12 +62,14 @@ impl GlobalTimelinesState {
Arc<TimelinesSet>,
RateLimiter,
Arc<WalBackup>,
Arc<wal_advertiser::GlobalState>,
) {
(
self.conf.clone(),
self.broker_active_set.clone(),
self.global_rate_limiter.clone(),
self.wal_backup.clone(),
self.wal_advertisement.clone(),
)
}
@@ -101,6 +105,8 @@ impl GlobalTimelines {
tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
wal_advertisement: Arc::new(wal_advertiser::GlobalState::default()),
pageserver_connectivity: Arc::new(pageserver_connectivity::GlobalState::default()),
global_rate_limiter: RateLimiter::new(1, 1),
wal_backup,
}),
@@ -158,12 +164,13 @@ impl GlobalTimelines {
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup, wal_advertiser) = {
let state = self.state.lock().unwrap();
state.get_dependencies()
};
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
{
@@ -187,6 +194,7 @@ impl GlobalTimelines {
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
wal_backup.clone(),
wal_advertiser.clone(),
);
}
// If we can't load a timeline, it's most likely because of a corrupted
@@ -238,7 +246,7 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _, _) = {
let (conf, _, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -283,7 +291,7 @@ impl GlobalTimelines {
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup, wal_advertiser) = {
let mut state = self.state.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
@@ -338,6 +346,7 @@ impl GlobalTimelines {
broker_active_set,
partial_backup_rate_limiter,
wal_backup,
wal_advertiser.clone(),
);
drop(timeline_shared_state);
Ok(timeline)
@@ -590,6 +599,14 @@ impl GlobalTimelines {
Ok(deleted)
}
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::GlobalState> {
self.state.lock().unwrap().wal_advertisement.clone()
}
pub fn get_pageserver_connectivity(&self) -> Arc<pageserver_connectivity::GlobalState> {
self.state.lock().unwrap().pageserver_connectivity.clone()
}
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
let mut state = self.state.lock().unwrap();

View File

@@ -0,0 +1,201 @@
mod persistence;
mod pageserver_connectivity;
use utils::id::TenantId;
use crate::timeline::Timeline;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use anyhow::Context;
use tracing::{Instrument, error, info, info_span, warn};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
sync::{spsc_fold, spsc_watch},
};
use crate::{GlobalTimelines, SafeKeeperConf};
type Advs = HashMap<TenantTimelineId, Lsn>;
#[derive(Default)]
pub struct GlobalState {
inner: once_cell::sync::OnceCell<tokio::sync::mpsc::Sender<Message>>,
}
pub struct SafekeeperTimelineHandle {
tx: tokio::sync::mpsc::Sender<Message>,
}
enum Message {
NewTimeline {
reply: tokio::sync::oneshot::Sender<Result<(), Error>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("cancelled")]
Cancelled,
}
impl GlobalState {
pub fn task_main(&self) -> impl 'static + Future<Output = anyhow::Result<()>> + Send {
let mut ret = None;
self.inner.get_or_init(|| {
let (tx, task_fut) = MainTask::prepare_run();
ret = Some(task_fut);
tx
});
ret.expect("must only call this method once")
}
pub async fn new_timeline(
&self,
tli: Arc<Timeline>,
) -> Result<SafekeeperTimelineHandle, Error> {
let tx = self.inner.get().unwrap().clone();
let handle = SafekeeperTimelineHandle { tx };
let (reply, rx) = tokio::sync::oneshot::channel();
let Ok(()) = handle.tx.send(Message::NewTimeline { reply }).await else {
return Err(Error::Cancelled);
};
let Ok(res) = rx.await else {
return Err(Error::Cancelled);
};
Ok(handle)
}
pub fn update_pageserver_attachments(
&self,
tenant_id: TenantId,
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
) -> anyhow::Result<()> {
todo!()
}
}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
struct MainTask {
rx: tokio::sync::mpsc::Receiver<Message>,
world: sk_ps_discovery::World,
senders: HashMap<utils::id::NodeId, spsc_watch::Sender<Advs>>,
}
impl MainTask {
fn prepare_run() -> (
tokio::sync::mpsc::Sender<Message>,
impl Future<Output = anyhow::Result<()>> + Send,
) {
let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */);
let task = MainTask {
rx,
world: sk_ps_discovery::World::default(),
senders: Default::default(),
};
(tx, task.task())
}
async fn task(mut self) -> anyhow::Result<()> {
let mut adv_frequency = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = adv_frequency.tick() => {
let start = Instant::now();
self.advertisements_iteration();
let elapsed = start.elapsed();
if elapsed > Duration::from_millis(10) {
warn!(?elapsed, "advertisements iteration is slow");
}
},
message = self.rx.recv() => {
match message {
None => anyhow::bail!("last main task sender dropped, shouldn't happen, exiting"),
Some(_) => todo!(),
}
},
}
}
}
fn advertisements_iteration(&mut self) {
loop {
let advertisements = self.world.get_commit_lsn_advertisements();
for (node_id, mut advs) in advertisements {
'inner: loop {
let tx = self.senders.entry(node_id).or_insert_with(|| {
let (tx, rx) = spsc_watch::channel();
tokio::spawn(
PageserverTask {
ps_id: node_id,
endpoint: todo!(),
advs: rx,
}
.run()
.instrument(info_span!("wal_advertiser", ps_id=%node_id)),
);
tx
});
if let Err((failed, err)) = tx.send_replace(advs) {
self.senders.remove(&node_id);
advs = failed;
} else {
break 'inner;
}
}
}
}
}
}
struct PageserverTask {
ps_id: NodeId,
advs: spsc_watch::Receiver<Advs>,
}
impl PageserverTask {
/// Cancellation: happens through last PageserverHandle being dropped.
async fn run(mut self) {
loop {
let Ok(advs) = self.advs.recv().await else {
info!("main task gone, exiting");
return;
};
let res = self.run0(advs).await;
match res {
Ok(()) => {}
Err(err) => {
error!(?err, "error sending advertisements");
// TODO: proper backoff?
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}
async fn run0(&mut self, advs: HashMap<TenantTimelineId, Lsn>) -> anyhow::Result<()> {
use storage_broker::wal_advertisement as proto;
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
let stream = async_stream::stream! {
for (tenant_timeline_id, commit_lsn) in advs {
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(proto::TenantTimelineId {
tenant_id: tenant_timeline_id.tenant_id.as_ref().to_owned(),
timeline_id: tenant_timeline_id.timeline_id.as_ref().to_owned(),
}), commit_lsn: commit_lsn.0 };
}
};
let mut client: PageserverClient<_> = PageserverClient::connect(self.endpoint.clone())
.await
.context("connect")?;
let publish_stream = client
.publish_commit_lsn_advertisements(stream)
.await
.context("publish stream")?;
Ok(())
}
}

View File

@@ -27,6 +27,7 @@ parking_lot.workspace = true
prost.workspace = true
tonic.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
metrics.workspace = true

View File

@@ -5,7 +5,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// easy location, but apparently interference with cachepot sometimes fails
// the build then. Anyway, per cargo docs build script shouldn't output to
// anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/broker.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
let protos = [
"proto/broker.proto",
"proto/wal_advertisement.proto",
];
for proto in protos {
tonic_build::compile_protos(proto)
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
}
Ok(())
}

View File

@@ -35,14 +35,14 @@ message SafekeeperTimelineInfo {
// LSN of the last record.
uint64 flush_lsn = 4;
// Up to which LSN safekeeper regards its WAL as committed.
uint64 commit_lsn = 5;
uint64 commit_lsn = 5; // yes
// LSN up to which safekeeper has backed WAL.
uint64 backup_lsn = 6;
// LSN of last checkpoint uploaded by pageserver.
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
uint64 standby_horizon = 14;
uint64 standby_horizon = 14; // yes
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string.

View File

@@ -0,0 +1,29 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package wal_advertisement;
service Pageserver {
rpc PublishCommitLsnAdvertisements(stream CommitLsnAdvertisement) returns (google.protobuf.Empty) {};
rpc SubscribeRemoteConsistentLsnAdvertisements(google.protobuf.Empty) returns (stream RemoteConsistentLsnAdvertisement) {};
}
message CommitLsnAdvertisement {
TenantTimelineId tenant_timeline_id = 1;
uint64 commit_lsn = 2;
}
message RemoteConsistentLsnAdvertisement {
bytes tenant_id = 1;
uint32 shard_id = 2;
bytes timeline_id = 3;
uint64 generation = 4;
uint64 remote_consistent_lsn = 5;
}
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}

View File

@@ -1,10 +1,14 @@
use std::time::Duration;
use futures::Stream;
use proto::TenantTimelineId as ProtoTenantTimelineId;
use proto::broker_service_client::BrokerServiceClient;
use tokio_util::sync::CancellationToken;
use tonic::Status;
use tonic::codegen::StdError;
use tonic::transport::{Channel, Endpoint};
use tonic::transport::Endpoint;
use tracing::{debug, error, info, warn};
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
// Code generated by protobuf.
@@ -16,12 +20,18 @@ pub mod proto {
tonic::include_proto!("storage_broker");
}
pub mod wal_advertisement {
#![allow(clippy::derive_partial_eq_without_eq)]
tonic::include_proto!("wal_advertisement");
}
pub mod metrics;
// Re-exports to avoid direct tonic dependency in user crates.
pub use hyper::Uri;
pub use tonic::transport::{Certificate, ClientTlsConfig};
pub use tonic::{Code, Request, Streaming};
use utils::shard::TenantShardId;
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
@@ -29,9 +39,199 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
// BrokerServiceClient charged with tonic provided Channel transport; helps to
// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
#[derive(Clone)]
pub struct TimelineUpdatesSubscriber {
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code.
pub struct BrokerClientChannel {
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
impl BrokerClientChannel {
pub fn into_raw_grpc_client(
self,
) -> proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel> {
self.client
}
}
pub struct TimelineShardUpdate {
pub is_discovery: bool,
pub inner: proto::SafekeeperDiscoveryResponse,
}
pub struct DiscoveryRequester {
id: ProtoTenantTimelineId,
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
impl TimelineUpdatesSubscriber {
pub fn new(service_client: BrokerClientChannel) -> Self {
Self {
client: service_client.client.clone(),
}
}
pub fn subscribe(
&mut self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
cancel: &CancellationToken,
) -> (impl Stream<Item = TimelineShardUpdate>, DiscoveryRequester) {
let id = ProtoTenantTimelineId {
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
timeline_id: timeline_id.as_ref().to_owned(),
};
let discovery_requester = DiscoveryRequester {
id: id.clone(),
client: self.client.clone(),
};
let stream = async_stream::stream! {
let mut attempt = 0;
'resubscribe: loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempt += 1;
use proto::*;
// subscribe to the specific timeline
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(id.clone()),
}),
};
let res = tokio::select! {
r = self.client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { return; }
};
let mut update_stream = match res
{
Ok(resp) => {
resp.into_inner()
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
info!(
attempt, "failed to subscribe: {e:#}"
);
continue 'resubscribe;
}
};
loop {
let broker_update = tokio::select!{
_ = cancel.cancelled() => {
return;
}
update = update_stream.message() => { update }
};
match broker_update {
Ok(Some(typed_msg)) => {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
continue 'resubscribe;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
continue 'resubscribe;
}
}
}
_ => {
// unexpected message
warn!("unexpected message from broker: {typed_msg:?}");
continue 'resubscribe;
}
};
attempt = 0; // reset backoff iff we received a valid update
yield TimelineShardUpdate{is_discovery, inner: timeline_update };
},
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
warn!("broker subscription failed: {status}");
}
}
continue 'resubscribe;
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
continue 'resubscribe;
}
}
}
}
};
(stream, discovery_requester)
}
}
impl DiscoveryRequester {
pub async fn request(&mut self) {
let request = proto::SafekeeperDiscoveryRequest {
tenant_timeline_id: Some(self.id.clone()),
};
let msg = proto::TypedMessage {
r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because
// we just want to receive broker updates and discovery is not important if we already
// receive updates.
//
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
// This is totally fine because of the reason above.
// This is a fire-and-forget request, we don't care about the response
let _ = self.client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
}
}
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
@@ -67,19 +267,9 @@ where
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
// keep_alive_timeout is 20s by default on both client and server side
let channel = tonic_endpoint.connect_lazy();
Ok(BrokerClientChannel::new(channel))
}
impl BrokerClientChannel {
/// Create a new client to the given endpoint, but don't actually connect until the first request.
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
Ok(Self::new(conn))
}
Ok(BrokerClientChannel {
client: proto::broker_service_client::BrokerServiceClient::new(channel),
})
}
// parse variable length bytes from protobuf

View File

@@ -15,6 +15,7 @@ testing = []
[dependencies]
anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
@@ -69,4 +70,4 @@ http-utils = { path = "../libs/http-utils/" }
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -0,0 +1,16 @@
DROP TRIGGER on_timelines_UPDATE_enqueue_sk_ps_discovery on "timelines";
DROP FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_timelines_DELETE_enqueue_sk_ps_discovery on "timelines";
DROP FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_timelines_INSERT_enqueue_sk_ps_discovery on "timelines";
DROP FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery on "tenant_shards";
DROP FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery on "tenant_shards";
DROP FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn;
DROP TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery on "tenant_shards";
DROP FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn;
DROP FUNCTION IF EXISTS sk_ps_discovery_enqueue_attachment_create;
DROP TABLE "sk_ps_discovery";

View File

@@ -0,0 +1,122 @@
CREATE TABLE "sk_ps_discovery"(
"tenant_id" VARCHAR NOT NULL,
"shard_number" INT4 NOT NULL,
"shard_count" INT4 NOT NULL,
"ps_generation" INT4 NOT NULL,
"sk_id" INT8 NOT NULL REFERENCES "safekeepers"("id") ON DELETE CASCADE, -- more efficient that trigger on "safekeepers"
"intent_state" VARCHAR NOT NULL, -- attached,detached
"ps_id" INT8 NOT NULL REFERENCES "nodes"("node_id") ON DELETE CASCADE, -- more efficient that trigger on "nodes"
"created_at" TIMESTAMPTZ NOT NULL,
"retries" INT4 NOT NULL DEFAULT 0,
"last_retry_at" TIMESTAMPTZ,
"acknowledged_at" TIMESTAMPTZ,
PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id")
);
CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_attachment_create(ARG_TENANT_ID VARCHAR)
RETURNS VOID AS $$
BEGIN
INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, intent_state, ps_id, created_at)
WITH intent_attachments AS (
SELECT DISTINCT tenant_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines
WHERE
tenant_id = ARG_TENANT_ID
AND
timelines.deleted_at IS NULL
)
SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count,
tenant_shards.generation, intent_attachments.sk_id, 'attached', tenant_shards.generation_pageserver, NOW()
FROM tenant_shards
INNER JOIN intent_attachments ON tenant_shards.tenant_id = intent_attachments.tenant_id
ON CONFLICT DO NOTHING; -- the first trigger creates the attachment, all others are identical because tenant shard generations are monotonic
PERFORM pg_notify('sk_ps_discovery', json_build_object(
'tenant_id', ARG_TENANT_ID
)::text);
END;
$$ LANGUAGE plpgsql;
-- Trigger on tenant_shards table
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery
AFTER INSERT
ON "tenant_shards"
FOR EACH ROW
EXECUTE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn();
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
PERFORM sk_ps_discovery_enqueue_attachment_create(OLD.tenant_id);
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery
AFTER DELETE
ON "tenant_shards"
FOR EACH ROW
EXECUTE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn();
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery
AFTER UPDATE
ON "tenant_shards"
FOR EACH ROW
EXECUTE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn();
-- Trigger on timelines table
CREATE OR REPLACE FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_timelines_INSERT_enqueue_sk_ps_discovery
AFTER INSERT
ON "timelines"
FOR EACH ROW
EXECUTE FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn();
CREATE OR REPLACE FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
PERFORM sk_ps_discovery_enqueue_attachment_create(OLD.tenant_id);
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_timelines_DELETE_enqueue_sk_ps_discovery
AFTER DELETE
ON "timelines"
FOR EACH ROW
EXECUTE FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn();
CREATE OR REPLACE FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn()
RETURNS TRIGGER AS $$
BEGIN
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER on_timelines_UPDATE_enqueue_sk_ps_discovery
AFTER UPDATE
ON "timelines"
FOR EACH ROW
EXECUTE FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn();

View File

@@ -436,7 +436,8 @@ async fn async_main() -> anyhow::Result<()> {
};
// Validate that we can connect to the database
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into())
.await?;
let persistence = Arc::new(Persistence::new(secrets.database_url).await);

View File

@@ -1,6 +1,8 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::io::Write;
use std::ops::Add;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -15,8 +17,8 @@ use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
use futures::FutureExt;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
@@ -31,6 +33,7 @@ use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
use rustls::crypto::ring;
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use tracing::info;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -74,6 +77,8 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
connection_pool: Pool<AsyncPgConnection>,
connect_tokio_postgres:
Box<dyn Sync + Send + 'static + Fn() -> BoxFuture<'static, TokioPostgresConnectResult>>,
}
/// Legacy format, for use in JSON compat objects in test environment
@@ -135,6 +140,8 @@ pub(crate) enum DatabaseOperation {
DeleteTimelineImport,
ListTimelineImports,
IsTenantImportingTimeline,
ListSkPsDiscovery,
UpdateSkPsDiscoveryAttempt,
}
#[must_use]
@@ -177,10 +184,11 @@ impl Persistence {
pub async fn new(database_url: String) -> Self {
let mut mgr_config = ManagerConfig::default();
mgr_config.custom_setup = Box::new(establish_connection_rustls);
mgr_config.custom_setup =
Box::new(|config| establish_connection_rustls_diesel(config.to_owned()));
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
database_url,
database_url.clone(),
mgr_config,
);
@@ -197,20 +205,25 @@ impl Persistence {
.await
.expect("Could not build connection pool");
Self { connection_pool }
Self {
connection_pool,
connect_tokio_postgres: Box::new(move || {
establish_connection_rustls_tokio_postgres(database_url.clone())
}),
}
}
/// A helper for use during startup, where we would like to tolerate concurrent restarts of the
/// database and the storage controller, therefore the database might not be available right away
pub async fn await_connection(
database_url: &str,
database_url: String,
timeout: Duration,
) -> Result<(), diesel::ConnectionError> {
let started_at = Instant::now();
log_postgres_connstr_info(database_url)
log_postgres_connstr_info(&database_url)
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
loop {
match establish_connection_rustls(database_url).await {
match establish_connection_rustls_diesel(database_url.clone()).await {
Ok(_) => {
tracing::info!("Connected to database.");
return Ok(());
@@ -1821,6 +1834,151 @@ impl Persistence {
})
.await
}
pub(crate) async fn listen_sk_ps_discovery(
&self,
) -> DatabaseResult<
Pin<Box<dyn Send + 'static + futures::Stream<Item = Result<TenantId, serde_json::Error>>>>,
> {
let (client, mut conn) = (&self.connect_tokio_postgres)().await?;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut stream = futures::stream::poll_fn(move |cx| conn.poll_message(cx));
while let Some(msg) = stream.next().await {
info!(?msg, "async message");
match msg {
Ok(tokio_postgres::AsyncMessage::Notification(notification))
if notification.channel() == "sk_ps_discovery" =>
{
let Ok(()) = tx.send(notification).await else {
tracing::info!(
"sk_ps_discovery notification rx dropped, stopping async notification processing"
);
break;
};
}
Ok(_) => {}
Err(err) => {
tracing::error!(?err, "tokio_postgres poll_message error");
break;
}
}
}
tracing::info!("sk_ps_discovery notification stream returned None, exiting");
});
client
.batch_execute("LISTEN sk_ps_discovery;")
.await
.expect("TODO");
#[derive(serde::Deserialize)]
struct Notification {
tenant_id: TenantId,
}
Ok(Box::pin(async_stream::stream! {
while let Some(msg) = rx.recv().await {
let msg: Result<Notification, _> = serde_json::from_str(msg.payload());
let msg = msg.map(|Notification { tenant_id }| tenant_id );
yield msg;
}
tracing::info!("sk_ps_discovery channel closed, stopping stream");
// keep client alive inside the returned sream object, othrwise `conn` ends as soon as we return from this function
drop(client);
}))
}
pub(crate) async fn get_all_sk_ps_discovery_work(
&self,
) -> DatabaseResult<Vec<SkPsDiscoveryPersistence>> {
use crate::schema::sk_ps_discovery::dsl;
self.with_measured_conn(DatabaseOperation::ListSkPsDiscovery, move |conn| {
Box::pin(async move {
let vec: Vec<SkPsDiscoveryPersistence> = dsl::sk_ps_discovery.load(conn).await?;
Ok(vec)
})
})
.await
}
pub(crate) async fn update_sk_ps_discovery_attempt(
&self,
pk: SkPsDiscoveryPersistencePk,
intent_state: String,
update: Result<(), ()>,
) -> DatabaseResult<()> {
use crate::schema::sk_ps_discovery::dsl;
self.with_measured_conn(DatabaseOperation::UpdateSkPsDiscoveryAttempt, move |conn| {
let pk = pk.clone();
let intent_state = intent_state.clone();
Box::pin(async move {
match update {
Ok(()) => {
let SkPsDiscoveryPersistencePk {
tenant_id,
shard_number,
shard_count,
ps_generation,
sk_id,
} = pk;
let nrows = diesel::delete(dsl::sk_ps_discovery)
// primary key
.filter(dsl::tenant_id.eq(tenant_id))
.filter(dsl::shard_number.eq(shard_number))
.filter(dsl::shard_count.eq(shard_count))
.filter(dsl::ps_generation.eq(ps_generation))
.filter(dsl::sk_id.eq(sk_id))
// intent_state could have changed beneath us (split brain or concurrent state gc)
// TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about?
.filter(dsl::intent_state.eq(intent_state))
.execute(conn)
.await?;
if nrows != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of deletes: {nrows}"
)));
}
}
Err(_) => {
let SkPsDiscoveryPersistencePk {
tenant_id,
shard_number,
shard_count,
ps_generation,
sk_id,
} = pk;
let nrows = diesel::update(dsl::sk_ps_discovery)
// primary key
.filter(dsl::tenant_id.eq(tenant_id))
.filter(dsl::shard_number.eq(shard_number))
.filter(dsl::shard_count.eq(shard_count))
.filter(dsl::ps_generation.eq(ps_generation))
.filter(dsl::sk_id.eq(sk_id))
// intent_state could have changed beneath us (split brain or concurrent state gc)
// TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about?
.filter(dsl::intent_state.eq(intent_state))
// action:
.set((
dsl::retries.eq(dsl::retries.add(1)), // XXX: in split-brain situation we would bump twice...
dsl::last_retry_at.eq(diesel::dsl::now),
))
.execute(conn) // TODO: check update count?
.await?;
if nrows != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of updates: {nrows}"
)));
}
}
}
Ok(())
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -1909,21 +2067,40 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
})
}
fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> {
let fut = async {
type TokioPostgresConnectResult = ConnectionResult<(
tokio_postgres::Client,
tokio_postgres::Connection<
tokio_postgres::Socket,
tokio_postgres_rustls::RustlsStream<tokio_postgres::Socket>,
>,
)>;
fn establish_connection_rustls_tokio_postgres(
config: String,
) -> BoxFuture<'static, TokioPostgresConnectResult> {
let fut = async move {
// We first set up the way we want rustls to work.
let rustls_config = client_config_with_root_certs()
.map_err(|err| ConnectionError::BadConnection(format!("{err:?}")))?;
let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config);
let (client, conn) = tokio_postgres::connect(config, tls)
let (client, conn) = tokio_postgres::connect(&config, tls)
.await
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
AsyncPgConnection::try_from_client_and_connection(client, conn).await
Ok((client, conn))
};
fut.boxed()
}
fn establish_connection_rustls_diesel(
config: String,
) -> BoxFuture<'static, ConnectionResult<AsyncPgConnection>> {
async {
let (client, conn) = establish_connection_rustls_tokio_postgres(config).await?;
AsyncPgConnection::try_from_client_and_connection(client, conn).await
}
.boxed()
}
#[cfg_attr(test, test)]
fn test_config_debug_censors_password() {
let has_pw =
@@ -2386,3 +2563,61 @@ pub(crate) struct TimelineImportPersistence {
pub(crate) timeline_id: String,
pub(crate) shard_statuses: serde_json::Value,
}
#[derive(Insertable, AsChangeset, Selectable, Clone, PartialEq, Eq, Hash, Debug)]
#[diesel(table_name = crate::schema::sk_ps_discovery)]
pub(crate) struct SkPsDiscoveryPersistencePk {
pub(crate) tenant_id: String,
pub(crate) shard_number: i32,
pub(crate) shard_count: i32,
pub(crate) ps_generation: i32,
pub(crate) sk_id: i64,
}
#[derive(Queryable, Selectable, Clone, PartialEq, Eq)]
#[diesel(table_name = crate::schema::sk_ps_discovery)]
pub(crate) struct SkPsDiscoveryPersistence {
pub(crate) tenant_id: String,
pub(crate) shard_number: i32,
pub(crate) shard_count: i32,
pub(crate) ps_generation: i32,
pub(crate) sk_id: i64,
pub(crate) intent_state: String,
pub(crate) ps_id: i64,
pub(crate) created_at: chrono::DateTime<chrono::Utc>,
pub(crate) retries: i32,
pub(crate) last_retry_at: Option<chrono::DateTime<chrono::Utc>>,
pub(crate) acknowledged_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl SkPsDiscoveryPersistence {
pub(crate) fn tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
pub(crate) fn primary_key(&self) -> SkPsDiscoveryPersistencePk {
let SkPsDiscoveryPersistence {
tenant_id,
shard_number,
shard_count,
ps_generation,
sk_id,
intent_state: _,
ps_id: _,
created_at: _,
retries: _,
last_retry_at: _,
acknowledged_at: _,
} = self;
SkPsDiscoveryPersistencePk {
tenant_id: tenant_id.clone(),
shard_number: *shard_number,
shard_count: *shard_count,
ps_generation: *ps_generation,
sk_id: *sk_id,
}
}
}

View File

@@ -1,9 +1,11 @@
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
TenantShardPageserverAttachmentChange, TimelineCreateRequest,
};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::shard::TenantShardId;
use crate::metrics::PageserverRequestLabelGroup;
@@ -164,4 +166,19 @@ impl SafekeeperClient {
self.inner.utilization().await
)
}
pub async fn post_tenant_shard_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
body: TenantShardPageserverAttachmentChange,
) -> Result<()> {
measured_request!(
"post_tenant_shard_pageserver_attachments",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.post_tenant_shard_pageserver_attachments(tenant_shard_id, body)
.await
)
}
}

View File

@@ -60,6 +60,22 @@ diesel::table! {
}
}
diesel::table! {
sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id) {
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
ps_generation -> Int4,
sk_id -> Int8,
intent_state -> Varchar,
ps_id -> Int8,
created_at -> Timestamptz,
retries -> Int4,
last_retry_at -> Nullable<Timestamptz>,
acknowledged_at -> Nullable<Timestamptz>,
}
}
diesel::table! {
tenant_shards (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
@@ -100,12 +116,16 @@ diesel::table! {
}
}
diesel::joinable!(sk_ps_discovery -> nodes (ps_id));
diesel::joinable!(sk_ps_discovery -> safekeepers (sk_id));
diesel::allow_tables_to_appear_in_same_query!(
controllers,
metadata_health,
nodes,
safekeeper_timeline_pending_ops,
safekeepers,
sk_ps_discovery,
tenant_shards,
timeline_imports,
timelines,

View File

@@ -2,6 +2,7 @@ pub mod chaos_injector;
mod context_iterator;
pub(crate) mod safekeeper_reconciler;
mod safekeeper_service;
mod sk_ps_discovery;
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -1192,6 +1193,16 @@ impl Service {
}
}
}
#[instrument(skip_all)]
async fn run_sk_ps_discovery(self: &Arc<Self>) {
self.startup_complete.clone().wait().await;
sk_ps_discovery::run(
self.clone(),
self.http_client.clone(), /* TODO this client is configured to openf resh TCP connection each time, very inefficient */
).await;
}
/// Heartbeat all storage nodes once in a while.
#[instrument(skip_all)]
async fn spawn_heartbeat_driver(&self) {
@@ -1797,7 +1808,7 @@ impl Service {
reconcilers_gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
http_client: http_client.clone(),
step_down_barrier: Default::default(),
});
@@ -1865,6 +1876,15 @@ impl Service {
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.run_sk_ps_discovery().await
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();

View File

@@ -647,6 +647,11 @@ impl Service {
sk.describe_response()
}
pub(crate) fn get_safekeeper_object(&self, node_id: i64) -> Option<Safekeeper> {
let locked = self.inner.read().unwrap();
locked.safekeepers.get(&NodeId(node_id as u64)).cloned()
}
pub(crate) async fn upsert_safekeeper(
self: &Arc<Service>,
record: crate::persistence::SafekeeperUpsert,

View File

@@ -0,0 +1,266 @@
use std::{
collections::{HashMap, hash_map},
sync::Arc,
time::Duration,
};
use anyhow::Context;
use futures::{StreamExt, stream::FuturesUnordered};
use safekeeper_api::models::{
TenantShardPageserverAttachment, TenantShardPageserverAttachmentChange,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span, error, info, info_span};
use utils::{
generation::Generation,
id::{NodeId, TenantId},
logging::SecretString,
shard::ShardIndex,
};
use crate::{
heartbeater::SafekeeperState,
persistence::{Persistence, SkPsDiscoveryPersistence},
};
use super::Service;
struct Actor {
service: Arc<Service>,
persistence: Arc<Persistence>,
http_client: reqwest::Client,
}
pub async fn run(service: Arc<Service>, http_client: reqwest::Client) {
let actor = Actor {
persistence: service.persistence.clone(),
service,
http_client, // XXX: build our own client instead of getting Service's client; we probably want idle conn to each sk
};
actor.run().await;
}
impl Actor {
async fn run(mut self) {
loop {
match self.run0().await {
Ok(()) => {
info!("sk_ps_discovery actor exiting after shutdown signal observed");
return;
}
Err(err) => {
tracing::error!(
?err,
"sk_ps_discovery actor encountered an error, restarting after backoff"
);
// TODO: proper backoff
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
}
async fn run0(&mut self) -> anyhow::Result<()> {
let mut subscription = self
.persistence
.listen_sk_ps_discovery()
.await
.context("listen to sk_ps_discovery")?;
let mut sync_full_ticker = tokio::time::interval(std::time::Duration::from_secs(5));
struct Task {
work: SkPsDiscoveryPersistence,
cancel: CancellationToken,
join_handle: Option<JoinHandle<()>>,
}
let mut tasks = HashMap::new();
loop {
tokio::select! {
biased; // control messages have higher priority, the periodic full tick, then subscriptions.
_ = sync_full_ticker.tick() => {
info!("rebuild");
}
maybe_res = subscription.next() => {
match maybe_res {
None => {
anyhow::bail!("subscription should never end");
}
Some(Ok(tenant_id)) => {
let tenant_id: TenantId = tenant_id;
info!(?tenant_id, "notify for tenant_id");
// for now, just also rebuild everything
}
Some(Err(err)) => {
let err: serde_json::Error = err;
anyhow::bail!("incorrect notification format: {err:?}"); // FIXME repeat message in error so it can be debugged ?
}
}
}
}
// get list of tasks from database
let mut new_tasks = self
.persistence
.get_all_sk_ps_discovery_work()
.await
.context("get_all_sk_ps_discovery_work")?
.into_iter()
.map(|work: SkPsDiscoveryPersistence| {
(
work.primary_key(),
Task {
work,
cancel: CancellationToken::new(),
join_handle: None,
},
)
})
.collect::<HashMap<_, _>>();
// Carry over ongoing tasks
let mut cancelled_wait = FuturesUnordered::new();
for (
task_key,
Task {
work: ongoing_persistence,
cancel,
join_handle,
},
) in tasks.drain()
{
match new_tasks.entry(task_key) {
hash_map::Entry::Occupied(mut planned) => {
let Task {
work: planned_persistence,
cancel: planned_cancel,
join_handle: planned_jh,
} = planned.get_mut();
assert!(planned_jh.is_none());
if *planned_persistence == ongoing_persistence {
*planned_jh = join_handle;
*planned_cancel = cancel;
continue;
}
}
hash_map::Entry::Vacant(_) => (),
}
cancel.cancel();
cancelled_wait.push(async move {
if let Some(jh) = join_handle {
let _ = jh.await;
}
});
}
while let Some(_) = cancelled_wait.next().await {}
tasks = new_tasks;
// Kick off new tasks
for (key, task) in tasks.iter_mut() {
if task.join_handle.is_none() {
task.join_handle = Some(tokio::spawn(
DeliveryAttempt {
cancel: task.cancel.clone(),
persistence: self.persistence.clone(),
service: self.service.clone(),
http_client: self.http_client.clone(),
work: task.work.clone(),
}
.run()
.instrument({
let span = info_span!(parent: None, "sk_ps_discovery_delivery", ?key);
span.follows_from(Span::current());
span
}),
))
}
}
}
}
}
struct DeliveryAttempt {
cancel: CancellationToken,
persistence: Arc<Persistence>,
service: Arc<super::Service>,
http_client: reqwest::Client,
work: SkPsDiscoveryPersistence,
}
impl DeliveryAttempt {
pub async fn run(self) {
let res = self.run0().await;
if self.cancel.is_cancelled() {
return;
}
if let Err(ref err) = res {
error!(?err, "attempt failed");
}
let res = self
.persistence
.update_sk_ps_discovery_attempt(
self.work.primary_key(),
self.work.intent_state.clone(),
res.map_err(|_| ()),
)
.await;
if let Err(ref err) = res {
error!(?err, "persistence of attempt result failed");
}
}
async fn run0(&self) -> anyhow::Result<()> {
let Some(sk) = self.service.get_safekeeper_object(self.work.sk_id) else {
anyhow::bail!("safekeeper object does not exist");
};
match sk.availability() {
SafekeeperState::Available { .. } => (),
SafekeeperState::Offline => {
anyhow::bail!("safekeeper is offline");
}
}
let body = {
let val = TenantShardPageserverAttachment {
shard_id: ShardIndex {
shard_number: utils::shard::ShardNumber(self.work.shard_number as u8),
shard_count: utils::shard::ShardCount(self.work.shard_count as u8),
},
ps_id: NodeId(self.work.ps_id as u64),
generation: Generation::new(self.work.ps_generation as u32),
};
match self.work.intent_state.as_str() {
"attached" => TenantShardPageserverAttachmentChange::Attach { field1: val },
"detached" => TenantShardPageserverAttachmentChange::Detach(val),
x => anyhow::bail!("unknown intent state {x:?}"),
}
};
let tenant_shard_id = self.work.tenant_shard_id()?;
sk.with_client_retries(
|client| {
let body = body.clone();
async move {
client
.post_tenant_shard_pageserver_attachments(tenant_shard_id, body)
.await
}
},
&self.http_client,
&self
.service
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from),
1,
3,
Duration::from_secs(1),
&self.cancel,
)
.await?;
Ok(())
}
}