From 257693e4f223922c21fb7aea971d3fd4a6fc7a23 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 5 May 2025 10:59:45 +0200 Subject: [PATCH] WIP --- libs/safekeeper_api/src/models.rs | 14 +++++ safekeeper/src/http/routes.rs | 51 ++++++++++++++++- safekeeper/src/lib.rs | 2 + safekeeper/src/send_wal.rs | 4 +- safekeeper/src/tenant.rs | 42 ++++++++++++++ safekeeper/src/timeline.rs | 16 +++++- safekeeper/src/timelines_global_map.rs | 44 +++++++++++++- safekeeper/src/wal_advertiser.rs | 76 +++++++++++++++++++++++++ safekeeper/src/wal_advertiser/advmap.rs | 26 +++++++++ 9 files changed, 268 insertions(+), 7 deletions(-) create mode 100644 safekeeper/src/tenant.rs create mode 100644 safekeeper/src/wal_advertiser.rs create mode 100644 safekeeper/src/wal_advertiser/advmap.rs diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index cc31b38fe7..0d4f2cd82c 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -6,9 +6,11 @@ use pageserver_api::shard::ShardIdentity; use postgres_ffi::TimestampTz; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use utils::generation::Generation; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; use utils::pageserver_feedback::PageserverFeedback; +use utils::shard::TenantShardId; use crate::membership::Configuration; use crate::{ServerInfo, Term}; @@ -308,3 +310,15 @@ pub struct PullTimelineResponse { pub safekeeper_host: Option, // TODO: add more fields? } + +#[derive(Debug, Serialize, Deserialize)] +pub struct PutTenantPageserverLocationRequest { + pub pageserver_locations: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TenantShardPageserverLocation { + pub tenant_shard_id: TenantShardId, + pub generation: Generation, + pub pageserver_node_id: NodeId, +} diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 2b2d721db2..055ceeeef3 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -4,6 +4,7 @@ use std::io::Write as _; use std::str::FromStr; use std::sync::Arc; +use desim::node_os; use http_utils::endpoint::{ self, ChannelWriter, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler, prometheus_metrics_handler, request_span, @@ -17,9 +18,10 @@ use hyper::{Body, Request, Response, StatusCode}; use pem::Pem; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::models::{ - AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult, - TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, - TimelineStatus, TimelineTermBumpRequest, + AcceptorStateStatus, PullTimelineRequest, PutTenantPageserverLocationRequest, SafekeeperStatus, + SkTimelineInfo, TenantDeleteResult, TenantShardPageserverLocation, TermSwitchApiEntry, + TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus, + TimelineTermBumpRequest, }; use safekeeper_api::{ServerInfo, membership, models}; use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId}; @@ -31,12 +33,14 @@ use tracing::{Instrument, info_span}; use utils::auth::SwappableJwtAuth; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; +use utils::shard::TenantShardId; use crate::debug_dump::TimelineDigestRequest; use crate::safekeeper::TermLsn; use crate::timelines_global_map::DeleteOrExclude; use crate::{ GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline, + wal_advertiser, }; /// Healthcheck handler. @@ -91,6 +95,41 @@ async fn tenant_delete_handler(mut request: Request) -> Result, +) -> Result, ApiError> { + let tenant_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_id))?; + + let PutTenantPageserverLocationRequest { + pageserver_locations, + }: PutTenantPageserverLocationRequest = json_request(&mut request).await?; + + let global_timelines = get_global_timelines(&request); + global_timelines + .update_tenant_locations(tenant_id, pageserver_locations) + .await?; + + json_response(StatusCode::OK, response_body) +} + + +async fn tenant_get_pageserver_locations_handler( + mut request: Request, +) -> Result, ApiError> { + + let tenant_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_id))?; + + let global_timelines = get_global_timelines(&request); + let locations = global_timelines + .get_tenant_locations(tenant_id) + .await?; + + json_response(StatusCode::OK, locations) + +} + async fn timeline_create_handler(mut request: Request) -> Result, ApiError> { let request_data: TimelineCreateRequest = json_request(&mut request).await?; @@ -714,6 +753,12 @@ pub fn make_router( .delete("/v1/tenant/:tenant_id", |r| { request_span(r, tenant_delete_handler) }) + .put("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| { + request_span(r, tenant_put_pageserver_locations_handler) + }) + .get("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| { + request_span(r, tenant_get_pageserver_locations_handler) + }) // Will be used in the future instead of implicit timeline creation .post("/v1/tenant/timeline", |r| { request_span(r, timeline_create_handler) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index ef2608e5d6..62bd57dfc8 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -43,6 +43,8 @@ pub mod wal_backup_partial; pub mod wal_reader_stream; pub mod wal_service; pub mod wal_storage; +pub mod tenant; +pub mod wal_advertiser; #[cfg(any(test, feature = "benchmarking"))] pub mod test_utils; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 33e3d0485c..26ceabd2e9 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -661,6 +661,7 @@ impl SafekeeperPostgresHandler { reader, ws_guard: ws_guard.clone(), tli, + shard: self.shard.clone(), }; let res = tokio::select! { @@ -977,6 +978,7 @@ struct ReplyReader { reader: PostgresBackendReader, ws_guard: Arc, tli: WalResidentTimeline, + shard: Option, } impl ReplyReader { @@ -1021,7 +1023,7 @@ impl ReplyReader { .walsenders .record_ps_feedback(self.ws_guard.id, &ps_feedback); self.tli - .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn) + .update_remote_consistent_lsn(shard, ps_feedback.remote_consistent_lsn) .await; // in principle new remote_consistent_lsn could allow to // deactivate the timeline, but we check that regularly through diff --git a/safekeeper/src/tenant.rs b/safekeeper/src/tenant.rs new file mode 100644 index 0000000000..3e68dc6702 --- /dev/null +++ b/safekeeper/src/tenant.rs @@ -0,0 +1,42 @@ +use safekeeper_api::models::TenantShardPageserverLocation; +use utils::shard::TenantShardId; +use walproposer::bindings::Generation; + +pub(crate) struct Tenant { + inner: Arc>, +} + +struct Inner { + data: HashMap, +} + +struct PerLocationState { + remote_consistent_lsn: Lsn, + last_update_at: Instant, + dirty: bool, +} + +impl Tenant { + pub fn load(conf: Arc, tenant_id: TenantId) -> anyhow::Result> { + todo!() + } + + pub fn get_locations(&self) -> anyhow::Result> { + todo!() + } + pub fn update_locations( + &self, + locations: Vec, + ) -> anyhow::Result<()> { + todo!() + } + pub fn update_remote_consistent_lsn( + &self, + tenant_shard_id: TenantShardId, + generation: Generation, + timeline_id: TimelineId, + lsn: Lsn, + ) -> anyhow::Result<()> { + todo!() + } +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index b7ba28f435..c3c07bef78 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -24,6 +24,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::{NodeId, TenantId, TenantTimelineId}; use utils::lsn::Lsn; +use utils::shard::TenantShardId; use utils::sync::gate::Gate; use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics}; @@ -32,6 +33,7 @@ use crate::receive_wal::WalReceivers; use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn}; use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues}; use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState}; +use crate::tenant::Tenant; use crate::timeline_guard::ResidenceGuard; use crate::timeline_manager::{AtomicStatus, ManagerCtl}; use crate::timelines_set::TimelinesSet; @@ -425,6 +427,7 @@ type RemoteDeletionReceiver = tokio::sync::watch::Receiver, pub remote_path: RemotePath, /// Used to broadcast commit_lsn updates to all background jobs. @@ -475,6 +478,7 @@ impl Timeline { timeline_dir: &Utf8Path, remote_path: &RemotePath, shared_state: SharedState, + tenant: Arc, conf: Arc, ) -> Arc { let (commit_lsn_watch_tx, commit_lsn_watch_rx) = @@ -489,6 +493,7 @@ impl Timeline { Arc::new(Self { ttid, + tenant, remote_path: remote_path.to_owned(), timeline_dir: timeline_dir.to_owned(), commit_lsn_watch_tx, @@ -515,6 +520,7 @@ impl Timeline { /// Load existing timeline from disk. pub fn load_timeline( conf: Arc, + tenant: Arc, ttid: TenantTimelineId, ) -> Result> { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); @@ -528,6 +534,7 @@ impl Timeline { &timeline_dir, &remote_path, shared_state, + tenant, conf, )) } @@ -1062,12 +1069,19 @@ impl WalResidentTimeline { } /// Update in memory remote consistent lsn. - pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) { + pub async fn update_remote_consistent_lsn(&self, shard: Option, candidate: Lsn) { let mut shared_state = self.write_shared_state().await; shared_state.sk.state_mut().inmem.remote_consistent_lsn = max( shared_state.sk.state().inmem.remote_consistent_lsn, candidate, ); + drop(shared_state); + self.tli.tenant.update_remote_consistent_lsn( + todo!(), + todo!(), + todo!(), + + ) } } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 41abee369e..a626838f39 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -2,6 +2,7 @@ //! All timelines should always be present in this map, this is done by loading them //! all from the disk on startup and keeping them in memory. +use std::any; use std::collections::HashMap; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -11,18 +12,22 @@ use anyhow::{Context, Result, bail}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; use safekeeper_api::membership::Configuration; -use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult}; +use safekeeper_api::models::{ + SafekeeperUtilization, TenantShardPageserverLocation, TimelineDeleteResult, +}; use safekeeper_api::{ServerInfo, membership}; use tokio::fs; use tracing::*; use utils::crashsafe::{durable_rename, fsync_async_opt}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; +use utils::shard::TenantShardId; use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; use crate::http::routes::DeleteOrExcludeError; use crate::rate_limit::RateLimiter; use crate::state::TimelinePersistentState; +use crate::tenant::Tenant; use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir}; use crate::timelines_set::TimelinesSet; use crate::wal_storage::Storage; @@ -37,6 +42,7 @@ enum GlobalMapTimeline { } struct GlobalTimelinesState { + tenants: HashMap>, timelines: HashMap, // A tombstone indicates this timeline used to exist has been deleted. These are used to prevent @@ -87,6 +93,7 @@ impl GlobalTimelines { pub fn new(conf: Arc) -> Self { Self { state: Mutex::new(GlobalTimelinesState { + tenants: HashMap::new(), timelines: HashMap::new(), tombstones: HashMap::new(), conf, @@ -153,6 +160,18 @@ impl GlobalTimelines { }; let timelines_dir = get_tenant_dir(&conf, &tenant_id); + + let tenant = match Tenant::load(conf.clone(), tenant_id) { + Ok(tenant) => { + let mut state = self.state.lock().unwrap(); + state.tenants.insert(tenant_id, Arc::clone(&tenant)); + tenant + } + Err(e) => { + todo!("should we fail the whole startup process?") + } + }; + for timelines_dir_entry in std::fs::read_dir(&timelines_dir) .with_context(|| format!("failed to list timelines dir {}", timelines_dir))? { @@ -162,7 +181,7 @@ impl GlobalTimelines { TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) { let ttid = TenantTimelineId::new(tenant_id, timeline_id); - match Timeline::load_timeline(conf.clone(), ttid) { + match Timeline::load_timeline(conf.clone(), tenant, ttid) { Ok(tli) => { let mut shared_state = tli.write_shared_state().await; self.state @@ -565,6 +584,27 @@ impl GlobalTimelines { Ok(deleted) } + pub async fn update_tenant_locations( + &self, + tenant_id: &TenantId, + locations: Vec, + ) -> anyhow::Result<()> { + let tenant = { + let mut state = self.state.lock().unwrap(); + state.tenants.get(tenant_id).context("tenant not found")? + }; + tenant.update_locations(locations).await? + } + + pub fn get_tenant_locations( + &self, + tenant_id: &TenantId, + ) -> anyhow::Result> { + let state = self.state.lock().unwrap(); + let tenant = state.tenants.get(tenant_id).context("tenant not found")?; + Ok(tenant.get_locations()) + } + pub fn housekeeping(&self, tombstone_ttl: &Duration) { let mut state = self.state.lock().unwrap(); diff --git a/safekeeper/src/wal_advertiser.rs b/safekeeper/src/wal_advertiser.rs new file mode 100644 index 0000000000..e0321dcc38 --- /dev/null +++ b/safekeeper/src/wal_advertiser.rs @@ -0,0 +1,76 @@ +//! Advertise pending WAL to all pageservers that might be interested in it. + +use std::{collections::HashSet, sync::Arc, time::Duration}; + +use desim::world::Node; + +use crate::{GlobalTimelines, SafeKeeperConf}; + +mod advmap; + +pub struct GlobalState { + pub resolver: NodeIdToHostportMap, +} + +#[derive(Default)] +pub struct NodeIdToHostportMap { + inner: Arc>>, +} + +impl NodeIdToHostportMap { + pub fn learn(&self, ) { + let mut inner = self.inner.lock().unwrap(); + inner.insert(node_id, hostport); + } +} + +pub(crate) async fn wal_advertiser_loop( + conf: Arc, + global_timelines: Arc, +) -> anyhow::Result<()> { + let active_timelines_set = global_timelines.get_global_broker_active_set(); + + let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); + + let outbound = async_stream::stream! { + loop { + // Note: we lock runtime here and in timeline methods as GlobalTimelines + // is under plain mutex. That's ok, all this code is not performance + // sensitive and there is no risk of deadlock as we don't await while + // lock is held. + let now = Instant::now(); + let all_tlis = active_timelines_set.get_all(); + let mut n_pushed_tlis = 0; + for tli in &all_tlis { + let sk_info = tli.get_safekeeper_info(&conf).await; + yield sk_info; + BROKER_PUSHED_UPDATES.inc(); + n_pushed_tlis += 1; + } + let elapsed = now.elapsed(); + + BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64()); + BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64); + + if elapsed > push_interval / 2 { + info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed); + } + + sleep(push_interval).await; + } + }; + client + .publish_safekeeper_info(Request::new(outbound)) + .await?; + Ok(()) +} + +pub(crate) async fn node_loop() { + + + loop { + + } + + +} diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs new file mode 100644 index 0000000000..611381cf54 --- /dev/null +++ b/safekeeper/src/wal_advertiser/advmap.rs @@ -0,0 +1,26 @@ +//! The data structure that track advertisement state. + +use std::sync::{Arc, RwLock}; + +pub struct World { + pageservers: RwLock>>, +} + +pub struct Pageserver { + node_id: NodeId, + attachments: RwLock>>, +} + +pub struct Attachment { + pageserver: NodeId, + tenant_shard_id: TenantShardId, + generation: Generation, + remote_consistent_lsn: RwLock>>, +} + +pub struct Timeline { + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + remote_consistent_lsn: RwLock, +} +