mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
Compare commits
42 Commits
quantumish
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a80790e0c4 | ||
|
|
906a963351 | ||
|
|
9e7556bef2 | ||
|
|
4f4214eea3 | ||
|
|
3dffbda428 | ||
|
|
04fb256e0f | ||
|
|
02fe35831c | ||
|
|
5f7bc3ce60 | ||
|
|
e40b1c79fa | ||
|
|
2f416267dc | ||
|
|
d52d560f16 | ||
|
|
de908a7b0d | ||
|
|
687fb25d41 | ||
|
|
eaa91291ae | ||
|
|
fc9f38dd2d | ||
|
|
c689110ad6 | ||
|
|
ba6abe203d | ||
|
|
d16e024d49 | ||
|
|
a4b9335b73 | ||
|
|
2cea7a7838 | ||
|
|
e1c1aa74fe | ||
|
|
ee775a24a0 | ||
|
|
428f532f08 | ||
|
|
5efb0d8072 | ||
|
|
36ba2b8e44 | ||
|
|
1de0f41403 | ||
|
|
4d2f27a33f | ||
|
|
88a3c9e7fd | ||
|
|
df36b9aa62 | ||
|
|
18a43eeab3 | ||
|
|
39039d1be7 | ||
|
|
9ee75ceee6 | ||
|
|
f5210a367d | ||
|
|
f36520eb94 | ||
|
|
afa35eea87 | ||
|
|
8eb853b731 | ||
|
|
a95015d967 | ||
|
|
3836ee8539 | ||
|
|
a6bd4a3be6 | ||
|
|
24d96e4372 | ||
|
|
29ea89b61d | ||
|
|
322e742e4c |
75
Cargo.lock
generated
75
Cargo.lock
generated
@@ -6064,8 +6064,10 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"sk_ps_discovery",
|
||||
"smallvec",
|
||||
"storage_broker",
|
||||
"storage_controller_client",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
@@ -6077,6 +6079,7 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
@@ -6572,6 +6575,76 @@ version = "0.3.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
|
||||
|
||||
[[package]]
|
||||
name = "sk_ps_discovery"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crc32c",
|
||||
"criterion",
|
||||
"desim",
|
||||
"env_logger",
|
||||
"fail",
|
||||
"futures",
|
||||
"hex",
|
||||
"http 1.1.0",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"pem",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rustls 0.23.18",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scopeguard",
|
||||
"sd-notify",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"storage_broker",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"utils",
|
||||
"wal_decoder",
|
||||
"walproposer",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.8"
|
||||
@@ -6678,6 +6751,7 @@ dependencies = [
|
||||
"rustls 0.23.18",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-util",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
@@ -6690,6 +6764,7 @@ name = "storage_controller"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
"bytes",
|
||||
"camino",
|
||||
"chrono",
|
||||
|
||||
@@ -43,7 +43,7 @@ members = [
|
||||
"libs/proxy/postgres-protocol2",
|
||||
"libs/proxy/postgres-types2",
|
||||
"libs/proxy/tokio-postgres2",
|
||||
"endpoint_storage",
|
||||
"endpoint_storage", "libs/sk_ps_discovery",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -262,6 +262,7 @@ pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
|
||||
remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
|
||||
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
|
||||
safekeeper_client = { path = "./safekeeper/client" }
|
||||
sk_ps_discovery = { path = "./libs/sk_ps_discovery" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
|
||||
storage_controller_client = { path = "./storage_controller/client" }
|
||||
|
||||
@@ -6,9 +6,11 @@ use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::TimestampTz;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
use crate::membership::Configuration;
|
||||
use crate::{ServerInfo, Term};
|
||||
@@ -309,3 +311,29 @@ pub struct PullTimelineResponse {
|
||||
pub safekeeper_host: Option<String>,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "action")]
|
||||
pub enum TenantShardPageserverAttachmentChange {
|
||||
Attach(TenantShardPageserverAttachment),
|
||||
Detach(TenantShardPageserverAttachment),
|
||||
}
|
||||
|
||||
impl TenantShardPageserverAttachmentChange {
|
||||
pub fn attachment(&self) -> &TenantShardPageserverAttachment {
|
||||
match self {
|
||||
TenantShardPageserverAttachmentChange::Attach(a) => a,
|
||||
TenantShardPageserverAttachmentChange::Detach(a) => a,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TenantShardPageserverAttachment {
|
||||
pub shard_id: ShardIndex,
|
||||
pub generation: Generation,
|
||||
pub ps_id: NodeId,
|
||||
// TODO: avoid transmitting this with every request.
|
||||
// How nice things could be if there were simple DNS records for ps-$node_id.$cell.$region.$cloud.neon.tech
|
||||
pub ps_hostname: String, // TODO: some type safety
|
||||
}
|
||||
|
||||
81
libs/sk_ps_discovery/Cargo.toml
Normal file
81
libs/sk_ps_discovery/Cargo.toml
Normal file
@@ -0,0 +1,81 @@
|
||||
[package]
|
||||
name = "sk_ps_discovery"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
workspace_hack.workspace = true
|
||||
|
||||
async-stream.workspace = true
|
||||
anyhow.workspace = true
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
chrono.workspace = true
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
crc32c.workspace = true
|
||||
fail.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
http.workspace = true
|
||||
hyper0.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
futures.workspace = true
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
smallvec.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tokio = { workspace = true, features = ["fs"] }
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tokio-util = { workspace = true }
|
||||
tonic = { workspace = true }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
sha2.workspace = true
|
||||
sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
http-utils.workspace = true
|
||||
utils.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
env_logger.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
itertools.workspace = true
|
||||
walproposer.workspace = true
|
||||
rand.workspace = true
|
||||
desim.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-subscriber = { workspace = true, features = ["json"] }
|
||||
|
||||
[[bench]]
|
||||
name = "bench"
|
||||
harness = false
|
||||
97
libs/sk_ps_discovery/benches/bench.rs
Normal file
97
libs/sk_ps_discovery/benches/bench.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
//! WAL ingestion benchmarks.
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use sk_ps_discovery::{
|
||||
AttachmentUpdate, RemoteConsistentLsnAdv, TenantShardAttachmentId, TimelineAttachmentId,
|
||||
};
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
shard::ShardIndex,
|
||||
};
|
||||
|
||||
/// Use jemalloc and enable profiling, to mirror bin/safekeeper.rs.
|
||||
#[global_allocator]
|
||||
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
||||
|
||||
#[allow(non_upper_case_globals)]
|
||||
#[unsafe(export_name = "malloc_conf")]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench_simple,
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
||||
fn bench_simple(c: &mut Criterion) {
|
||||
let mut g = c.benchmark_group("simple");
|
||||
|
||||
// setup
|
||||
let mut world = sk_ps_discovery::World::default();
|
||||
|
||||
// Simplified view: lots of unsharded tenants with one timeline each
|
||||
let n_pageservers = 20;
|
||||
let n_tenant_shards_per_pageserver = 2000;
|
||||
for ps_id in 1..=n_pageservers {
|
||||
for _ in ..n_tenant_shards_per_pageserver {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
for generation in 10..=11 {
|
||||
let tenant_shard_attachment_id = TenantShardAttachmentId {
|
||||
tenant_id,
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(generation),
|
||||
};
|
||||
let timeline_attachment = TimelineAttachmentId {
|
||||
tenant_timeline_id: TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(generation),
|
||||
};
|
||||
world.update_attachment(AttachmentUpdate {
|
||||
tenant_shard_attachment_id,
|
||||
action: sk_ps_discovery::AttachmentUpdateAction::Attach {
|
||||
ps_id: NodeId(ps_id),
|
||||
},
|
||||
});
|
||||
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
|
||||
remote_consistent_lsn: Lsn(23),
|
||||
attachment: timeline_attachment,
|
||||
});
|
||||
}
|
||||
world.handle_commit_lsn_advancement(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
Lsn(42),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// setup done
|
||||
let world = world;
|
||||
g.bench_function("get_commit_lsn_advertisements", |bencher| {
|
||||
bencher.iter_custom(|iters| {
|
||||
let started = Instant::now();
|
||||
|
||||
for _ in 0..iters {
|
||||
criterion::black_box(world.get_commit_lsn_advertisements());
|
||||
}
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
g.finish();
|
||||
}
|
||||
515
libs/sk_ps_discovery/src/lib.rs
Normal file
515
libs/sk_ps_discovery/src/lib.rs
Normal file
@@ -0,0 +1,515 @@
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
|
||||
ops::RangeInclusive,
|
||||
};
|
||||
|
||||
use tracing::{info, warn};
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
merge_join,
|
||||
shard::ShardIndex,
|
||||
};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct World {
|
||||
attachments: BTreeMap<TenantShardAttachmentId, NodeId>,
|
||||
attachment_count: HashMap<TenantId, u16>,
|
||||
nodes_timelines: HashMap<NodeId, HashMap<TenantTimelineId, u16>>, // u16 is a refcount from each timeline attachment id
|
||||
// continously maintained aggregate for efficient decisionmaking on quiescing;
|
||||
// quiesced timelines are always caught up
|
||||
// can quiesce one == attachment_count (TODO: this requires enforcing foreign key relationship between attachments and remote_consistent_lsn)
|
||||
caught_up_count: HashMap<TenantTimelineId, u16>,
|
||||
|
||||
// BEGIN quiescing/active split
|
||||
quiesced_timelines: BTreeMap<TenantTimelineId, Lsn>,
|
||||
// ^
|
||||
// either a timeline is in quiesced_timelines
|
||||
// or it is below
|
||||
// v
|
||||
commit_lsns: BTreeMap<TenantTimelineId, Lsn>,
|
||||
remote_consistent_lsns: BTreeMap<TimelineAttachmentId, Lsn>,
|
||||
// END quiescing/active split
|
||||
|
||||
// other fields
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
|
||||
pub struct TenantShardAttachmentId {
|
||||
pub tenant_id: TenantId,
|
||||
pub shard_id: ShardIndex,
|
||||
pub generation: Generation,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
|
||||
pub struct TimelineAttachmentId {
|
||||
pub tenant_timeline_id: TenantTimelineId,
|
||||
pub shard_id: ShardIndex,
|
||||
pub generation: Generation,
|
||||
}
|
||||
|
||||
pub struct AttachmentUpdate {
|
||||
pub tenant_shard_attachment_id: TenantShardAttachmentId,
|
||||
pub action: AttachmentUpdateAction,
|
||||
}
|
||||
|
||||
pub enum AttachmentUpdateAction {
|
||||
Attach { ps_id: NodeId },
|
||||
Detach,
|
||||
}
|
||||
|
||||
pub struct RemoteConsistentLsnAdv {
|
||||
pub attachment: TimelineAttachmentId,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl World {
|
||||
fn check_invariants(&self) {
|
||||
if !cfg!(debug_assertions) {
|
||||
return;
|
||||
}
|
||||
|
||||
// caught_up_count maintenance
|
||||
{
|
||||
for (tenant_timeline_id, caught_up_count) in
|
||||
self.caught_up_count.iter().map(|(k, v)| (*k, *v))
|
||||
{
|
||||
let attachment_count = *self
|
||||
.attachment_count
|
||||
.get(&tenant_timeline_id.tenant_id)
|
||||
.unwrap();
|
||||
assert!(caught_up_count <= attachment_count);
|
||||
if caught_up_count == attachment_count {
|
||||
self.quiesced_timelines.contains_key(&tenant_timeline_id);
|
||||
// remote_consistent_lsn and commit_lsns is empty, checked by "quiescing XOR ..." below
|
||||
} else {
|
||||
let commit_lsn = self.commit_lsns[&&tenant_timeline_id];
|
||||
let mut validate_caught_up = 0;
|
||||
let mut validate_not_caught_up = 0;
|
||||
for (_, r_c_lsn) in self
|
||||
.remote_consistent_lsns
|
||||
.range(TimelineAttachmentId::timeline_range(tenant_timeline_id))
|
||||
.map(|(k, v)| (*k, *v))
|
||||
{
|
||||
if r_c_lsn == commit_lsn {
|
||||
validate_caught_up += 1;
|
||||
} else {
|
||||
assert!(r_c_lsn < commit_lsn);
|
||||
validate_not_caught_up += 1;
|
||||
}
|
||||
}
|
||||
assert_eq!(validate_caught_up, caught_up_count);
|
||||
assert_eq!(
|
||||
validate_caught_up + validate_not_caught_up,
|
||||
attachment_count
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// quiescing XOR ...
|
||||
{
|
||||
let quiesced_timelines: HashSet<TenantTimelineId> =
|
||||
self.quiesced_timelines.keys().cloned().collect();
|
||||
let commit_lsn_timelines: HashSet<TenantTimelineId> =
|
||||
self.commit_lsns.keys().cloned().collect();
|
||||
let remote_consistent_lsn_timelines: HashSet<TenantTimelineId> = self
|
||||
.remote_consistent_lsns
|
||||
.keys()
|
||||
.map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id)
|
||||
.collect();
|
||||
#[rustfmt::skip]
|
||||
assert_eq!(0, quiesced_timelines.intersection(&commit_lsn_timelines).count());
|
||||
#[rustfmt::skip]
|
||||
assert_eq!(0, quiesced_timelines.intersection(&remote_consistent_lsn_timelines).count());
|
||||
}
|
||||
|
||||
// nodes_timelines maintenance
|
||||
{
|
||||
let mut expect: HashMap<NodeId, HashMap<TenantTimelineId, u16>> = HashMap::new();
|
||||
let all_ttids: BTreeSet<TenantTimelineId> = self
|
||||
.quiesced_timelines
|
||||
.keys()
|
||||
.cloned()
|
||||
.chain(
|
||||
self.remote_consistent_lsns
|
||||
.keys()
|
||||
.cloned()
|
||||
.map(|tlaid| tlaid.tenant_timeline_id),
|
||||
)
|
||||
.collect();
|
||||
for ttid in all_ttids {
|
||||
for (_, node_id) in self
|
||||
.attachments
|
||||
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
|
||||
.map(|(k, v)| (*k, *v))
|
||||
{
|
||||
let expect = expect.entry(node_id).or_default();
|
||||
let refcount = expect.entry(ttid).or_default();
|
||||
*refcount += 1;
|
||||
}
|
||||
}
|
||||
assert_eq!(expect, self.nodes_timelines);
|
||||
}
|
||||
}
|
||||
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
|
||||
self.check_invariants();
|
||||
use AttachmentUpdateAction::*;
|
||||
use btree_map::Entry::*;
|
||||
let AttachmentUpdate {
|
||||
tenant_shard_attachment_id,
|
||||
action,
|
||||
} = upd;
|
||||
match (action, self.attachments.entry(tenant_shard_attachment_id)) {
|
||||
(Attach { ps_id }, Occupied(e)) if *e.get() == ps_id => {
|
||||
info!("attachment is already known")
|
||||
}
|
||||
(Attach { ps_id }, Occupied(e)) => {
|
||||
warn!(current_node=%e.get(), proposed_node=%ps_id, "ignoring update that moves attachment to a different pageserver");
|
||||
}
|
||||
(Attach { ps_id }, Vacant(e)) => {
|
||||
e.insert(ps_id);
|
||||
// Keep attachmount_count up to date
|
||||
let attachment_count = self
|
||||
.attachment_count
|
||||
.entry(tenant_shard_attachment_id.tenant_id)
|
||||
.or_default();
|
||||
*attachment_count += attachment_count.checked_add(1).unwrap();
|
||||
// Keep nodes_timelines up to date
|
||||
let nodes_timelines = self.nodes_timelines.entry(ps_id).or_default();
|
||||
for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range(
|
||||
tenant_shard_attachment_id.tenant_id,
|
||||
)) {
|
||||
let refcount = nodes_timelines.entry(*ttid).or_default();
|
||||
*refcount = refcount.checked_add(1).unwrap();
|
||||
}
|
||||
if nodes_timelines.is_empty() {
|
||||
self.nodes_timelines.remove(&ps_id);
|
||||
}
|
||||
// New shards may start at an older LSN than where we quiesced => activate all quiesced timelines.
|
||||
let activate_range =
|
||||
TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
|
||||
let activate: HashSet<TenantTimelineId> = self
|
||||
.quiesced_timelines
|
||||
.range(activate_range)
|
||||
.map(|(ttid, _quiesced_lsn)| *ttid)
|
||||
.collect();
|
||||
for tenant_timeline_id in activate {
|
||||
self.activate_timeline(tenant_timeline_id);
|
||||
}
|
||||
}
|
||||
(Detach, Occupied(e)) => {
|
||||
let ps_id = e.remove();
|
||||
// Keep attachment count up to date
|
||||
let attachment_count = self
|
||||
.attachment_count
|
||||
.get_mut(&tenant_shard_attachment_id.tenant_id)
|
||||
.expect("attachment action initializes the hasmap entry");
|
||||
*attachment_count = attachment_count.checked_sub(1).unwrap();
|
||||
// Keep nodes_timelines up to date
|
||||
let nodes_timelines = self
|
||||
.nodes_timelines
|
||||
.get_mut(&ps_id)
|
||||
.expect("attachment action initializes hashmap entry");
|
||||
for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range(
|
||||
tenant_shard_attachment_id.tenant_id,
|
||||
)) {
|
||||
let refcount = nodes_timelines.entry(*ttid).or_default();
|
||||
*refcount = refcount.checked_sub(1).unwrap();
|
||||
}
|
||||
}
|
||||
(Detach, Vacant(_)) => {
|
||||
info!("detachment is already known");
|
||||
}
|
||||
}
|
||||
self.check_invariants();
|
||||
}
|
||||
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
|
||||
self.check_invariants();
|
||||
let RemoteConsistentLsnAdv {
|
||||
attachment,
|
||||
remote_consistent_lsn,
|
||||
} = adv;
|
||||
|
||||
match self.remote_consistent_lsns.entry(attachment) {
|
||||
btree_map::Entry::Occupied(mut occupied_entry) => {
|
||||
let current = occupied_entry.get_mut();
|
||||
use std::cmp::Ordering::*;
|
||||
match (*current).cmp(&remote_consistent_lsn) {
|
||||
Less => {
|
||||
*current = remote_consistent_lsn;
|
||||
let caught_up_count = self
|
||||
.caught_up_count
|
||||
.get_mut(&attachment.tenant_timeline_id)
|
||||
.unwrap();
|
||||
*caught_up_count = caught_up_count.checked_add(1).unwrap();
|
||||
if *caught_up_count
|
||||
== self.attachment_count[&attachment.tenant_timeline_id.tenant_id]
|
||||
{
|
||||
self.quiesce_timeline(attachment.tenant_timeline_id);
|
||||
}
|
||||
}
|
||||
Equal => {
|
||||
info!("ignoring no-op update, likely duplicate delivery");
|
||||
}
|
||||
Greater => {
|
||||
warn!(
|
||||
"ignoring advertisement because remote_consistent_lsn is moving backwards"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
btree_map::Entry::Vacant(_) => {
|
||||
let ttid = attachment.tenant_timeline_id;
|
||||
match self.quiesced_timelines.get(&ttid).cloned() {
|
||||
Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => {
|
||||
info!("ignoring no-op update for quiesced timeline");
|
||||
}
|
||||
Some(_) => {
|
||||
self.activate_timeline(ttid);
|
||||
// recurse one level, guarnateed to hit `Occupied` case above
|
||||
self.handle_remote_consistent_lsn_advertisement(adv);
|
||||
}
|
||||
None => {
|
||||
info!("ignoring advertisement because timeline is not known");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.check_invariants();
|
||||
}
|
||||
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, update: Lsn) {
|
||||
self.check_invariants();
|
||||
match self.commit_lsns.entry(ttid) {
|
||||
btree_map::Entry::Occupied(mut entry) => {
|
||||
let current = entry.get_mut();
|
||||
use std::cmp::Ordering::*;
|
||||
match (*current).cmp(&update) {
|
||||
Less => {
|
||||
*current = update;
|
||||
// We never allow remote_consistent_lsn to be ahead of commit_lsn.
|
||||
// Therefore, it is safe to say nothing is caught up anymore.
|
||||
let caught_up_count = self.caught_up_count.get_mut(&ttid).unwrap();
|
||||
*caught_up_count = 0;
|
||||
}
|
||||
Equal => {
|
||||
// This code runs in safekeeper impl, no reason why there would be duplicate delivery.
|
||||
warn!("ignoring no-op update; why is this happening?");
|
||||
}
|
||||
Greater => {
|
||||
panic!(
|
||||
"proposed commit_lsn would move it backwards: current={} update={}",
|
||||
current, update
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
btree_map::Entry::Vacant(entry) => {
|
||||
match self.quiesced_timelines.get(&ttid).cloned() {
|
||||
Some(quiesced_lsn) if quiesced_lsn == update => {
|
||||
info!("ignoring no-op update for quiesced timeline");
|
||||
}
|
||||
Some(_) => {
|
||||
self.activate_timeline(ttid);
|
||||
// recurse one level, guarnateed to hit `Occupied` case above
|
||||
self.handle_commit_lsn_advancement(ttid, update);
|
||||
}
|
||||
None => {
|
||||
info!("first time hearing about this timeline, initializing");
|
||||
entry.insert(update);
|
||||
let replaced = self.caught_up_count.insert(ttid, 0);
|
||||
// only commit_lsn advancement makes timelines known to world
|
||||
assert_eq!(None, replaced);
|
||||
for (attachment, node_id) in self
|
||||
.attachments
|
||||
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
|
||||
{
|
||||
let replaced = self.remote_consistent_lsns.insert(
|
||||
attachment.timeline_attachment_id(ttid.timeline_id),
|
||||
Lsn(0),
|
||||
);
|
||||
// only commit_lsn advancement makes timelines known to World
|
||||
assert_eq!(None, replaced);
|
||||
|
||||
let nodes_timelines = self.nodes_timelines.entry(*node_id).or_default();
|
||||
let refcount = nodes_timelines.entry(ttid).or_default();
|
||||
*refcount = refcount.checked_add(1).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.check_invariants();
|
||||
}
|
||||
|
||||
pub fn get_commit_lsn_advertisements(&self) -> HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> {
|
||||
let mut commit_lsn_advertisements_by_node: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> =
|
||||
HashMap::with_capacity(self.nodes_timelines.len());
|
||||
let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v));
|
||||
let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v));
|
||||
let remote_consistent_lsns_iter = self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v));
|
||||
|
||||
let join = merge_join::inner_equi_join_with_merge_strategy(
|
||||
commit_lsns_iter,
|
||||
attachments_iter,
|
||||
|(tenant_timeline_id, _)| tenant_timeline_id.tenant_id,
|
||||
|(shard_attachment_id, _)| shard_attachment_id.tenant_id,
|
||||
);
|
||||
let join = merge_join::left_equi_join_with_merge_strategy(
|
||||
join,
|
||||
remote_consistent_lsns_iter,
|
||||
|((ttid, _), _)| ttid.tenant_id,
|
||||
|(tlaid, _)| tlaid.tenant_timeline_id.tenant_id,
|
||||
);
|
||||
for ((c, a), r) in join {
|
||||
let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = c;
|
||||
let (_, node_id): (TenantShardAttachmentId, NodeId) = a;
|
||||
match r {
|
||||
// TODO: can > ever happen?
|
||||
Some((_, remote_consistent_lsn)) if remote_consistent_lsn >= commit_lsn => {
|
||||
// this timeline shard attachment is already caught up
|
||||
continue;
|
||||
}
|
||||
Some(_) | None => {
|
||||
// need to advertise
|
||||
// -> fallthrough
|
||||
}
|
||||
};
|
||||
// DISTINCT node_id, array_agg(DISTINCT tenant_shard_id )
|
||||
let for_node = commit_lsn_advertisements_by_node
|
||||
.entry(node_id)
|
||||
.or_insert_with(|| HashMap::with_capacity(self.nodes_timelines[&node_id].len()));
|
||||
match for_node.entry(tenant_timeline_id) {
|
||||
hash_map::Entry::Vacant(vacant_entry) => {
|
||||
vacant_entry.insert(commit_lsn);
|
||||
}
|
||||
hash_map::Entry::Occupied(occupied_entry) => {
|
||||
assert_eq!(*occupied_entry.get(), commit_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
commit_lsn_advertisements_by_node
|
||||
}
|
||||
|
||||
fn activate_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
|
||||
let quiesced_lsn = self
|
||||
.quiesced_timelines
|
||||
.remove(&tenant_timeline_id)
|
||||
.expect("must call this function only on quiesced tenant_timeline_id");
|
||||
let replaced = self.commit_lsns.insert(tenant_timeline_id, quiesced_lsn);
|
||||
assert_eq!(None, replaced);
|
||||
let reconstruct_remote_consistent_lsn_entries = self
|
||||
.attachments
|
||||
.range(TenantShardAttachmentId::tenant_range(
|
||||
tenant_timeline_id.tenant_id,
|
||||
))
|
||||
.map(|(k, _)| *k)
|
||||
.map(|tenant_shard_attachment_id| {
|
||||
(
|
||||
tenant_shard_attachment_id
|
||||
.timeline_attachment_id(tenant_timeline_id.timeline_id),
|
||||
quiesced_lsn,
|
||||
)
|
||||
});
|
||||
for (key, value) in reconstruct_remote_consistent_lsn_entries {
|
||||
let replaced = self.remote_consistent_lsns.insert(key, value);
|
||||
assert_eq!(None, replaced);
|
||||
}
|
||||
}
|
||||
|
||||
fn quiesce_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
|
||||
self.check_invariants();
|
||||
if self.quiesced_timelines.contains_key(&tenant_timeline_id) {
|
||||
panic!("only call this function on active timelines");
|
||||
}
|
||||
let quiesced_lsn = self
|
||||
.commit_lsns
|
||||
.remove(&tenant_timeline_id)
|
||||
.expect("inconsistent: we checked it's not in quiesced_timelines, so, must be active");
|
||||
let caught_up_count = self
|
||||
.caught_up_count
|
||||
.remove(&tenant_timeline_id)
|
||||
.expect("inconsistent: we checked it's not in quiesced_timleines, so, must be active");
|
||||
let mut remove_remote_consistent_lsns = Vec::new();
|
||||
for (k, remote_consistent_lsn) in self
|
||||
.remote_consistent_lsns
|
||||
.range(TimelineAttachmentId::timeline_range(tenant_timeline_id))
|
||||
{
|
||||
assert_eq!(*remote_consistent_lsn, quiesced_lsn);
|
||||
remove_remote_consistent_lsns.push(*k);
|
||||
}
|
||||
assert_eq!(
|
||||
caught_up_count,
|
||||
u16::try_from(remove_remote_consistent_lsns.len()).unwrap()
|
||||
);
|
||||
for k in remove_remote_consistent_lsns {
|
||||
let removed = self.remote_consistent_lsns.remove(&k);
|
||||
assert!(removed.is_some(), "we just added");
|
||||
}
|
||||
let replaced = self
|
||||
.quiesced_timelines
|
||||
.insert(tenant_timeline_id, quiesced_lsn);
|
||||
assert_eq!(None, replaced); // we checked at function entry
|
||||
self.check_invariants();
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelineAttachmentId {
|
||||
pub fn timeline_range(ttid: TenantTimelineId) -> RangeInclusive<Self> {
|
||||
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
|
||||
let generation_range: RangeInclusive<_> = Generation::RANGE;
|
||||
RangeInclusive::new(
|
||||
TimelineAttachmentId {
|
||||
tenant_timeline_id: ttid,
|
||||
shard_id: *shard_index_range.start(),
|
||||
generation: *generation_range.start(),
|
||||
},
|
||||
TimelineAttachmentId {
|
||||
tenant_timeline_id: ttid,
|
||||
shard_id: *shard_index_range.end(),
|
||||
generation: *generation_range.end(),
|
||||
},
|
||||
)
|
||||
}
|
||||
pub fn tenant_shard_attachment_id(self) -> TenantShardAttachmentId {
|
||||
TenantShardAttachmentId {
|
||||
tenant_id: self.tenant_timeline_id.tenant_id,
|
||||
shard_id: self.shard_id,
|
||||
generation: self.generation,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TenantShardAttachmentId {
|
||||
pub fn timeline_attachment_id(self, timeline_id: TimelineId) -> TimelineAttachmentId {
|
||||
TimelineAttachmentId {
|
||||
tenant_timeline_id: TenantTimelineId {
|
||||
tenant_id: self.tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
shard_id: self.shard_id,
|
||||
generation: self.generation,
|
||||
}
|
||||
}
|
||||
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
|
||||
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
|
||||
let generation_range: RangeInclusive<_> = Generation::RANGE;
|
||||
RangeInclusive::new(
|
||||
Self {
|
||||
tenant_id,
|
||||
shard_id: *shard_index_range.start(),
|
||||
generation: *generation_range.start(),
|
||||
},
|
||||
Self {
|
||||
tenant_id,
|
||||
shard_id: *shard_index_range.end(),
|
||||
generation: *generation_range.end(),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
224
libs/sk_ps_discovery/src/tests.rs
Normal file
224
libs/sk_ps_discovery/src/tests.rs
Normal file
@@ -0,0 +1,224 @@
|
||||
use utils::{id::TenantId, logging};
|
||||
|
||||
use super::*;
|
||||
use crate::World;
|
||||
|
||||
#[track_caller]
|
||||
fn validate_advertisements(
|
||||
actual: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>>,
|
||||
expect: Vec<(NodeId, Vec<(TenantTimelineId, Lsn)>)>,
|
||||
) {
|
||||
let expect: HashMap<_, _> = expect
|
||||
.into_iter()
|
||||
.map(|(node_id, innermap)| (node_id, innermap.into_iter().collect()))
|
||||
.collect();
|
||||
assert_eq!(actual, expect);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic() {
|
||||
let mut world = World::default();
|
||||
|
||||
let tenant_id = TenantId::from_array([0xff; 16]);
|
||||
let timeline_id = TimelineId::from_array([1; 16]);
|
||||
let timeline2 = TimelineId::from_array([2; 16]);
|
||||
|
||||
let attachment1 = TenantShardAttachmentId {
|
||||
tenant_id,
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(2),
|
||||
};
|
||||
let attachment2 = TenantShardAttachmentId {
|
||||
tenant_id,
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(3),
|
||||
};
|
||||
|
||||
let ps1 = NodeId(0x100);
|
||||
|
||||
// Out of order; in happy path, commit_lsn advances first, but let's test the
|
||||
// case where safekeeper doesn't know about the attachments yet first, before
|
||||
// we extend the case to the happy path.
|
||||
|
||||
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
|
||||
attachment: attachment1.timeline_attachment_id(timeline_id),
|
||||
remote_consistent_lsn: Lsn(0x23),
|
||||
});
|
||||
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
|
||||
attachment: attachment2.timeline_attachment_id(timeline_id),
|
||||
remote_consistent_lsn: Lsn(0x42),
|
||||
});
|
||||
// SK authoritative info on which advertisements ought exist is still empty
|
||||
assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());
|
||||
world.update_attachment(AttachmentUpdate {
|
||||
tenant_shard_attachment_id: attachment1,
|
||||
action: AttachmentUpdateAction::Attach { ps_id: ps1 },
|
||||
});
|
||||
// We have not inserted any commit_lsn info yet, so, still no advs expected
|
||||
assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());
|
||||
// insert commit_lsn info for different timeline
|
||||
world.handle_commit_lsn_advancement(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id: timeline2,
|
||||
},
|
||||
Lsn(0x66),
|
||||
);
|
||||
// Advs should still be empty
|
||||
validate_advertisements(
|
||||
world.get_commit_lsn_advertisements(),
|
||||
vec![(
|
||||
ps1,
|
||||
vec![(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id: timeline2,
|
||||
},
|
||||
Lsn(0x66),
|
||||
)],
|
||||
)],
|
||||
);
|
||||
|
||||
// Ok, out of order part tested. Now Safekeeper learns about the attachments.
|
||||
|
||||
// insert commit_lsn info for the timeline we have remote_consistent_lsn info for
|
||||
world.handle_commit_lsn_advancement(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
Lsn(0x55),
|
||||
);
|
||||
dbg!(&world);
|
||||
// Now advertisements to attachment1 will be sent out, but attachment2 is still not known, so, no advertisements to it.
|
||||
validate_advertisements(
|
||||
world.get_commit_lsn_advertisements(),
|
||||
vec![(
|
||||
ps1,
|
||||
vec![(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
Lsn(0x55),
|
||||
)],
|
||||
)],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn advertisement_for_new_timeline() {
|
||||
let mut world = World::default();
|
||||
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
let ttid = TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
let tenant_shard_attachment_id = TenantShardAttachmentId {
|
||||
tenant_id,
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(2),
|
||||
};
|
||||
|
||||
let ps_id = NodeId(0x100);
|
||||
|
||||
world.update_attachment(AttachmentUpdate {
|
||||
tenant_shard_attachment_id,
|
||||
action: AttachmentUpdateAction::Attach { ps_id },
|
||||
});
|
||||
world.handle_commit_lsn_advancement(ttid, Lsn(23));
|
||||
|
||||
let advs = world.get_commit_lsn_advertisements();
|
||||
validate_advertisements(advs, vec![(ps_id, vec![(ttid, Lsn(23))])]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn quiescing_timeline_catchup() {
|
||||
let _guard = logging::init(
|
||||
logging::LogFormat::Test,
|
||||
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
|
||||
logging::Output::Stdout,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut world = World::default();
|
||||
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
let ttid = TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
let tenant_shard_attachment_id = TenantShardAttachmentId {
|
||||
tenant_id,
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(2),
|
||||
};
|
||||
|
||||
let ps_id = NodeId(0x100);
|
||||
|
||||
world.update_attachment(AttachmentUpdate {
|
||||
tenant_shard_attachment_id,
|
||||
action: AttachmentUpdateAction::Attach { ps_id },
|
||||
});
|
||||
world.handle_commit_lsn_advancement(ttid, Lsn(23));
|
||||
|
||||
assert!(world.quiesced_timelines.is_empty());
|
||||
|
||||
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
|
||||
attachment: tenant_shard_attachment_id.timeline_attachment_id(timeline_id),
|
||||
remote_consistent_lsn: Lsn(23),
|
||||
});
|
||||
|
||||
assert!(world.quiesced_timelines.contains_key(&ttid));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nodes_timelines() {
|
||||
let mut world = World::default();
|
||||
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::from_array([0x1; 16]);
|
||||
let ttid = TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
let tenant_shard_attachment_id = TenantShardAttachmentId {
|
||||
tenant_id,
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(2),
|
||||
};
|
||||
|
||||
let ps_id = NodeId(0x100);
|
||||
|
||||
world.update_attachment(AttachmentUpdate {
|
||||
tenant_shard_attachment_id,
|
||||
action: AttachmentUpdateAction::Attach { ps_id },
|
||||
});
|
||||
|
||||
assert!(world.nodes_timelines.get(&ps_id).is_none());
|
||||
|
||||
world.handle_commit_lsn_advancement(ttid, Lsn(0x23));
|
||||
|
||||
assert_eq!(world.nodes_timelines[&ps_id].len(), 1);
|
||||
|
||||
let timeline2 = TimelineId::from_array([0x2; 16]);
|
||||
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
|
||||
attachment: TimelineAttachmentId {
|
||||
tenant_timeline_id: TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id: timeline2,
|
||||
},
|
||||
shard_id: ShardIndex::unsharded(),
|
||||
generation: Generation::Valid(2),
|
||||
},
|
||||
remote_consistent_lsn: Lsn(0x42),
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: need more tests, esp for the removal path
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::fmt::Debug;
|
||||
use std::{fmt::Debug, ops::RangeInclusive};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -25,7 +25,9 @@ pub enum Generation {
|
||||
/// scenarios where pageservers might otherwise issue conflicting writes to
|
||||
/// remote storage
|
||||
impl Generation {
|
||||
pub const MIN: Self = Self::None;
|
||||
pub const MAX: Self = Self::Valid(u32::MAX);
|
||||
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
|
||||
|
||||
/// Create a new Generation that represents a legacy key format with
|
||||
/// no generation suffix
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::fmt;
|
||||
use std::num::ParseIntError;
|
||||
use std::ops::RangeInclusive;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -320,6 +321,19 @@ impl TenantTimelineId {
|
||||
pub fn empty() -> Self {
|
||||
Self::new(TenantId::from([0u8; 16]), TimelineId::from([0u8; 16]))
|
||||
}
|
||||
|
||||
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
|
||||
RangeInclusive::new(
|
||||
Self {
|
||||
tenant_id,
|
||||
timeline_id: TimelineId::from_array([u8::MIN; 16]),
|
||||
},
|
||||
Self {
|
||||
tenant_id,
|
||||
timeline_id: TimelineId::from_array([u8::MAX; 16]),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TenantTimelineId {
|
||||
|
||||
@@ -95,6 +95,9 @@ pub mod guard_arc_swap;
|
||||
|
||||
pub mod elapsed_accum;
|
||||
|
||||
pub mod merge_join;
|
||||
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
|
||||
164
libs/utils/src/merge_join.rs
Normal file
164
libs/utils/src/merge_join.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
pub fn inner_equi_join_with_merge_strategy<L, LI, R, RI, K, FL, FR>(
|
||||
l: L,
|
||||
r: R,
|
||||
key_l: FL,
|
||||
key_r: FR,
|
||||
) -> impl Iterator<Item = (LI, RI)>
|
||||
where
|
||||
L: Iterator<Item = LI>, // + Sorted
|
||||
R: Iterator<Item = RI>, // + Sorted
|
||||
FL: 'static + Fn(&LI) -> K,
|
||||
FR: 'static + Fn(&RI) -> K,
|
||||
LI: Copy,
|
||||
RI: Copy,
|
||||
K: PartialEq + Eq + Ord,
|
||||
{
|
||||
let mut l = l.map(move |i| (i, key_l(&i))).peekable();
|
||||
let mut r = r.map(move |i| (i, key_r(&i))).peekable();
|
||||
std::iter::from_fn(move || {
|
||||
loop {
|
||||
match (l.peek(), r.peek()) {
|
||||
(Some((_, lk)), Some((_, rk))) if lk < rk => {
|
||||
drop(l.next());
|
||||
continue;
|
||||
}
|
||||
(Some((_, lk)), Some((_, rk))) if lk > rk => {
|
||||
drop(r.next());
|
||||
continue;
|
||||
}
|
||||
(Some((lv, lk)), Some((_, rk))) => {
|
||||
assert!(lk == rk);
|
||||
let (rv, _) = r.next().unwrap();
|
||||
return Some((lv.clone(), rv));
|
||||
}
|
||||
(None, None) | (None, Some(_)) | (Some(_), None) => return None,
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn left_equi_join_with_merge_strategy<L, LI, R, RI, K, FL, FR>(
|
||||
l: L,
|
||||
r: R,
|
||||
key_l: FL,
|
||||
key_r: FR,
|
||||
) -> impl Iterator<Item = (LI, Option<RI>)>
|
||||
where
|
||||
L: Iterator<Item = LI>, // + Sorted
|
||||
R: Iterator<Item = RI>, // + Sorted
|
||||
FL: 'static + Fn(&LI) -> K,
|
||||
FR: 'static + Fn(&RI) -> K,
|
||||
LI: Copy,
|
||||
RI: Copy,
|
||||
K: PartialEq + Eq + Ord,
|
||||
{
|
||||
let mut l = l.map(move |i| (i, key_l(&i))).peekable();
|
||||
let mut r = r.map(move |i| (i, key_r(&i))).peekable();
|
||||
let mut l_had_match = false;
|
||||
std::iter::from_fn(move || {
|
||||
loop {
|
||||
match (l.peek(), r.peek()) {
|
||||
(Some((_, lk)), Some((_, rk))) if lk < rk => {
|
||||
let (lv, _) = l.next().unwrap();
|
||||
if l_had_match {
|
||||
l_had_match = false;
|
||||
continue;
|
||||
} else {
|
||||
return Some((lv, None));
|
||||
}
|
||||
}
|
||||
(Some((_, _)), None) => {
|
||||
let (lv, _) = l.next().unwrap();
|
||||
if l_had_match {
|
||||
l_had_match = false;
|
||||
continue;
|
||||
} else {
|
||||
return Some((lv, None));
|
||||
}
|
||||
}
|
||||
(Some((_, lk)), Some((_, rk))) if lk > rk => {
|
||||
drop(r.next());
|
||||
continue;
|
||||
}
|
||||
(Some((lv, lk)), Some((_, rk))) => {
|
||||
l_had_match = true;
|
||||
assert!(lk == rk);
|
||||
let (rv, _) = r.next().unwrap();
|
||||
return Some((lv.clone(), Some(rv)));
|
||||
}
|
||||
(None, None) | (None, Some(_)) => return None,
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
#[test]
|
||||
fn inner_equi_basic() {
|
||||
let l = vec![b"a", b"c"];
|
||||
let r = vec![b"aa", b"ad", b"ba", b"bb", b"ca", b"cb", b"cd", b"dd"];
|
||||
|
||||
let res: Vec<_> = super::inner_equi_join_with_merge_strategy(
|
||||
l.into_iter(),
|
||||
r.into_iter(),
|
||||
|l| &l[0..1],
|
||||
|r| &r[0..1],
|
||||
)
|
||||
.collect();
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
vec![
|
||||
(b"a", b"aa"),
|
||||
(b"a", b"ad"),
|
||||
(b"c", b"ca"),
|
||||
(b"c", b"cb"),
|
||||
(b"c", b"cd"),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn left_equi_basic() {
|
||||
/*
|
||||
create table aleft (id text, aleft text);
|
||||
create table aright (id text, aright text);
|
||||
insert into aleft values ('a', 'a'), ('b', 'b');
|
||||
insert into aright values ('a', 'aa'), ('a', 'ab'), ('c', 'cd');
|
||||
select * from aleft left join aright using ("id");
|
||||
*/
|
||||
|
||||
let l = vec![b"a", b"b"];
|
||||
let r = vec![b"aa", b"ab", b"cd"];
|
||||
|
||||
let res: Vec<_> = super::left_equi_join_with_merge_strategy(
|
||||
l.into_iter(),
|
||||
r.into_iter(),
|
||||
|l| &l[0..1],
|
||||
|r| &r[0..1],
|
||||
)
|
||||
.collect();
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
vec![(b"a", Some(b"aa")), (b"a", Some(b"ab")), (b"b", None)]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn left_equi_basic_2() {
|
||||
let l = vec![b"b"];
|
||||
let r = vec![b"aa", b"ab", b"bb"];
|
||||
|
||||
let res: Vec<_> = super::left_equi_join_with_merge_strategy(
|
||||
l.into_iter(),
|
||||
r.into_iter(),
|
||||
|l| &l[0..1],
|
||||
|r| &r[0..1],
|
||||
)
|
||||
.collect();
|
||||
|
||||
assert_eq!(res, vec![(b"b", Some(b"bb"))])
|
||||
}
|
||||
}
|
||||
@@ -52,6 +52,7 @@ pub struct TenantShardId {
|
||||
impl ShardCount {
|
||||
pub const MAX: Self = Self(u8::MAX);
|
||||
pub const MIN: Self = Self(0);
|
||||
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
|
||||
|
||||
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
|
||||
/// legacy format for TenantShardId that excludes the shard suffix", also known
|
||||
@@ -85,7 +86,9 @@ impl ShardCount {
|
||||
}
|
||||
|
||||
impl ShardNumber {
|
||||
pub const MIN: Self = Self(0);
|
||||
pub const MAX: Self = Self(u8::MAX);
|
||||
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
|
||||
}
|
||||
|
||||
impl TenantShardId {
|
||||
@@ -100,16 +103,17 @@ impl TenantShardId {
|
||||
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
|
||||
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
|
||||
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
|
||||
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
|
||||
RangeInclusive::new(
|
||||
Self {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(0),
|
||||
shard_count: ShardCount(0),
|
||||
shard_number: shard_index_range.start().shard_number,
|
||||
shard_count: shard_index_range.start().shard_count,
|
||||
},
|
||||
Self {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber::MAX,
|
||||
shard_count: ShardCount::MAX,
|
||||
shard_number: shard_index_range.end().shard_number,
|
||||
shard_count: shard_index_range.end().shard_count,
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -241,6 +245,16 @@ impl From<[u8; 18]> for TenantShardId {
|
||||
}
|
||||
|
||||
impl ShardIndex {
|
||||
pub const MIN: Self = ShardIndex {
|
||||
shard_number: ShardNumber::MIN,
|
||||
shard_count: ShardCount::MIN,
|
||||
};
|
||||
pub const MAX: Self = ShardIndex {
|
||||
shard_number: ShardNumber::MAX,
|
||||
shard_count: ShardCount::MAX,
|
||||
};
|
||||
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
|
||||
|
||||
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
|
||||
Self {
|
||||
shard_number: number,
|
||||
|
||||
@@ -4,3 +4,5 @@ pub mod duplex;
|
||||
pub mod gate;
|
||||
|
||||
pub mod spsc_fold;
|
||||
|
||||
pub mod spsc_watch;
|
||||
|
||||
@@ -56,7 +56,7 @@ impl<T: Send> Sender<T> {
|
||||
/// # Panics
|
||||
///
|
||||
/// If `try_fold` panics, any subsequent call to `send` panic.
|
||||
pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), SendError>
|
||||
pub async fn send<F>(&mut self, value: T, try_fold: F) -> Result<(), (T, SendError)>
|
||||
where
|
||||
F: Fn(&mut T, T) -> Result<(), T>,
|
||||
{
|
||||
@@ -104,7 +104,9 @@ impl<T: Send> Sender<T> {
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
|
||||
State::ReceiverGone => {
|
||||
Poll::Ready(Err((value.take().unwrap(), SendError::ReceiverGone)))
|
||||
}
|
||||
State::SenderGone(_)
|
||||
| State::AllGone
|
||||
| State::SenderDropping
|
||||
|
||||
55
libs/utils/src/sync/spsc_watch.rs
Normal file
55
libs/utils/src/sync/spsc_watch.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
//! watch is probably not the right word, because we do take out
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::sync::spsc_fold;
|
||||
|
||||
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
|
||||
let (tx, rx) = spsc_fold::channel();
|
||||
let cancel = CancellationToken::new();
|
||||
(
|
||||
Sender {
|
||||
tx,
|
||||
_cancel: cancel.clone().drop_guard(),
|
||||
},
|
||||
Receiver { rx, cancel },
|
||||
)
|
||||
}
|
||||
|
||||
pub struct Sender<T> {
|
||||
tx: spsc_fold::Sender<T>,
|
||||
_cancel: tokio_util::sync::DropGuard,
|
||||
}
|
||||
|
||||
pub struct Receiver<T> {
|
||||
rx: spsc_fold::Receiver<T>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl<T: Send> Sender<T> {
|
||||
pub fn send_replace(&mut self, value: T) -> Result<(), (T, spsc_fold::SendError)> {
|
||||
poll_ready(self.tx.send(value, |old, new| {
|
||||
*old = new;
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> Receiver<T> {
|
||||
pub async fn recv(&mut self) -> Result<T, spsc_fold::RecvError> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
pub async fn cancelled(&mut self) {
|
||||
self.cancel.cancelled().await
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_ready<F: Future<Output = O>, O>(f: F) -> O {
|
||||
futures::executor::block_on(async move {
|
||||
let f = std::pin::pin!(f);
|
||||
match futures::poll!(f) {
|
||||
std::task::Poll::Ready(r) => r,
|
||||
std::task::Poll::Pending => unreachable!("expecting future to always return Ready"),
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -423,11 +423,14 @@ fn start_pageserver(
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
);
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
storage_broker::connect(
|
||||
let service_client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
tls_config,
|
||||
)
|
||||
)?;
|
||||
anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(
|
||||
service_client,
|
||||
))
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -100,7 +100,7 @@ pub struct State {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
allowlist_routes: &'static [&'static str],
|
||||
remote_storage: GenericRemoteStorage,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
@@ -114,7 +114,7 @@ impl State {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
|
||||
@@ -48,7 +48,6 @@ use remote_timeline_client::{
|
||||
download_tenant_manifest,
|
||||
};
|
||||
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
|
||||
use timeline::import_pgdata::ImportingTimeline;
|
||||
use timeline::offload::{OffloadError, offload_timeline};
|
||||
@@ -153,7 +152,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
/// as the shared remote storage client and process initialization state.
|
||||
#[derive(Clone)]
|
||||
pub struct TenantSharedResources {
|
||||
pub broker_client: storage_broker::BrokerClientChannel,
|
||||
pub broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub l0_flush_global_state: L0FlushGlobalState,
|
||||
@@ -2107,7 +2106,7 @@ impl TenantShard {
|
||||
async fn unoffload_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: RequestContext,
|
||||
) -> Result<Arc<Timeline>, TimelineArchivalError> {
|
||||
info!("unoffloading timeline");
|
||||
@@ -2242,7 +2241,7 @@ impl TenantShard {
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
new_state: TimelineArchivalState,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
info!("setting timeline archival config");
|
||||
@@ -2571,7 +2570,7 @@ impl TenantShard {
|
||||
pub(crate) async fn create_timeline(
|
||||
self: &Arc<TenantShard>,
|
||||
params: CreateTimelineParams,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
if !self.is_active() {
|
||||
@@ -3299,7 +3298,7 @@ impl TenantShard {
|
||||
/// to delay background jobs. Background jobs can be started right away when None is given.
|
||||
fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
|
||||
@@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
|
||||
use rand::Rng;
|
||||
use remote_storage::DownloadError;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{Notify, oneshot, watch};
|
||||
@@ -2080,7 +2079,7 @@ impl Timeline {
|
||||
pub(crate) fn activate(
|
||||
self: &Arc<Self>,
|
||||
parent: Arc<crate::tenant::TenantShard>,
|
||||
broker_client: BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
@@ -3114,7 +3113,7 @@ impl Timeline {
|
||||
fn launch_wal_receiver(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
broker_client: BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
) {
|
||||
info!(
|
||||
"launching WAL receiver for timeline {} of tenant {}",
|
||||
|
||||
@@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
tenant: Arc<TenantShard>,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
self.write(|raw_timeline| async move {
|
||||
|
||||
@@ -28,7 +28,6 @@ use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -70,7 +69,7 @@ impl WalReceiver {
|
||||
pub fn start(
|
||||
timeline: Arc<Timeline>,
|
||||
conf: WalReceiverConf,
|
||||
mut broker_client: BrokerClientChannel,
|
||||
mut broker_client: storage_broker::TimelineUpdatesSubscriber,
|
||||
ctx: &RequestContext,
|
||||
) -> Self {
|
||||
let tenant_shard_id = timeline.tenant_shard_id;
|
||||
|
||||
@@ -17,19 +17,12 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use storage_broker::proto::{
|
||||
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
|
||||
SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
|
||||
TypedMessage,
|
||||
};
|
||||
use storage_broker::{BrokerClientChannel, Code, Streaming};
|
||||
use storage_broker::proto::SafekeeperDiscoveryResponse;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::backoff::{
|
||||
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
|
||||
};
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::{
|
||||
@@ -56,7 +49,7 @@ pub(crate) struct Cancelled;
|
||||
///
|
||||
/// Not cancellation-safe. Use `cancel` token to request cancellation.
|
||||
pub(super) async fn connection_manager_loop_step(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
broker_client: &mut storage_broker::TimelineUpdatesSubscriber,
|
||||
connection_manager_state: &mut ConnectionManagerState,
|
||||
ctx: &RequestContext,
|
||||
cancel: &CancellationToken,
|
||||
@@ -81,11 +74,6 @@ pub(super) async fn connection_manager_loop_step(
|
||||
WALRECEIVER_ACTIVE_MANAGERS.dec();
|
||||
}
|
||||
|
||||
let id = TenantTimelineId {
|
||||
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
|
||||
timeline_id: connection_manager_state.timeline.timeline_id,
|
||||
};
|
||||
|
||||
let mut timeline_state_updates = connection_manager_state
|
||||
.timeline
|
||||
.subscribe_for_state_updates();
|
||||
@@ -101,7 +89,12 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// Subscribe to the broker updates. Stream shares underlying TCP connection
|
||||
// with other streams on this client (other connection managers). When
|
||||
// object goes out of scope, stream finishes in drop() automatically.
|
||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
|
||||
let (timeline_updates, mut discovery_requester) = broker_client.subscribe(
|
||||
connection_manager_state.timeline.tenant_shard_id,
|
||||
connection_manager_state.timeline.timeline_id,
|
||||
cancel,
|
||||
);
|
||||
let mut timeline_updates = Box::pin(timeline_updates);
|
||||
debug!("Subscribed for broker timeline updates");
|
||||
|
||||
loop {
|
||||
@@ -155,29 +148,10 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
},
|
||||
|
||||
// Got a new update from the broker
|
||||
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
|
||||
match broker_update {
|
||||
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
|
||||
Err(status) => {
|
||||
match status.code() {
|
||||
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
|
||||
// tonic's error handling doesn't provide a clear code for disconnections: we get
|
||||
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
|
||||
// => https://github.com/neondatabase/neon/issues/9562
|
||||
info!("broker disconnected: {status}");
|
||||
},
|
||||
_ => {
|
||||
warn!("broker subscription failed: {status}");
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("broker subscription stream ended"); // can't happen
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
// Got a new update from the broker.
|
||||
// The stream ends with None if and only if `cancel` is cancelled.
|
||||
Some(timeline_update) = timeline_updates.next() => {
|
||||
connection_manager_state.register_timeline_update(timeline_update)
|
||||
},
|
||||
|
||||
new_event = async {
|
||||
@@ -258,32 +232,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
tokio::time::sleep(next_discovery_ts - now).await;
|
||||
}
|
||||
|
||||
let tenant_timeline_id = Some(ProtoTenantTimelineId {
|
||||
tenant_id: id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: id.timeline_id.as_ref().to_owned(),
|
||||
});
|
||||
let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
|
||||
let msg = TypedMessage {
|
||||
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
|
||||
safekeeper_timeline_info: None,
|
||||
safekeeper_discovery_request: Some(request),
|
||||
safekeeper_discovery_response: None,
|
||||
};
|
||||
info!("No active connection and no candidates, sending discovery request to the broker");
|
||||
discovery_requester.request().await;
|
||||
|
||||
last_discovery_ts = Some(std::time::Instant::now());
|
||||
info!("No active connection and no candidates, sending discovery request to the broker");
|
||||
|
||||
// Cancellation safety: we want to send a message to the broker, but publish_one()
|
||||
// function can get cancelled by the other select! arm. This is absolutely fine, because
|
||||
// we just want to receive broker updates and discovery is not important if we already
|
||||
// receive updates.
|
||||
//
|
||||
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
|
||||
// This is totally fine because of the reason above.
|
||||
|
||||
// This is a fire-and-forget request, we don't care about the response
|
||||
let _ = broker_client.publish_one(msg).await;
|
||||
debug!("Discovery request sent to the broker");
|
||||
None
|
||||
} => {}
|
||||
}
|
||||
@@ -298,63 +251,6 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
}
|
||||
|
||||
/// Endlessly try to subscribe for broker updates for a given timeline.
|
||||
async fn subscribe_for_timeline_updates(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
id: TenantTimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Streaming<TypedMessage>, Cancelled> {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
exponential_backoff(
|
||||
attempt,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
|
||||
// subscribe to the specific timeline
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperTimelineInfo as i32,
|
||||
},
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
|
||||
},
|
||||
],
|
||||
tenant_timeline_id: Some(FilterTenantTimelineId {
|
||||
enabled: true,
|
||||
tenant_timeline_id: Some(ProtoTenantTimelineId {
|
||||
tenant_id: id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: id.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
}),
|
||||
};
|
||||
|
||||
match {
|
||||
tokio::select! {
|
||||
r = broker_client.subscribe_by_filter(request) => { r }
|
||||
_ = cancel.cancelled() => { return Err(Cancelled); }
|
||||
}
|
||||
} {
|
||||
Ok(resp) => {
|
||||
return Ok(resp.into_inner());
|
||||
}
|
||||
Err(e) => {
|
||||
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
||||
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
|
||||
info!(
|
||||
"Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
|
||||
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
|
||||
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
|
||||
@@ -695,44 +591,14 @@ impl ConnectionManagerState {
|
||||
}
|
||||
|
||||
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
|
||||
fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
|
||||
let mut is_discovery = false;
|
||||
let timeline_update = match typed_msg.r#type() {
|
||||
MessageType::SafekeeperTimelineInfo => {
|
||||
let info = match typed_msg.safekeeper_timeline_info {
|
||||
Some(info) => info,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_timeline_info");
|
||||
return;
|
||||
}
|
||||
};
|
||||
SafekeeperDiscoveryResponse {
|
||||
safekeeper_id: info.safekeeper_id,
|
||||
tenant_timeline_id: info.tenant_timeline_id,
|
||||
commit_lsn: info.commit_lsn,
|
||||
safekeeper_connstr: info.safekeeper_connstr,
|
||||
availability_zone: info.availability_zone,
|
||||
standby_horizon: info.standby_horizon,
|
||||
}
|
||||
}
|
||||
MessageType::SafekeeperDiscoveryResponse => {
|
||||
is_discovery = true;
|
||||
match typed_msg.safekeeper_discovery_response {
|
||||
Some(response) => response,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_discovery_response");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// unexpected message
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) {
|
||||
WALRECEIVER_BROKER_UPDATES.inc();
|
||||
|
||||
let storage_broker::TimelineShardUpdate {
|
||||
is_discovery,
|
||||
inner: timeline_update,
|
||||
} = timeline_update;
|
||||
|
||||
trace!(
|
||||
"safekeeper info update: standby_horizon(cutoff)={}",
|
||||
timeline_update.standby_horizon
|
||||
@@ -1013,7 +879,7 @@ impl ConnectionManagerState {
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
availability_zone: self.conf.availability_zone.as_deref(),
|
||||
};
|
||||
|
||||
match wal_stream_connection_config(connection_conf_args) {
|
||||
|
||||
@@ -52,6 +52,7 @@ tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tokio-util = { workspace = true }
|
||||
tonic = { workspace = true }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
@@ -62,9 +63,11 @@ pq_proto.workspace = true
|
||||
remote_storage.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
sk_ps_discovery.workspace = true
|
||||
sha2.workspace = true
|
||||
sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
storage_controller_client.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
http-utils.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
@@ -8,11 +8,12 @@ use std::error::Error as _;
|
||||
use http_utils::error::HttpErrorBody;
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use safekeeper_api::models::{
|
||||
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
|
||||
TimelineStatus,
|
||||
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
|
||||
TenantShardPageserverAttachmentChange, TimelineCreateRequest, TimelineStatus,
|
||||
};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::logging::SecretString;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
@@ -189,6 +190,20 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn post_tenant_shard_pageserver_attachments(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
body: TenantShardPageserverAttachmentChange,
|
||||
) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/pageserver_attachments",
|
||||
tenant_shard_id.tenant_id,
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
let resp = self.post(uri, body).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
async fn post<B: serde::Serialize, U: IntoUrl>(
|
||||
&self,
|
||||
uri: U,
|
||||
|
||||
@@ -25,7 +25,7 @@ use safekeeper::defaults::{
|
||||
use safekeeper::wal_backup::WalBackup;
|
||||
use safekeeper::{
|
||||
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
|
||||
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
|
||||
WAL_ADVERTISER_RUNTIME, WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
|
||||
};
|
||||
use sd_notify::NotifyState;
|
||||
use storage_broker::{DEFAULT_ENDPOINT, Uri};
|
||||
@@ -626,6 +626,30 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
.map(|res| ("broker main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
|
||||
let ps_connectivity_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
||||
.spawn(
|
||||
global_timelines
|
||||
.get_pageserver_connectivity()
|
||||
.task_main()
|
||||
.instrument(info_span!("pageserver_connectivity")),
|
||||
)
|
||||
.map(|res| ("pageserver connectivity".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(ps_connectivity_handle));
|
||||
|
||||
let wal_advertiser_task_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_ADVERTISER_RUNTIME.handle())
|
||||
.spawn(
|
||||
global_timelines
|
||||
.get_wal_advertiser()
|
||||
.task_main()
|
||||
.instrument(info_span!("wal_advertiser_main")),
|
||||
)
|
||||
.map(|res| ("wal advertiser task handle".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_advertiser_task_handle));
|
||||
|
||||
set_build_info_metric(GIT_VERSION, BUILD_TAG);
|
||||
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
|
||||
@@ -50,7 +50,8 @@ async fn push_loop(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
)?
|
||||
.into_raw_grpc_client();
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
|
||||
let outbound = async_stream::stream! {
|
||||
@@ -97,7 +98,8 @@ async fn pull_loop(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
)?
|
||||
.into_raw_grpc_client();
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
@@ -153,7 +155,8 @@ async fn discover_loop(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
)?
|
||||
.into_raw_grpc_client();
|
||||
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![TypeSubscription {
|
||||
|
||||
@@ -67,6 +67,19 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
|
||||
})
|
||||
}
|
||||
|
||||
async fn post_tenant_pageserver_attachments(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
let body: models::TenantShardPageserverAttachmentChange = json_request(&mut request).await?;
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let wal_advertiser = global_timelines.get_wal_advertiser();
|
||||
wal_advertiser
|
||||
.update_pageserver_attachments(tenant_id, body)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Deactivates all timelines for the tenant and removes its data directory.
|
||||
/// See `timeline_delete_handler`.
|
||||
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -718,6 +731,9 @@ pub fn make_router(
|
||||
})
|
||||
})
|
||||
.get("/v1/utilization", |r| request_span(r, utilization_handler))
|
||||
.post("/v1/tenant/:tenant_id/pageserver_attachments", |r| {
|
||||
request_span(r, post_tenant_pageserver_attachments)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
|
||||
@@ -38,11 +38,13 @@ pub mod timeline_eviction;
|
||||
pub mod timeline_guard;
|
||||
pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_advertiser;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
pub(crate) mod pageserver_connectivity;
|
||||
|
||||
#[cfg(any(test, feature = "benchmarking"))]
|
||||
pub mod test_utils;
|
||||
@@ -123,6 +125,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
pub enable_tls_wal_service_api: bool,
|
||||
pub storage_controller_api: Option<Uri>,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -168,6 +171,7 @@ impl SafeKeeperConf {
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
enable_tls_wal_service_api: false,
|
||||
storage_controller_api: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -198,6 +202,14 @@ pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
.expect("Failed to create broker runtime")
|
||||
});
|
||||
|
||||
pub static WAL_ADVERTISER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("wal advertiser worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create broker runtime")
|
||||
});
|
||||
|
||||
pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("WAL backup worker")
|
||||
|
||||
117
safekeeper/src/pageserver_connectivity.rs
Normal file
117
safekeeper/src/pageserver_connectivity.rs
Normal file
@@ -0,0 +1,117 @@
|
||||
use desim::world::Node;
|
||||
use hyper::Uri;
|
||||
use pageserver_api::controller_api;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::timeline::Timeline;
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, hash_map},
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use tracing::{Instrument, error, info, info_span, warn};
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
sync::{spsc_fold, spsc_watch},
|
||||
};
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
type Advs = HashMap<TenantTimelineId, Lsn>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct GlobalState {
|
||||
inner: once_cell::sync::OnceCell<tokio::sync::mpsc::Sender<Message>>,
|
||||
}
|
||||
|
||||
enum Message {
|
||||
Resolve {
|
||||
ps_id: NodeId,
|
||||
reply: tokio::sync::oneshot::Sender<tokio::sync::watch::Receiver<hyper::Uri>>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl GlobalState {
|
||||
pub fn task_main(&self) -> impl 'static + Future<Output = anyhow::Result<()>> + Send {
|
||||
let mut ret = None;
|
||||
self.inner.get_or_init(|| {
|
||||
let (tx, task_fut) = MainTask::prepare_run();
|
||||
ret = Some(task_fut);
|
||||
tx
|
||||
});
|
||||
ret.expect("must only call this method once")
|
||||
}
|
||||
}
|
||||
|
||||
struct MainTask {
|
||||
rx: tokio::sync::mpsc::Receiver<Message>,
|
||||
}
|
||||
|
||||
impl MainTask {
|
||||
fn prepare_run() -> (
|
||||
tokio::sync::mpsc::Sender<Message>,
|
||||
impl Future<Output = anyhow::Result<()>> + Send,
|
||||
) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */);
|
||||
let task = MainTask { rx };
|
||||
(tx, task.task())
|
||||
}
|
||||
async fn task(mut self) -> anyhow::Result<()> {
|
||||
// TODO: persistence
|
||||
|
||||
let storcon_client = todo!();
|
||||
|
||||
let mut resolution: HashMap<NodeId, tokio::sync::watch::Sender<hyper::Uri>> =
|
||||
HashMap::new();
|
||||
|
||||
while let Some(rx) = self.rx.recv().await {
|
||||
match rx {
|
||||
Message::Resolve { ps_id, reply } => match resolution.entry(ps_id) {
|
||||
hash_map::Entry::Occupied(e) => {}
|
||||
hash_map::Entry::Vacant(e) => {
|
||||
tokio::spawn(
|
||||
ResolutionTask { ps_id, storcon_client }.run()
|
||||
)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ResolutionTask {
|
||||
ps_id: NodeId,
|
||||
storcon_client: storage_controller_client::control_api::Client,
|
||||
}
|
||||
|
||||
impl ResolutionTask {
|
||||
pub async fn run(self) -> Result<Uri, Error> {
|
||||
loop {
|
||||
// XXX: well-defined upcall API?
|
||||
let res = self
|
||||
.storcon_client
|
||||
.dispatch(
|
||||
reqwest::Method::GET,
|
||||
format!("control/v1/node/{}", self.node_id),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let node: NodeDescribeResponse = match res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!("storcon upcall failed")
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -117,6 +117,7 @@ impl Env {
|
||||
Arc::new(TimelinesSet::default()), // ignored for now
|
||||
RateLimiter::new(0, 0),
|
||||
wal_backup,
|
||||
todo!(),
|
||||
);
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -39,7 +39,9 @@ use crate::wal_backup;
|
||||
use crate::wal_backup::{WalBackup, remote_timeline_path};
|
||||
use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
|
||||
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
|
||||
use crate::{
|
||||
SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage,
|
||||
};
|
||||
|
||||
fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
|
||||
PeerInfo {
|
||||
@@ -547,6 +549,7 @@ impl Timeline {
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
wal_backup: Arc<WalBackup>,
|
||||
wal_advertiser: Arc<wal_advertiser::GlobalState>,
|
||||
) {
|
||||
let (tx, rx) = self.manager_ctl.bootstrap_manager();
|
||||
|
||||
@@ -570,6 +573,7 @@ impl Timeline {
|
||||
rx,
|
||||
partial_backup_rate_limiter,
|
||||
wal_backup,
|
||||
wal_advertiser,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ impl Manager {
|
||||
&& next_event.is_none()
|
||||
&& self.access_service.is_empty()
|
||||
&& !self.tli_broker_active.get()
|
||||
&& self.wal_advertiser.ready_for_eviction()
|
||||
// Partial segment of current flush_lsn is uploaded up to this flush_lsn.
|
||||
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
|
||||
// And it is the next one after the last removed. Given that local
|
||||
|
||||
@@ -22,7 +22,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, debug, info, info_span, instrument, warn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::control_file::{FileStorage, Storage};
|
||||
use crate::metrics::{
|
||||
MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS, NUM_EVICTED_TIMELINES,
|
||||
@@ -37,6 +36,7 @@ use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
|
||||
use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
|
||||
use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle};
|
||||
use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
|
||||
use crate::{SafeKeeperConf, wal_advertiser};
|
||||
|
||||
pub(crate) struct StateSnapshot {
|
||||
// inmem values
|
||||
@@ -201,6 +201,7 @@ pub(crate) struct Manager {
|
||||
pub(crate) wal_seg_size: usize,
|
||||
pub(crate) walsenders: Arc<WalSenders>,
|
||||
pub(crate) wal_backup: Arc<WalBackup>,
|
||||
pub(crate) wal_advertiser: wal_advertiser::SafekeeperTimelineHandle,
|
||||
|
||||
// current state
|
||||
pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
|
||||
@@ -240,6 +241,7 @@ pub async fn main_task(
|
||||
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
|
||||
global_rate_limiter: RateLimiter,
|
||||
wal_backup: Arc<WalBackup>,
|
||||
wal_advertiser: Arc<wal_advertiser::GlobalState>,
|
||||
) {
|
||||
tli.set_status(Status::Started);
|
||||
|
||||
@@ -259,6 +261,7 @@ pub async fn main_task(
|
||||
manager_tx,
|
||||
global_rate_limiter,
|
||||
wal_backup,
|
||||
wal_advertiser,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -287,7 +290,8 @@ pub async fn main_task(
|
||||
|
||||
mgr.set_status(Status::UpdateBackup);
|
||||
let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
|
||||
mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
|
||||
|
||||
mgr.update_broker_active(is_wal_backup_required, num_computes, &state_snapshot);
|
||||
|
||||
mgr.set_status(Status::UpdateControlFile);
|
||||
mgr.update_control_file_save(&state_snapshot, &mut next_event)
|
||||
@@ -419,6 +423,7 @@ impl Manager {
|
||||
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
|
||||
global_rate_limiter: RateLimiter,
|
||||
wal_backup: Arc<WalBackup>,
|
||||
wal_advertiser: Arc<wal_advertiser::GlobalState>,
|
||||
) -> Manager {
|
||||
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
|
||||
Manager {
|
||||
@@ -428,6 +433,7 @@ impl Manager {
|
||||
state_version_rx: tli.get_state_version_rx(),
|
||||
num_computes_rx: tli.get_walreceivers().get_num_rx(),
|
||||
tli_broker_active: broker_active_set.guard(tli.clone()),
|
||||
wal_advertiser: wal_advertiser.new_timeline(tli.clone()).await.unwrap(),
|
||||
last_removed_segno: 0,
|
||||
is_offloaded,
|
||||
backup_task: None,
|
||||
@@ -494,8 +500,8 @@ impl Manager {
|
||||
is_wal_backup_required
|
||||
}
|
||||
|
||||
/// Update is_active flag and returns its value.
|
||||
fn update_is_active(
|
||||
/// Update broker is_active flag and returns its value.
|
||||
fn update_broker_active(
|
||||
&mut self,
|
||||
is_wal_backup_required: bool,
|
||||
num_computes: usize,
|
||||
@@ -505,6 +511,7 @@ impl Manager {
|
||||
|| num_computes > 0
|
||||
|| state.remote_consistent_lsn < state.commit_lsn;
|
||||
|
||||
|
||||
// update the broker timeline set
|
||||
if self.tli_broker_active.set(is_active) {
|
||||
// write log if state has changed
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_t
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::WalBackup;
|
||||
use crate::wal_storage::Storage;
|
||||
use crate::{SafeKeeperConf, control_file, wal_storage};
|
||||
use crate::{SafeKeeperConf, control_file, pageserver_connectivity, wal_advertiser, wal_storage};
|
||||
|
||||
// Timeline entry in the global map: either a ready timeline, or mark that it is
|
||||
// being created.
|
||||
@@ -47,6 +47,8 @@ struct GlobalTimelinesState {
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
wal_advertisement: Arc<wal_advertiser::GlobalState>,
|
||||
pageserver_connectivity: Arc<pageserver_connectivity::GlobalState>,
|
||||
global_rate_limiter: RateLimiter,
|
||||
wal_backup: Arc<WalBackup>,
|
||||
}
|
||||
@@ -60,12 +62,14 @@ impl GlobalTimelinesState {
|
||||
Arc<TimelinesSet>,
|
||||
RateLimiter,
|
||||
Arc<WalBackup>,
|
||||
Arc<wal_advertiser::GlobalState>,
|
||||
) {
|
||||
(
|
||||
self.conf.clone(),
|
||||
self.broker_active_set.clone(),
|
||||
self.global_rate_limiter.clone(),
|
||||
self.wal_backup.clone(),
|
||||
self.wal_advertisement.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -101,6 +105,8 @@ impl GlobalTimelines {
|
||||
tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
wal_advertisement: Arc::new(wal_advertiser::GlobalState::default()),
|
||||
pageserver_connectivity: Arc::new(pageserver_connectivity::GlobalState::default()),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
wal_backup,
|
||||
}),
|
||||
@@ -158,12 +164,13 @@ impl GlobalTimelines {
|
||||
/// just lock and unlock it for each timeline -- this function is called
|
||||
/// during init when nothing else is running, so this is fine.
|
||||
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup, wal_advertiser) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.get_dependencies()
|
||||
};
|
||||
|
||||
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
|
||||
|
||||
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
|
||||
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
|
||||
{
|
||||
@@ -187,6 +194,7 @@ impl GlobalTimelines {
|
||||
broker_active_set.clone(),
|
||||
partial_backup_rate_limiter.clone(),
|
||||
wal_backup.clone(),
|
||||
wal_advertiser.clone(),
|
||||
);
|
||||
}
|
||||
// If we can't load a timeline, it's most likely because of a corrupted
|
||||
@@ -238,7 +246,7 @@ impl GlobalTimelines {
|
||||
start_lsn: Lsn,
|
||||
commit_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, _, _, _) = {
|
||||
let (conf, _, _, _, _) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
// Timeline already exists, return it.
|
||||
@@ -283,7 +291,7 @@ impl GlobalTimelines {
|
||||
check_tombstone: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Check for existence and mark that we're creating it.
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup, wal_advertiser) = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
match state.timelines.get(&ttid) {
|
||||
Some(GlobalMapTimeline::CreationInProgress) => {
|
||||
@@ -338,6 +346,7 @@ impl GlobalTimelines {
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
wal_backup,
|
||||
wal_advertiser.clone(),
|
||||
);
|
||||
drop(timeline_shared_state);
|
||||
Ok(timeline)
|
||||
@@ -590,6 +599,14 @@ impl GlobalTimelines {
|
||||
Ok(deleted)
|
||||
}
|
||||
|
||||
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::GlobalState> {
|
||||
self.state.lock().unwrap().wal_advertisement.clone()
|
||||
}
|
||||
|
||||
pub fn get_pageserver_connectivity(&self) -> Arc<pageserver_connectivity::GlobalState> {
|
||||
self.state.lock().unwrap().pageserver_connectivity.clone()
|
||||
}
|
||||
|
||||
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
|
||||
201
safekeeper/src/wal_advertiser.rs
Normal file
201
safekeeper/src/wal_advertiser.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
mod persistence;
|
||||
mod pageserver_connectivity;
|
||||
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::timeline::Timeline;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use tracing::{Instrument, error, info, info_span, warn};
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
sync::{spsc_fold, spsc_watch},
|
||||
};
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
type Advs = HashMap<TenantTimelineId, Lsn>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct GlobalState {
|
||||
inner: once_cell::sync::OnceCell<tokio::sync::mpsc::Sender<Message>>,
|
||||
}
|
||||
|
||||
pub struct SafekeeperTimelineHandle {
|
||||
tx: tokio::sync::mpsc::Sender<Message>,
|
||||
}
|
||||
|
||||
enum Message {
|
||||
NewTimeline {
|
||||
reply: tokio::sync::oneshot::Sender<Result<(), Error>>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl GlobalState {
|
||||
pub fn task_main(&self) -> impl 'static + Future<Output = anyhow::Result<()>> + Send {
|
||||
let mut ret = None;
|
||||
self.inner.get_or_init(|| {
|
||||
let (tx, task_fut) = MainTask::prepare_run();
|
||||
ret = Some(task_fut);
|
||||
tx
|
||||
});
|
||||
ret.expect("must only call this method once")
|
||||
}
|
||||
|
||||
pub async fn new_timeline(
|
||||
&self,
|
||||
tli: Arc<Timeline>,
|
||||
) -> Result<SafekeeperTimelineHandle, Error> {
|
||||
let tx = self.inner.get().unwrap().clone();
|
||||
let handle = SafekeeperTimelineHandle { tx };
|
||||
let (reply, rx) = tokio::sync::oneshot::channel();
|
||||
let Ok(()) = handle.tx.send(Message::NewTimeline { reply }).await else {
|
||||
return Err(Error::Cancelled);
|
||||
};
|
||||
let Ok(res) = rx.await else {
|
||||
return Err(Error::Cancelled);
|
||||
};
|
||||
Ok(handle)
|
||||
}
|
||||
pub fn update_pageserver_attachments(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
update: safekeeper_api::models::TenantShardPageserverAttachmentChange,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
impl SafekeeperTimelineHandle {
|
||||
pub fn ready_for_eviction(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
struct MainTask {
|
||||
rx: tokio::sync::mpsc::Receiver<Message>,
|
||||
world: sk_ps_discovery::World,
|
||||
senders: HashMap<utils::id::NodeId, spsc_watch::Sender<Advs>>,
|
||||
}
|
||||
|
||||
impl MainTask {
|
||||
fn prepare_run() -> (
|
||||
tokio::sync::mpsc::Sender<Message>,
|
||||
impl Future<Output = anyhow::Result<()>> + Send,
|
||||
) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(100 /* TODO think */);
|
||||
let task = MainTask {
|
||||
rx,
|
||||
world: sk_ps_discovery::World::default(),
|
||||
senders: Default::default(),
|
||||
};
|
||||
(tx, task.task())
|
||||
}
|
||||
async fn task(mut self) -> anyhow::Result<()> {
|
||||
let mut adv_frequency = tokio::time::interval(Duration::from_secs(1));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = adv_frequency.tick() => {
|
||||
let start = Instant::now();
|
||||
self.advertisements_iteration();
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed > Duration::from_millis(10) {
|
||||
warn!(?elapsed, "advertisements iteration is slow");
|
||||
}
|
||||
},
|
||||
message = self.rx.recv() => {
|
||||
match message {
|
||||
None => anyhow::bail!("last main task sender dropped, shouldn't happen, exiting"),
|
||||
Some(_) => todo!(),
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn advertisements_iteration(&mut self) {
|
||||
loop {
|
||||
let advertisements = self.world.get_commit_lsn_advertisements();
|
||||
for (node_id, mut advs) in advertisements {
|
||||
'inner: loop {
|
||||
let tx = self.senders.entry(node_id).or_insert_with(|| {
|
||||
let (tx, rx) = spsc_watch::channel();
|
||||
tokio::spawn(
|
||||
PageserverTask {
|
||||
ps_id: node_id,
|
||||
endpoint: todo!(),
|
||||
advs: rx,
|
||||
}
|
||||
.run()
|
||||
.instrument(info_span!("wal_advertiser", ps_id=%node_id)),
|
||||
);
|
||||
tx
|
||||
});
|
||||
if let Err((failed, err)) = tx.send_replace(advs) {
|
||||
self.senders.remove(&node_id);
|
||||
advs = failed;
|
||||
} else {
|
||||
break 'inner;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
struct PageserverTask {
|
||||
ps_id: NodeId,
|
||||
advs: spsc_watch::Receiver<Advs>,
|
||||
}
|
||||
|
||||
impl PageserverTask {
|
||||
/// Cancellation: happens through last PageserverHandle being dropped.
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
let Ok(advs) = self.advs.recv().await else {
|
||||
info!("main task gone, exiting");
|
||||
return;
|
||||
};
|
||||
let res = self.run0(advs).await;
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
error!(?err, "error sending advertisements");
|
||||
// TODO: proper backoff?
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn run0(&mut self, advs: HashMap<TenantTimelineId, Lsn>) -> anyhow::Result<()> {
|
||||
use storage_broker::wal_advertisement as proto;
|
||||
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
|
||||
let stream = async_stream::stream! {
|
||||
for (tenant_timeline_id, commit_lsn) in advs {
|
||||
yield proto::CommitLsnAdvertisement {tenant_timeline_id: Some(proto::TenantTimelineId {
|
||||
tenant_id: tenant_timeline_id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: tenant_timeline_id.timeline_id.as_ref().to_owned(),
|
||||
}), commit_lsn: commit_lsn.0 };
|
||||
}
|
||||
};
|
||||
let mut client: PageserverClient<_> = PageserverClient::connect(self.endpoint.clone())
|
||||
.await
|
||||
.context("connect")?;
|
||||
let publish_stream = client
|
||||
.publish_commit_lsn_advertisements(stream)
|
||||
.await
|
||||
.context("publish stream")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
0
safekeeper/src/wal_advertiser/persistence.rs
Normal file
0
safekeeper/src/wal_advertiser/persistence.rs
Normal file
@@ -27,6 +27,7 @@ parking_lot.workspace = true
|
||||
prost.workspace = true
|
||||
tonic.workspace = true
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
tokio-util.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tracing.workspace = true
|
||||
metrics.workspace = true
|
||||
|
||||
@@ -5,7 +5,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// easy location, but apparently interference with cachepot sometimes fails
|
||||
// the build then. Anyway, per cargo docs build script shouldn't output to
|
||||
// anywhere but $OUT_DIR.
|
||||
tonic_build::compile_protos("proto/broker.proto")
|
||||
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
|
||||
let protos = [
|
||||
"proto/broker.proto",
|
||||
"proto/wal_advertisement.proto",
|
||||
];
|
||||
for proto in protos {
|
||||
tonic_build::compile_protos(proto)
|
||||
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -35,14 +35,14 @@ message SafekeeperTimelineInfo {
|
||||
// LSN of the last record.
|
||||
uint64 flush_lsn = 4;
|
||||
// Up to which LSN safekeeper regards its WAL as committed.
|
||||
uint64 commit_lsn = 5;
|
||||
uint64 commit_lsn = 5; // yes
|
||||
// LSN up to which safekeeper has backed WAL.
|
||||
uint64 backup_lsn = 6;
|
||||
// LSN of last checkpoint uploaded by pageserver.
|
||||
uint64 remote_consistent_lsn = 7;
|
||||
uint64 peer_horizon_lsn = 8;
|
||||
uint64 local_start_lsn = 9;
|
||||
uint64 standby_horizon = 14;
|
||||
uint64 standby_horizon = 14; // yes
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string.
|
||||
|
||||
29
storage_broker/proto/wal_advertisement.proto
Normal file
29
storage_broker/proto/wal_advertisement.proto
Normal file
@@ -0,0 +1,29 @@
|
||||
syntax = "proto3";
|
||||
|
||||
import "google/protobuf/empty.proto";
|
||||
|
||||
package wal_advertisement;
|
||||
|
||||
service Pageserver {
|
||||
rpc PublishCommitLsnAdvertisements(stream CommitLsnAdvertisement) returns (google.protobuf.Empty) {};
|
||||
rpc SubscribeRemoteConsistentLsnAdvertisements(google.protobuf.Empty) returns (stream RemoteConsistentLsnAdvertisement) {};
|
||||
}
|
||||
|
||||
message CommitLsnAdvertisement {
|
||||
TenantTimelineId tenant_timeline_id = 1;
|
||||
uint64 commit_lsn = 2;
|
||||
}
|
||||
|
||||
message RemoteConsistentLsnAdvertisement {
|
||||
bytes tenant_id = 1;
|
||||
uint32 shard_id = 2;
|
||||
bytes timeline_id = 3;
|
||||
uint64 generation = 4;
|
||||
uint64 remote_consistent_lsn = 5;
|
||||
}
|
||||
|
||||
message TenantTimelineId {
|
||||
bytes tenant_id = 1;
|
||||
bytes timeline_id = 2;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::Stream;
|
||||
use proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use proto::broker_service_client::BrokerServiceClient;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::Status;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use tonic::transport::Endpoint;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::backoff::{
|
||||
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
|
||||
};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
// Code generated by protobuf.
|
||||
@@ -16,12 +20,18 @@ pub mod proto {
|
||||
tonic::include_proto!("storage_broker");
|
||||
}
|
||||
|
||||
pub mod wal_advertisement {
|
||||
#![allow(clippy::derive_partial_eq_without_eq)]
|
||||
tonic::include_proto!("wal_advertisement");
|
||||
}
|
||||
|
||||
pub mod metrics;
|
||||
|
||||
// Re-exports to avoid direct tonic dependency in user crates.
|
||||
pub use hyper::Uri;
|
||||
pub use tonic::transport::{Certificate, ClientTlsConfig};
|
||||
pub use tonic::{Code, Request, Streaming};
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
|
||||
@@ -29,9 +39,199 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST
|
||||
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
|
||||
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
// BrokerServiceClient charged with tonic provided Channel transport; helps to
|
||||
// avoid depending on tonic directly in user crates.
|
||||
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
#[derive(Clone)]
|
||||
pub struct TimelineUpdatesSubscriber {
|
||||
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
|
||||
}
|
||||
|
||||
/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code.
|
||||
pub struct BrokerClientChannel {
|
||||
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
|
||||
}
|
||||
|
||||
impl BrokerClientChannel {
|
||||
pub fn into_raw_grpc_client(
|
||||
self,
|
||||
) -> proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel> {
|
||||
self.client
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TimelineShardUpdate {
|
||||
pub is_discovery: bool,
|
||||
pub inner: proto::SafekeeperDiscoveryResponse,
|
||||
}
|
||||
|
||||
pub struct DiscoveryRequester {
|
||||
id: ProtoTenantTimelineId,
|
||||
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
|
||||
}
|
||||
|
||||
impl TimelineUpdatesSubscriber {
|
||||
pub fn new(service_client: BrokerClientChannel) -> Self {
|
||||
Self {
|
||||
client: service_client.client.clone(),
|
||||
}
|
||||
}
|
||||
pub fn subscribe(
|
||||
&mut self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> (impl Stream<Item = TimelineShardUpdate>, DiscoveryRequester) {
|
||||
let id = ProtoTenantTimelineId {
|
||||
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: timeline_id.as_ref().to_owned(),
|
||||
};
|
||||
let discovery_requester = DiscoveryRequester {
|
||||
id: id.clone(),
|
||||
client: self.client.clone(),
|
||||
};
|
||||
let stream = async_stream::stream! {
|
||||
let mut attempt = 0;
|
||||
'resubscribe: loop {
|
||||
exponential_backoff(
|
||||
attempt,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
|
||||
use proto::*;
|
||||
// subscribe to the specific timeline
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperTimelineInfo as i32,
|
||||
},
|
||||
TypeSubscription {
|
||||
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
|
||||
},
|
||||
],
|
||||
tenant_timeline_id: Some(FilterTenantTimelineId {
|
||||
enabled: true,
|
||||
tenant_timeline_id: Some(id.clone()),
|
||||
}),
|
||||
};
|
||||
|
||||
let res = tokio::select! {
|
||||
r = self.client.subscribe_by_filter(request) => { r }
|
||||
_ = cancel.cancelled() => { return; }
|
||||
};
|
||||
let mut update_stream = match res
|
||||
{
|
||||
Ok(resp) => {
|
||||
resp.into_inner()
|
||||
}
|
||||
Err(e) => {
|
||||
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
||||
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
|
||||
info!(
|
||||
attempt, "failed to subscribe: {e:#}"
|
||||
);
|
||||
continue 'resubscribe;
|
||||
}
|
||||
};
|
||||
loop {
|
||||
let broker_update = tokio::select!{
|
||||
_ = cancel.cancelled() => {
|
||||
return;
|
||||
}
|
||||
update = update_stream.message() => { update }
|
||||
};
|
||||
match broker_update {
|
||||
Ok(Some(typed_msg)) => {
|
||||
let mut is_discovery = false;
|
||||
let timeline_update = match typed_msg.r#type() {
|
||||
MessageType::SafekeeperTimelineInfo => {
|
||||
let info = match typed_msg.safekeeper_timeline_info {
|
||||
Some(info) => info,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_timeline_info");
|
||||
continue 'resubscribe;
|
||||
}
|
||||
};
|
||||
SafekeeperDiscoveryResponse {
|
||||
safekeeper_id: info.safekeeper_id,
|
||||
tenant_timeline_id: info.tenant_timeline_id,
|
||||
commit_lsn: info.commit_lsn,
|
||||
safekeeper_connstr: info.safekeeper_connstr,
|
||||
availability_zone: info.availability_zone,
|
||||
standby_horizon: info.standby_horizon,
|
||||
}
|
||||
}
|
||||
MessageType::SafekeeperDiscoveryResponse => {
|
||||
is_discovery = true;
|
||||
match typed_msg.safekeeper_discovery_response {
|
||||
Some(response) => response,
|
||||
None => {
|
||||
warn!("bad proto message from broker: no safekeeper_discovery_response");
|
||||
continue 'resubscribe;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// unexpected message
|
||||
warn!("unexpected message from broker: {typed_msg:?}");
|
||||
continue 'resubscribe;
|
||||
}
|
||||
};
|
||||
attempt = 0; // reset backoff iff we received a valid update
|
||||
yield TimelineShardUpdate{is_discovery, inner: timeline_update };
|
||||
},
|
||||
Err(status) => {
|
||||
match status.code() {
|
||||
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
|
||||
// tonic's error handling doesn't provide a clear code for disconnections: we get
|
||||
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
|
||||
// => https://github.com/neondatabase/neon/issues/9562
|
||||
info!("broker disconnected: {status}");
|
||||
},
|
||||
_ => {
|
||||
warn!("broker subscription failed: {status}");
|
||||
}
|
||||
}
|
||||
continue 'resubscribe;
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("broker subscription stream ended"); // can't happen
|
||||
continue 'resubscribe;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
(stream, discovery_requester)
|
||||
}
|
||||
}
|
||||
|
||||
impl DiscoveryRequester {
|
||||
pub async fn request(&mut self) {
|
||||
let request = proto::SafekeeperDiscoveryRequest {
|
||||
tenant_timeline_id: Some(self.id.clone()),
|
||||
};
|
||||
let msg = proto::TypedMessage {
|
||||
r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32,
|
||||
safekeeper_timeline_info: None,
|
||||
safekeeper_discovery_request: Some(request),
|
||||
safekeeper_discovery_response: None,
|
||||
};
|
||||
|
||||
// Cancellation safety: we want to send a message to the broker, but publish_one()
|
||||
// function can get cancelled by the other select! arm. This is absolutely fine, because
|
||||
// we just want to receive broker updates and discovery is not important if we already
|
||||
// receive updates.
|
||||
//
|
||||
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
|
||||
// This is totally fine because of the reason above.
|
||||
|
||||
// This is a fire-and-forget request, we don't care about the response
|
||||
let _ = self.client.publish_one(msg).await;
|
||||
debug!("Discovery request sent to the broker");
|
||||
}
|
||||
}
|
||||
|
||||
// Create connection object configured to run TLS if schema starts with https://
|
||||
// and plain text otherwise. Connection is lazy, only endpoint sanity is
|
||||
@@ -67,19 +267,9 @@ where
|
||||
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
|
||||
// keep_alive_timeout is 20s by default on both client and server side
|
||||
let channel = tonic_endpoint.connect_lazy();
|
||||
Ok(BrokerClientChannel::new(channel))
|
||||
}
|
||||
|
||||
impl BrokerClientChannel {
|
||||
/// Create a new client to the given endpoint, but don't actually connect until the first request.
|
||||
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||
where
|
||||
D: std::convert::TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<StdError>,
|
||||
{
|
||||
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
|
||||
Ok(Self::new(conn))
|
||||
}
|
||||
Ok(BrokerClientChannel {
|
||||
client: proto::broker_service_client::BrokerServiceClient::new(channel),
|
||||
})
|
||||
}
|
||||
|
||||
// parse variable length bytes from protobuf
|
||||
|
||||
@@ -15,6 +15,7 @@ testing = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-stream.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
@@ -69,4 +70,4 @@ http-utils = { path = "../libs/http-utils/" }
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
DROP TRIGGER on_timelines_UPDATE_enqueue_sk_ps_discovery on "timelines";
|
||||
DROP FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn;
|
||||
DROP TRIGGER on_timelines_DELETE_enqueue_sk_ps_discovery on "timelines";
|
||||
DROP FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn;
|
||||
DROP TRIGGER on_timelines_INSERT_enqueue_sk_ps_discovery on "timelines";
|
||||
DROP FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn;
|
||||
DROP TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery on "tenant_shards";
|
||||
DROP FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn;
|
||||
DROP TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery on "tenant_shards";
|
||||
DROP FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn;
|
||||
DROP TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery on "tenant_shards";
|
||||
DROP FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn;
|
||||
DROP FUNCTION IF EXISTS sk_ps_discovery_enqueue_attachment_create;
|
||||
DROP TABLE "sk_ps_discovery";
|
||||
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
CREATE TABLE "sk_ps_discovery"(
|
||||
"tenant_id" VARCHAR NOT NULL,
|
||||
"shard_number" INT4 NOT NULL,
|
||||
"shard_count" INT4 NOT NULL,
|
||||
"ps_generation" INT4 NOT NULL,
|
||||
"sk_id" INT8 NOT NULL REFERENCES "safekeepers"("id") ON DELETE CASCADE, -- more efficient that trigger on "safekeepers"
|
||||
"intent_state" VARCHAR NOT NULL, -- attached,detached
|
||||
"ps_id" INT8 NOT NULL REFERENCES "nodes"("node_id") ON DELETE CASCADE, -- more efficient that trigger on "nodes"
|
||||
"created_at" TIMESTAMPTZ NOT NULL,
|
||||
"retries" INT4 NOT NULL DEFAULT 0,
|
||||
"last_retry_at" TIMESTAMPTZ,
|
||||
"acknowledged_at" TIMESTAMPTZ,
|
||||
PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id")
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION sk_ps_discovery_enqueue_attachment_create(ARG_TENANT_ID VARCHAR)
|
||||
RETURNS VOID AS $$
|
||||
BEGIN
|
||||
INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, intent_state, ps_id, created_at)
|
||||
WITH intent_attachments AS (
|
||||
SELECT DISTINCT tenant_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines
|
||||
WHERE
|
||||
tenant_id = ARG_TENANT_ID
|
||||
AND
|
||||
timelines.deleted_at IS NULL
|
||||
)
|
||||
SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count,
|
||||
tenant_shards.generation, intent_attachments.sk_id, 'attached', tenant_shards.generation_pageserver, NOW()
|
||||
FROM tenant_shards
|
||||
INNER JOIN intent_attachments ON tenant_shards.tenant_id = intent_attachments.tenant_id
|
||||
ON CONFLICT DO NOTHING; -- the first trigger creates the attachment, all others are identical because tenant shard generations are monotonic
|
||||
|
||||
PERFORM pg_notify('sk_ps_discovery', json_build_object(
|
||||
'tenant_id', ARG_TENANT_ID
|
||||
)::text);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Trigger on tenant_shards table
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery
|
||||
AFTER INSERT
|
||||
ON "tenant_shards"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION on_ps_tenant_shard_INSERT_enqueue_sk_ps_discovery_triggerfn();
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM sk_ps_discovery_enqueue_attachment_create(OLD.tenant_id);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery
|
||||
AFTER DELETE
|
||||
ON "tenant_shards"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION on_ps_tenant_shard_DELETE_enqueue_sk_ps_discovery_triggerfn();
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE TRIGGER on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery
|
||||
AFTER UPDATE
|
||||
ON "tenant_shards"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION on_ps_tenant_shard_UPDATE_enqueue_sk_ps_discovery_triggerfn();
|
||||
|
||||
-- Trigger on timelines table
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE TRIGGER on_timelines_INSERT_enqueue_sk_ps_discovery
|
||||
AFTER INSERT
|
||||
ON "timelines"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION on_timelines_INSERT_enqueue_sk_ps_discovery_triggerfn();
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM sk_ps_discovery_enqueue_attachment_create(OLD.tenant_id);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE TRIGGER on_timelines_DELETE_enqueue_sk_ps_discovery
|
||||
AFTER DELETE
|
||||
ON "timelines"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION on_timelines_DELETE_enqueue_sk_ps_discovery_triggerfn();
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM sk_ps_discovery_enqueue_attachment_create(NEW.tenant_id);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE TRIGGER on_timelines_UPDATE_enqueue_sk_ps_discovery
|
||||
AFTER UPDATE
|
||||
ON "timelines"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION on_timelines_UPDATE_enqueue_sk_ps_discovery_triggerfn();
|
||||
|
||||
@@ -436,7 +436,8 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
|
||||
Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into())
|
||||
.await?;
|
||||
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url).await);
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
pub(crate) mod split_state;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::ops::Add;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -15,8 +17,8 @@ use diesel_async::pooled_connection::bb8::Pool;
|
||||
use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
|
||||
use diesel_async::{AsyncPgConnection, RunQueryDsl};
|
||||
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
|
||||
@@ -31,6 +33,7 @@ use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
|
||||
use rustls::crypto::ring;
|
||||
use scoped_futures::ScopedBoxFuture;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::info;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -74,6 +77,8 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
|
||||
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
|
||||
pub struct Persistence {
|
||||
connection_pool: Pool<AsyncPgConnection>,
|
||||
connect_tokio_postgres:
|
||||
Box<dyn Sync + Send + 'static + Fn() -> BoxFuture<'static, TokioPostgresConnectResult>>,
|
||||
}
|
||||
|
||||
/// Legacy format, for use in JSON compat objects in test environment
|
||||
@@ -135,6 +140,8 @@ pub(crate) enum DatabaseOperation {
|
||||
DeleteTimelineImport,
|
||||
ListTimelineImports,
|
||||
IsTenantImportingTimeline,
|
||||
ListSkPsDiscovery,
|
||||
UpdateSkPsDiscoveryAttempt,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -177,10 +184,11 @@ impl Persistence {
|
||||
|
||||
pub async fn new(database_url: String) -> Self {
|
||||
let mut mgr_config = ManagerConfig::default();
|
||||
mgr_config.custom_setup = Box::new(establish_connection_rustls);
|
||||
mgr_config.custom_setup =
|
||||
Box::new(|config| establish_connection_rustls_diesel(config.to_owned()));
|
||||
|
||||
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
|
||||
database_url,
|
||||
database_url.clone(),
|
||||
mgr_config,
|
||||
);
|
||||
|
||||
@@ -197,20 +205,25 @@ impl Persistence {
|
||||
.await
|
||||
.expect("Could not build connection pool");
|
||||
|
||||
Self { connection_pool }
|
||||
Self {
|
||||
connection_pool,
|
||||
connect_tokio_postgres: Box::new(move || {
|
||||
establish_connection_rustls_tokio_postgres(database_url.clone())
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper for use during startup, where we would like to tolerate concurrent restarts of the
|
||||
/// database and the storage controller, therefore the database might not be available right away
|
||||
pub async fn await_connection(
|
||||
database_url: &str,
|
||||
database_url: String,
|
||||
timeout: Duration,
|
||||
) -> Result<(), diesel::ConnectionError> {
|
||||
let started_at = Instant::now();
|
||||
log_postgres_connstr_info(database_url)
|
||||
log_postgres_connstr_info(&database_url)
|
||||
.map_err(|e| diesel::ConnectionError::InvalidConnectionUrl(e.to_string()))?;
|
||||
loop {
|
||||
match establish_connection_rustls(database_url).await {
|
||||
match establish_connection_rustls_diesel(database_url.clone()).await {
|
||||
Ok(_) => {
|
||||
tracing::info!("Connected to database.");
|
||||
return Ok(());
|
||||
@@ -1821,6 +1834,151 @@ impl Persistence {
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn listen_sk_ps_discovery(
|
||||
&self,
|
||||
) -> DatabaseResult<
|
||||
Pin<Box<dyn Send + 'static + futures::Stream<Item = Result<TenantId, serde_json::Error>>>>,
|
||||
> {
|
||||
let (client, mut conn) = (&self.connect_tokio_postgres)().await?;
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
tokio::spawn(async move {
|
||||
let mut stream = futures::stream::poll_fn(move |cx| conn.poll_message(cx));
|
||||
while let Some(msg) = stream.next().await {
|
||||
info!(?msg, "async message");
|
||||
match msg {
|
||||
Ok(tokio_postgres::AsyncMessage::Notification(notification))
|
||||
if notification.channel() == "sk_ps_discovery" =>
|
||||
{
|
||||
let Ok(()) = tx.send(notification).await else {
|
||||
tracing::info!(
|
||||
"sk_ps_discovery notification rx dropped, stopping async notification processing"
|
||||
);
|
||||
break;
|
||||
};
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
tracing::error!(?err, "tokio_postgres poll_message error");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!("sk_ps_discovery notification stream returned None, exiting");
|
||||
});
|
||||
|
||||
client
|
||||
.batch_execute("LISTEN sk_ps_discovery;")
|
||||
.await
|
||||
.expect("TODO");
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct Notification {
|
||||
tenant_id: TenantId,
|
||||
}
|
||||
Ok(Box::pin(async_stream::stream! {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let msg: Result<Notification, _> = serde_json::from_str(msg.payload());
|
||||
let msg = msg.map(|Notification { tenant_id }| tenant_id );
|
||||
yield msg;
|
||||
}
|
||||
tracing::info!("sk_ps_discovery channel closed, stopping stream");
|
||||
// keep client alive inside the returned sream object, othrwise `conn` ends as soon as we return from this function
|
||||
drop(client);
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_sk_ps_discovery_work(
|
||||
&self,
|
||||
) -> DatabaseResult<Vec<SkPsDiscoveryPersistence>> {
|
||||
use crate::schema::sk_ps_discovery::dsl;
|
||||
self.with_measured_conn(DatabaseOperation::ListSkPsDiscovery, move |conn| {
|
||||
Box::pin(async move {
|
||||
let vec: Vec<SkPsDiscoveryPersistence> = dsl::sk_ps_discovery.load(conn).await?;
|
||||
Ok(vec)
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn update_sk_ps_discovery_attempt(
|
||||
&self,
|
||||
pk: SkPsDiscoveryPersistencePk,
|
||||
intent_state: String,
|
||||
update: Result<(), ()>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::sk_ps_discovery::dsl;
|
||||
|
||||
self.with_measured_conn(DatabaseOperation::UpdateSkPsDiscoveryAttempt, move |conn| {
|
||||
let pk = pk.clone();
|
||||
let intent_state = intent_state.clone();
|
||||
Box::pin(async move {
|
||||
match update {
|
||||
Ok(()) => {
|
||||
let SkPsDiscoveryPersistencePk {
|
||||
tenant_id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
ps_generation,
|
||||
sk_id,
|
||||
} = pk;
|
||||
let nrows = diesel::delete(dsl::sk_ps_discovery)
|
||||
// primary key
|
||||
.filter(dsl::tenant_id.eq(tenant_id))
|
||||
.filter(dsl::shard_number.eq(shard_number))
|
||||
.filter(dsl::shard_count.eq(shard_count))
|
||||
.filter(dsl::ps_generation.eq(ps_generation))
|
||||
.filter(dsl::sk_id.eq(sk_id))
|
||||
// intent_state could have changed beneath us (split brain or concurrent state gc)
|
||||
// TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about?
|
||||
.filter(dsl::intent_state.eq(intent_state))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
if nrows != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of deletes: {nrows}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
let SkPsDiscoveryPersistencePk {
|
||||
tenant_id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
ps_generation,
|
||||
sk_id,
|
||||
} = pk;
|
||||
|
||||
let nrows = diesel::update(dsl::sk_ps_discovery)
|
||||
// primary key
|
||||
.filter(dsl::tenant_id.eq(tenant_id))
|
||||
.filter(dsl::shard_number.eq(shard_number))
|
||||
.filter(dsl::shard_count.eq(shard_count))
|
||||
.filter(dsl::ps_generation.eq(ps_generation))
|
||||
.filter(dsl::sk_id.eq(sk_id))
|
||||
// intent_state could have changed beneath us (split brain or concurrent state gc)
|
||||
// TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about?
|
||||
.filter(dsl::intent_state.eq(intent_state))
|
||||
// action:
|
||||
.set((
|
||||
dsl::retries.eq(dsl::retries.add(1)), // XXX: in split-brain situation we would bump twice...
|
||||
dsl::last_retry_at.eq(diesel::dsl::now),
|
||||
))
|
||||
.execute(conn) // TODO: check update count?
|
||||
.await?;
|
||||
if nrows != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of updates: {nrows}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
@@ -1909,21 +2067,40 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
})
|
||||
}
|
||||
|
||||
fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> {
|
||||
let fut = async {
|
||||
type TokioPostgresConnectResult = ConnectionResult<(
|
||||
tokio_postgres::Client,
|
||||
tokio_postgres::Connection<
|
||||
tokio_postgres::Socket,
|
||||
tokio_postgres_rustls::RustlsStream<tokio_postgres::Socket>,
|
||||
>,
|
||||
)>;
|
||||
|
||||
fn establish_connection_rustls_tokio_postgres(
|
||||
config: String,
|
||||
) -> BoxFuture<'static, TokioPostgresConnectResult> {
|
||||
let fut = async move {
|
||||
// We first set up the way we want rustls to work.
|
||||
let rustls_config = client_config_with_root_certs()
|
||||
.map_err(|err| ConnectionError::BadConnection(format!("{err:?}")))?;
|
||||
let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config);
|
||||
let (client, conn) = tokio_postgres::connect(config, tls)
|
||||
let (client, conn) = tokio_postgres::connect(&config, tls)
|
||||
.await
|
||||
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
|
||||
|
||||
AsyncPgConnection::try_from_client_and_connection(client, conn).await
|
||||
Ok((client, conn))
|
||||
};
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
fn establish_connection_rustls_diesel(
|
||||
config: String,
|
||||
) -> BoxFuture<'static, ConnectionResult<AsyncPgConnection>> {
|
||||
async {
|
||||
let (client, conn) = establish_connection_rustls_tokio_postgres(config).await?;
|
||||
AsyncPgConnection::try_from_client_and_connection(client, conn).await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
#[cfg_attr(test, test)]
|
||||
fn test_config_debug_censors_password() {
|
||||
let has_pw =
|
||||
@@ -2386,3 +2563,61 @@ pub(crate) struct TimelineImportPersistence {
|
||||
pub(crate) timeline_id: String,
|
||||
pub(crate) shard_statuses: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Insertable, AsChangeset, Selectable, Clone, PartialEq, Eq, Hash, Debug)]
|
||||
#[diesel(table_name = crate::schema::sk_ps_discovery)]
|
||||
pub(crate) struct SkPsDiscoveryPersistencePk {
|
||||
pub(crate) tenant_id: String,
|
||||
pub(crate) shard_number: i32,
|
||||
pub(crate) shard_count: i32,
|
||||
pub(crate) ps_generation: i32,
|
||||
pub(crate) sk_id: i64,
|
||||
}
|
||||
|
||||
#[derive(Queryable, Selectable, Clone, PartialEq, Eq)]
|
||||
#[diesel(table_name = crate::schema::sk_ps_discovery)]
|
||||
pub(crate) struct SkPsDiscoveryPersistence {
|
||||
pub(crate) tenant_id: String,
|
||||
pub(crate) shard_number: i32,
|
||||
pub(crate) shard_count: i32,
|
||||
pub(crate) ps_generation: i32,
|
||||
pub(crate) sk_id: i64,
|
||||
pub(crate) intent_state: String,
|
||||
pub(crate) ps_id: i64,
|
||||
pub(crate) created_at: chrono::DateTime<chrono::Utc>,
|
||||
pub(crate) retries: i32,
|
||||
pub(crate) last_retry_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
pub(crate) acknowledged_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
}
|
||||
|
||||
impl SkPsDiscoveryPersistence {
|
||||
pub(crate) fn tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
|
||||
Ok(TenantShardId {
|
||||
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
|
||||
shard_number: ShardNumber(self.shard_number as u8),
|
||||
shard_count: ShardCount::new(self.shard_count as u8),
|
||||
})
|
||||
}
|
||||
pub(crate) fn primary_key(&self) -> SkPsDiscoveryPersistencePk {
|
||||
let SkPsDiscoveryPersistence {
|
||||
tenant_id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
ps_generation,
|
||||
sk_id,
|
||||
intent_state: _,
|
||||
ps_id: _,
|
||||
created_at: _,
|
||||
retries: _,
|
||||
last_retry_at: _,
|
||||
acknowledged_at: _,
|
||||
} = self;
|
||||
SkPsDiscoveryPersistencePk {
|
||||
tenant_id: tenant_id.clone(),
|
||||
shard_number: *shard_number,
|
||||
shard_count: *shard_count,
|
||||
ps_generation: *ps_generation,
|
||||
sk_id: *sk_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use safekeeper_api::models::{
|
||||
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
|
||||
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
|
||||
TenantShardPageserverAttachmentChange, TimelineCreateRequest,
|
||||
};
|
||||
use safekeeper_client::mgmt_api::{Client, Result};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::logging::SecretString;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use crate::metrics::PageserverRequestLabelGroup;
|
||||
|
||||
@@ -164,4 +166,19 @@ impl SafekeeperClient {
|
||||
self.inner.utilization().await
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn post_tenant_shard_pageserver_attachments(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
body: TenantShardPageserverAttachmentChange,
|
||||
) -> Result<()> {
|
||||
measured_request!(
|
||||
"post_tenant_shard_pageserver_attachments",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.post_tenant_shard_pageserver_attachments(tenant_shard_id, body)
|
||||
.await
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,22 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id) {
|
||||
tenant_id -> Varchar,
|
||||
shard_number -> Int4,
|
||||
shard_count -> Int4,
|
||||
ps_generation -> Int4,
|
||||
sk_id -> Int8,
|
||||
intent_state -> Varchar,
|
||||
ps_id -> Int8,
|
||||
created_at -> Timestamptz,
|
||||
retries -> Int4,
|
||||
last_retry_at -> Nullable<Timestamptz>,
|
||||
acknowledged_at -> Nullable<Timestamptz>,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
tenant_shards (tenant_id, shard_number, shard_count) {
|
||||
tenant_id -> Varchar,
|
||||
@@ -100,12 +116,16 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::joinable!(sk_ps_discovery -> nodes (ps_id));
|
||||
diesel::joinable!(sk_ps_discovery -> safekeepers (sk_id));
|
||||
|
||||
diesel::allow_tables_to_appear_in_same_query!(
|
||||
controllers,
|
||||
metadata_health,
|
||||
nodes,
|
||||
safekeeper_timeline_pending_ops,
|
||||
safekeepers,
|
||||
sk_ps_discovery,
|
||||
tenant_shards,
|
||||
timeline_imports,
|
||||
timelines,
|
||||
|
||||
@@ -2,6 +2,7 @@ pub mod chaos_injector;
|
||||
mod context_iterator;
|
||||
pub(crate) mod safekeeper_reconciler;
|
||||
mod safekeeper_service;
|
||||
mod sk_ps_discovery;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
@@ -1192,6 +1193,16 @@ impl Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn run_sk_ps_discovery(self: &Arc<Self>) {
|
||||
self.startup_complete.clone().wait().await;
|
||||
sk_ps_discovery::run(
|
||||
self.clone(),
|
||||
self.http_client.clone(), /* TODO this client is configured to openf resh TCP connection each time, very inefficient */
|
||||
).await;
|
||||
}
|
||||
|
||||
/// Heartbeat all storage nodes once in a while.
|
||||
#[instrument(skip_all)]
|
||||
async fn spawn_heartbeat_driver(&self) {
|
||||
@@ -1797,7 +1808,7 @@ impl Service {
|
||||
reconcilers_gate: Gate::default(),
|
||||
tenant_op_locks: Default::default(),
|
||||
node_op_locks: Default::default(),
|
||||
http_client,
|
||||
http_client: http_client.clone(),
|
||||
step_down_barrier: Default::default(),
|
||||
});
|
||||
|
||||
@@ -1865,6 +1876,15 @@ impl Service {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let this = this.clone();
|
||||
let startup_complete = startup_complete.clone();
|
||||
async move {
|
||||
startup_complete.wait().await;
|
||||
this.run_sk_ps_discovery().await
|
||||
}
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let this = this.clone();
|
||||
let startup_complete = startup_complete.clone();
|
||||
|
||||
@@ -647,6 +647,11 @@ impl Service {
|
||||
sk.describe_response()
|
||||
}
|
||||
|
||||
pub(crate) fn get_safekeeper_object(&self, node_id: i64) -> Option<Safekeeper> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.get(&NodeId(node_id as u64)).cloned()
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_safekeeper(
|
||||
self: &Arc<Service>,
|
||||
record: crate::persistence::SafekeeperUpsert,
|
||||
|
||||
266
storage_controller/src/service/sk_ps_discovery.rs
Normal file
266
storage_controller/src/service/sk_ps_discovery.rs
Normal file
@@ -0,0 +1,266 @@
|
||||
use std::{
|
||||
collections::{HashMap, hash_map},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::{StreamExt, stream::FuturesUnordered};
|
||||
use safekeeper_api::models::{
|
||||
TenantShardPageserverAttachment, TenantShardPageserverAttachmentChange,
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, Span, error, info, info_span};
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{NodeId, TenantId},
|
||||
logging::SecretString,
|
||||
shard::ShardIndex,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
heartbeater::SafekeeperState,
|
||||
persistence::{Persistence, SkPsDiscoveryPersistence},
|
||||
};
|
||||
|
||||
use super::Service;
|
||||
|
||||
struct Actor {
|
||||
service: Arc<Service>,
|
||||
persistence: Arc<Persistence>,
|
||||
http_client: reqwest::Client,
|
||||
}
|
||||
|
||||
pub async fn run(service: Arc<Service>, http_client: reqwest::Client) {
|
||||
let actor = Actor {
|
||||
persistence: service.persistence.clone(),
|
||||
service,
|
||||
http_client, // XXX: build our own client instead of getting Service's client; we probably want idle conn to each sk
|
||||
};
|
||||
actor.run().await;
|
||||
}
|
||||
|
||||
impl Actor {
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
match self.run0().await {
|
||||
Ok(()) => {
|
||||
info!("sk_ps_discovery actor exiting after shutdown signal observed");
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
?err,
|
||||
"sk_ps_discovery actor encountered an error, restarting after backoff"
|
||||
);
|
||||
// TODO: proper backoff
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run0(&mut self) -> anyhow::Result<()> {
|
||||
let mut subscription = self
|
||||
.persistence
|
||||
.listen_sk_ps_discovery()
|
||||
.await
|
||||
.context("listen to sk_ps_discovery")?;
|
||||
|
||||
let mut sync_full_ticker = tokio::time::interval(std::time::Duration::from_secs(5));
|
||||
|
||||
struct Task {
|
||||
work: SkPsDiscoveryPersistence,
|
||||
cancel: CancellationToken,
|
||||
join_handle: Option<JoinHandle<()>>,
|
||||
}
|
||||
let mut tasks = HashMap::new();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased; // control messages have higher priority, the periodic full tick, then subscriptions.
|
||||
_ = sync_full_ticker.tick() => {
|
||||
info!("rebuild");
|
||||
}
|
||||
maybe_res = subscription.next() => {
|
||||
match maybe_res {
|
||||
None => {
|
||||
anyhow::bail!("subscription should never end");
|
||||
}
|
||||
Some(Ok(tenant_id)) => {
|
||||
let tenant_id: TenantId = tenant_id;
|
||||
info!(?tenant_id, "notify for tenant_id");
|
||||
// for now, just also rebuild everything
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
let err: serde_json::Error = err;
|
||||
anyhow::bail!("incorrect notification format: {err:?}"); // FIXME repeat message in error so it can be debugged ?
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// get list of tasks from database
|
||||
let mut new_tasks = self
|
||||
.persistence
|
||||
.get_all_sk_ps_discovery_work()
|
||||
.await
|
||||
.context("get_all_sk_ps_discovery_work")?
|
||||
.into_iter()
|
||||
.map(|work: SkPsDiscoveryPersistence| {
|
||||
(
|
||||
work.primary_key(),
|
||||
Task {
|
||||
work,
|
||||
cancel: CancellationToken::new(),
|
||||
join_handle: None,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
// Carry over ongoing tasks
|
||||
let mut cancelled_wait = FuturesUnordered::new();
|
||||
for (
|
||||
task_key,
|
||||
Task {
|
||||
work: ongoing_persistence,
|
||||
cancel,
|
||||
join_handle,
|
||||
},
|
||||
) in tasks.drain()
|
||||
{
|
||||
match new_tasks.entry(task_key) {
|
||||
hash_map::Entry::Occupied(mut planned) => {
|
||||
let Task {
|
||||
work: planned_persistence,
|
||||
cancel: planned_cancel,
|
||||
join_handle: planned_jh,
|
||||
} = planned.get_mut();
|
||||
assert!(planned_jh.is_none());
|
||||
if *planned_persistence == ongoing_persistence {
|
||||
*planned_jh = join_handle;
|
||||
*planned_cancel = cancel;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(_) => (),
|
||||
}
|
||||
cancel.cancel();
|
||||
cancelled_wait.push(async move {
|
||||
if let Some(jh) = join_handle {
|
||||
let _ = jh.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(_) = cancelled_wait.next().await {}
|
||||
tasks = new_tasks;
|
||||
|
||||
// Kick off new tasks
|
||||
for (key, task) in tasks.iter_mut() {
|
||||
if task.join_handle.is_none() {
|
||||
task.join_handle = Some(tokio::spawn(
|
||||
DeliveryAttempt {
|
||||
cancel: task.cancel.clone(),
|
||||
persistence: self.persistence.clone(),
|
||||
service: self.service.clone(),
|
||||
http_client: self.http_client.clone(),
|
||||
work: task.work.clone(),
|
||||
}
|
||||
.run()
|
||||
.instrument({
|
||||
let span = info_span!(parent: None, "sk_ps_discovery_delivery", ?key);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct DeliveryAttempt {
|
||||
cancel: CancellationToken,
|
||||
persistence: Arc<Persistence>,
|
||||
service: Arc<super::Service>,
|
||||
http_client: reqwest::Client,
|
||||
work: SkPsDiscoveryPersistence,
|
||||
}
|
||||
|
||||
impl DeliveryAttempt {
|
||||
pub async fn run(self) {
|
||||
let res = self.run0().await;
|
||||
if self.cancel.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
if let Err(ref err) = res {
|
||||
error!(?err, "attempt failed");
|
||||
}
|
||||
let res = self
|
||||
.persistence
|
||||
.update_sk_ps_discovery_attempt(
|
||||
self.work.primary_key(),
|
||||
self.work.intent_state.clone(),
|
||||
res.map_err(|_| ()),
|
||||
)
|
||||
.await;
|
||||
if let Err(ref err) = res {
|
||||
error!(?err, "persistence of attempt result failed");
|
||||
}
|
||||
}
|
||||
async fn run0(&self) -> anyhow::Result<()> {
|
||||
let Some(sk) = self.service.get_safekeeper_object(self.work.sk_id) else {
|
||||
anyhow::bail!("safekeeper object does not exist");
|
||||
};
|
||||
|
||||
match sk.availability() {
|
||||
SafekeeperState::Available { .. } => (),
|
||||
SafekeeperState::Offline => {
|
||||
anyhow::bail!("safekeeper is offline");
|
||||
}
|
||||
}
|
||||
|
||||
let body = {
|
||||
let val = TenantShardPageserverAttachment {
|
||||
shard_id: ShardIndex {
|
||||
shard_number: utils::shard::ShardNumber(self.work.shard_number as u8),
|
||||
shard_count: utils::shard::ShardCount(self.work.shard_count as u8),
|
||||
},
|
||||
ps_id: NodeId(self.work.ps_id as u64),
|
||||
generation: Generation::new(self.work.ps_generation as u32),
|
||||
};
|
||||
match self.work.intent_state.as_str() {
|
||||
"attached" => TenantShardPageserverAttachmentChange::Attach { field1: val },
|
||||
"detached" => TenantShardPageserverAttachmentChange::Detach(val),
|
||||
x => anyhow::bail!("unknown intent state {x:?}"),
|
||||
}
|
||||
};
|
||||
let tenant_shard_id = self.work.tenant_shard_id()?;
|
||||
sk.with_client_retries(
|
||||
|client| {
|
||||
let body = body.clone();
|
||||
async move {
|
||||
client
|
||||
.post_tenant_shard_pageserver_attachments(tenant_shard_id, body)
|
||||
.await
|
||||
}
|
||||
},
|
||||
&self.http_client,
|
||||
&self
|
||||
.service
|
||||
.config
|
||||
.safekeeper_jwt_token
|
||||
.clone()
|
||||
.map(SecretString::from),
|
||||
1,
|
||||
3,
|
||||
Duration::from_secs(1),
|
||||
&self.cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user