diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 0d4f2cd82c..c37c5cbd49 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -318,7 +318,6 @@ pub struct PutTenantPageserverLocationRequest {
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct TenantShardPageserverLocation {
-    pub tenant_shard_id: TenantShardId,
     pub generation: Generation,
     pub pageserver_node_id: NodeId,
 }
diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs
index 4167839e28..d5e332750c 100644
--- a/libs/utils/src/postgres_client.rs
+++ b/libs/utils/src/postgres_client.rs
@@ -46,6 +46,8 @@ pub struct ConnectionConfigArgs<'a> {
 
     pub auth_token: Option<&'a str>,
     pub availability_zone: Option<&'a str>,
+
+    pub pageserver_generation: Option<u32>,
 }
 
 impl<'a> ConnectionConfigArgs<'a> {
@@ -72,6 +74,10 @@ impl<'a> ConnectionConfigArgs<'a> {
             ));
         }
 
+        if let Some(pageserver_generation) = self.pageserver_generation {
+            options.push(format!("pageserver_generation={pageserver_generation}"));
+        }
+
         options
     }
 }
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 51f6d686f8..7f86f45393 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -879,7 +879,9 @@ impl ConnectionManagerState {
             shard_stripe_size,
             listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
             auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
-            availability_zone: self.conf.availability_zone.as_deref()
+            availability_zone: self.conf.availability_zone.as_deref(),
+            // TODO: do we still have the emergency mode that runs without generations? If so, this expect would panic in that mode.
+            pageserver_generation: Some(self.timeline.generation.into().expect("attachments always have a generation number nowadays")),
         };
 
         match wal_stream_connection_config(connection_conf_args) {
diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index c267a55cb6..0fec0f48aa 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -24,7 +24,7 @@ use safekeeper::defaults::{
 };
 use safekeeper::{
     BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
-    WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
+    WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, wal_backup, wal_service,
 };
 use sd_notify::NotifyState;
 use storage_broker::{DEFAULT_ENDPOINT, Uri};
diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs
index b54bee8bfb..fd3d762903 100644
--- a/safekeeper/src/handler.rs
+++ b/safekeeper/src/handler.rs
@@ -18,6 +18,7 @@ use safekeeper_api::models::ConnectionId;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tracing::{Instrument, debug, info, info_span};
 use utils::auth::{Claims, JwtAuth, Scope};
+use utils::generation::Generation;
 use utils::id::{TenantId, TenantTimelineId, TimelineId};
 use utils::lsn::Lsn;
 use utils::postgres_client::PostgresClientProtocol;
@@ -37,6 +38,7 @@ pub struct SafekeeperPostgresHandler {
     pub timeline_id: Option<TimelineId>,
     pub ttid: TenantTimelineId,
     pub shard: Option<ShardIdentity>,
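+    /// Generation of the pageserver attachment served by this connection, parsed from
+    /// the `pageserver_generation` startup option. Together with `shard`, it identifies
+    /// the attachment whose remote_consistent_lsn feedback we report (see send_wal.rs).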
+    pub pageserver_generation: Option<Generation>,
     pub protocol: Option<PostgresClientProtocol>,
     /// Unique connection id is logged in spans for observability.
     pub conn_id: ConnectionId,
@@ -159,6 +161,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
         let mut shard_count: Option<u8> = None;
         let mut shard_number: Option<u8> = None;
         let mut shard_stripe_size: Option<u32> = None;
+        let mut pageserver_generation: Option<Generation> = None;
 
         for opt in options {
             // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
@@ -201,6 +204,12 @@
                         format!("Failed to parse {value} as shard stripe size")
                     })?);
                 }
+                Some(("pageserver_generation", value)) => {
+                    self.pageserver_generation =
+                        Some(value.parse::<u32>().map(Generation::new).with_context(
+                            || format!("Failed to parse {value} as generation"),
+                        )?);
+                }
                 _ => continue,
             }
         }
@@ -259,6 +268,12 @@
                 tracing::Span::current().record("shard", tracing::field::display(slug));
             }
         }
+        if let Some(pageserver_generation) = self.pageserver_generation {
+            tracing::Span::current().record(
+                "pageserver_generation",
+                tracing::field::display(pageserver_generation.get_suffix()),
+            );
+        }
 
         Ok(())
     } else {
@@ -370,6 +385,7 @@ impl SafekeeperPostgresHandler {
             timeline_id: None,
             ttid: TenantTimelineId::empty(),
             shard: None,
+            pageserver_generation: None,
             protocol: None,
             conn_id,
             claims: None,
diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs
index 0003310763..acfb910787 100644
--- a/safekeeper/src/http/mod.rs
+++ b/safekeeper/src/http/mod.rs
@@ -5,7 +5,7 @@ pub use routes::make_router;
 pub use safekeeper_api::models;
 use tokio_util::sync::CancellationToken;
 
-use crate::{GlobalTimelines, SafeKeeperConf};
+use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
 
 pub async fn task_main_http(
     conf: Arc<SafeKeeperConf>,
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 055ceeeef3..ba3cb77b63 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -4,7 +4,6 @@ use std::io::Write as _;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use desim::node_os;
 use http_utils::endpoint::{
     self, ChannelWriter, auth_middleware, check_permission_with, profile_cpu_handler,
     profile_heap_handler, prometheus_metrics_handler, request_span,
@@ -38,6 +37,7 @@ use utils::shard::TenantShardId;
 use crate::debug_dump::TimelineDigestRequest;
 use crate::safekeeper::TermLsn;
 use crate::timelines_global_map::DeleteOrExclude;
+use crate::wal_advertiser::advmap::UpdatePageserverAttachmentsArg;
 use crate::{
     GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
     wal_advertiser,
 };
@@ -95,39 +95,37 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
-async fn tenant_put_pageserver_locations_handler(
+async fn tenant_put_pageserver_attachments(
     mut request: Request<Body>,
 ) -> Result<Response<Body>, ApiError> {
-    let tenant_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
-    check_permission(&request, Some(tenant_id))?;
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
 
     let PutTenantPageserverLocationRequest {
         pageserver_locations,
     }: PutTenantPageserverLocationRequest = json_request(&mut request).await?;
 
     let global_timelines = get_global_timelines(&request);
-    global_timelines
-        .update_tenant_locations(tenant_id, pageserver_locations)
-        .await?;
-
-    json_response(StatusCode::OK, response_body)
-}
-
-async fn tenant_get_pageserver_locations_handler(
-    mut request: Request<Body>,
-) -> Result<Response<Body>, ApiError> {
-    let tenant_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
-    check_permission(&request, Some(tenant_id))?;
-
-    let global_timelines = get_global_timelines(&request);
-    let locations = global_timelines
-        .get_tenant_locations(tenant_id)
-        .await?;
-
-    json_response(StatusCode::OK, locations)
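+    // Record the declared attachments in the in-memory advertisement map
+    // (`wal_advertiser::advmap::World`). Note: `update_pageserver_attachments`
+    // is still `todo!()` in this draft.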
+    let wal_advertiser = global_timelines.get_wal_advertiser();
+    wal_advertiser
+        .update_pageserver_attachments(
+            tenant_shard_id,
+            pageserver_locations
+                .into_iter()
+                .map(
+                    |TenantShardPageserverLocation {
+                         generation,
+                         pageserver_node_id,
+                     }| UpdatePageserverAttachmentsArg {
+                        generation,
+                        pageserver_node_id,
+                    },
+                )
+                .collect(),
+        )
+        .map_err(ApiError::InternalServerError)?;
+    json_response(StatusCode::OK, ())
 }
 
 async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -753,11 +751,8 @@ pub fn make_router(
         .delete("/v1/tenant/:tenant_id", |r| {
             request_span(r, tenant_delete_handler)
         })
-        .put("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| {
-            request_span(r, tenant_put_pageserver_locations_handler)
-        })
-        .get("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| {
-            request_span(r, tenant_get_pageserver_locations_handler)
+        .put("/v1/tenant/:tenant_shard_id/pageserver_attachments", |r| {
+            request_span(r, tenant_put_pageserver_attachments)
         })
         // Will be used in the future instead of implicit timeline creation
         .post("/v1/tenant/timeline", |r| {
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 62bd57dfc8..e820fd1d93 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -38,13 +38,12 @@ pub mod timeline_eviction;
 pub mod timeline_guard;
 pub mod timeline_manager;
 pub mod timelines_set;
+pub mod wal_advertiser;
 pub mod wal_backup;
 pub mod wal_backup_partial;
 pub mod wal_reader_stream;
 pub mod wal_service;
 pub mod wal_storage;
-pub mod tenant;
-pub mod wal_advertiser;
 
 #[cfg(any(test, feature = "benchmarking"))]
 pub mod test_utils;
diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs
index 577a2f694e..b2b4e935f0 100644
--- a/safekeeper/src/recovery.rs
+++ b/safekeeper/src/recovery.rs
@@ -360,6 +360,7 @@ async fn recovery_stream(
         listen_pg_addr_str: &donor.pg_connstr,
         auth_token: None,
         availability_zone: None,
+        pageserver_generation: None,
     };
     let cfg = wal_stream_connection_config(connection_conf_args)?;
     let mut cfg = cfg.to_tokio_postgres_config();
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 26ceabd2e9..99ee2ea495 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -37,6 +37,7 @@ use crate::send_interpreted_wal::{
     Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
 };
 use crate::timeline::WalResidentTimeline;
+use crate::wal_advertiser;
 use crate::wal_reader_stream::StreamingWalReader;
 use crate::wal_storage::WalReader;
@@ -657,11 +658,29 @@ impl SafekeeperPostgresHandler {
 
         let tli_cancel = tli.cancel.clone();
 
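+        // Resolve the advmap handle for this (timeline, shard, pageserver generation)
+        // triple. Both startup options are required because remote_consistent_lsn is
+        // tracked per pageserver attachment (see wal_advertiser::advmap).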
+        let wal_advertiser = match (self.shard, self.pageserver_generation) {
+            (Some(shard), Some(pageserver_generation)) => {
+                Some(tli.wal_advertiser.get_pageserver_timeline(
+                    self.ttid,
+                    shard.shard_index(),
+                    pageserver_generation,
+                ))
+            }
+            (shard, pageserver_generation) => {
+                debug!(
+                    ?shard,
+                    ?pageserver_generation,
+                    "cannot report remote_consistent_lsn to the wal_advertiser subsystem: client must specify shard and pageserver_generation"
+                );
+                None
+            }
+        };
+
         let mut reply_reader = ReplyReader {
             reader,
             ws_guard: ws_guard.clone(),
             tli,
-            shard: self.shard.clone(),
+            wal_advertiser,
         };
 
         let res = tokio::select! {
@@ -978,7 +997,7 @@ struct ReplyReader<IO> {
     reader: PostgresBackendReader<IO>,
     ws_guard: Arc<WalSenderGuard>,
     tli: WalResidentTimeline,
-    shard: Option<ShardIdentity>,
+    wal_advertiser: Option<Arc<wal_advertiser::advmap::PageserverTimeline>>,
 }
 
 impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
@@ -1023,8 +1042,11 @@
                     .walsenders
                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
                 self.tli
-                    .update_remote_consistent_lsn(shard, ps_feedback.remote_consistent_lsn)
+                    .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
                     .await;
+                if let Some(wal_advertiser) = &self.wal_advertiser {
+                    wal_advertiser.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn);
+                }
                 // in principle new remote_consistent_lsn could allow to
                 // deactivate the timeline, but we check that regularly through
                 // broker updated, not need to do it here
diff --git a/safekeeper/src/tenant.rs b/safekeeper/src/tenant.rs
deleted file mode 100644
index 3e68dc6702..0000000000
--- a/safekeeper/src/tenant.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-use safekeeper_api::models::TenantShardPageserverLocation;
-use utils::shard::TenantShardId;
-use walproposer::bindings::Generation;
-
-pub(crate) struct Tenant {
-    inner: Arc<Mutex<Inner>>,
-}
-
-struct Inner {
-    data: HashMap<TenantShardId, PerLocationState>,
-}
-
-struct PerLocationState {
-    remote_consistent_lsn: Lsn,
-    last_update_at: Instant,
-    dirty: bool,
-}
-
-impl Tenant {
-    pub fn load(conf: Arc<SafeKeeperConf>, tenant_id: TenantId) -> anyhow::Result<Arc<Self>> {
-        todo!()
-    }
-
-    pub fn get_locations(&self) -> anyhow::Result<Vec<TenantShardPageserverLocation>> {
-        todo!()
-    }
-    pub fn update_locations(
-        &self,
-        locations: Vec<TenantShardPageserverLocation>,
-    ) -> anyhow::Result<()> {
-        todo!()
-    }
-    pub fn update_remote_consistent_lsn(
-        &self,
-        tenant_shard_id: TenantShardId,
-        generation: Generation,
-        timeline_id: TimelineId,
-        lsn: Lsn,
-    ) -> anyhow::Result<()> {
-        todo!()
-    }
-}
diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs
index 618e2b59d2..b82df01f3d 100644
--- a/safekeeper/src/test_utils.rs
+++ b/safekeeper/src/test_utils.rs
@@ -106,6 +106,7 @@ impl Env {
             &timeline_dir,
             &remote_path,
             shared_state,
+            todo!(), // wal_advertiser
             conf.clone(),
         );
         timeline.bootstrap(
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index c3c07bef78..397bd82af5 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -33,14 +33,15 @@ use crate::receive_wal::WalReceivers;
 use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
 use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
 use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
-use crate::tenant::Tenant;
 use crate::timeline_guard::ResidenceGuard;
 use crate::timeline_manager::{AtomicStatus, ManagerCtl};
 use crate::timelines_set::TimelinesSet;
 use crate::wal_backup::{self, remote_timeline_path};
 use crate::wal_backup_partial::PartialRemoteSegment;
 use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
-use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
+use crate::{
+    SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage,
+};
 
 fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
     PeerInfo {
@@ -427,7 +428,6 @@ type RemoteDeletionReceiver = tokio::sync::watch::Receiver<
 pub struct Timeline {
     pub ttid: TenantTimelineId,
-    tenant: Arc<Tenant>,
     pub remote_path: RemotePath,
 
     /// Used to broadcast commit_lsn updates to all background jobs.
@@ -449,6 +449,7 @@ pub struct Timeline {
     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     /// while holding it, ensuring that consensus checks are in order.
     mutex: RwLock<SharedState>,
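+    /// Per-timeline handle into the WAL advertisement map; walsenders use it to
+    /// report pageserver remote_consistent_lsn feedback (see send_wal.rs).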
+    pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
     walsenders: Arc<WalSenders>,
     walreceivers: Arc<WalReceivers>,
     timeline_dir: Utf8PathBuf,
@@ -478,7 +479,7 @@ impl Timeline {
         timeline_dir: &Utf8Path,
         remote_path: &RemotePath,
         shared_state: SharedState,
-        tenant: Arc<Tenant>,
+        wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
         conf: Arc<SafeKeeperConf>,
     ) -> Arc<Self> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
@@ -493,7 +494,6 @@ impl Timeline {
 
         Arc::new(Self {
             ttid,
-            tenant,
             remote_path: remote_path.to_owned(),
             timeline_dir: timeline_dir.to_owned(),
             commit_lsn_watch_tx,
@@ -503,6 +503,7 @@ impl Timeline {
             shared_state_version_tx,
             shared_state_version_rx,
             mutex: RwLock::new(shared_state),
+            wal_advertiser,
             walsenders: WalSenders::new(walreceivers.clone()),
             walreceivers,
             gate: Default::default(),
@@ -520,8 +521,8 @@ impl Timeline {
     /// Load existing timeline from disk.
     pub fn load_timeline(
         conf: Arc<SafeKeeperConf>,
-        tenant: Arc<Tenant>,
         ttid: TenantTimelineId,
+        wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
     ) -> Result<Arc<Timeline>> {
         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
 
@@ -534,7 +535,7 @@ impl Timeline {
             &timeline_dir,
             &remote_path,
             shared_state,
-            tenant,
+            wal_advertiser,
             conf,
         ))
     }
@@ -1069,19 +1070,12 @@ impl WalResidentTimeline {
     }
 
     /// Update in memory remote consistent lsn.
-    pub async fn update_remote_consistent_lsn(&self, shard: Option<ShardIdentity>, candidate: Lsn) {
+    pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
         let mut shared_state = self.write_shared_state().await;
         shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
             shared_state.sk.state().inmem.remote_consistent_lsn,
             candidate,
         );
-        drop(shared_state);
-        self.tli.tenant.update_remote_consistent_lsn(
-            todo!(),
-            todo!(),
-            todo!(),
-        )
     }
 }
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index a626838f39..bf3e8b2b21 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -27,11 +27,10 @@ use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
 use crate::http::routes::DeleteOrExcludeError;
 use crate::rate_limit::RateLimiter;
 use crate::state::TimelinePersistentState;
-use crate::tenant::Tenant;
 use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
 use crate::timelines_set::TimelinesSet;
 use crate::wal_storage::Storage;
-use crate::{SafeKeeperConf, control_file, wal_storage};
+use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage};
 
 // Timeline entry in the global map: either a ready timeline, or mark that it is
 // being created.
@@ -42,7 +41,6 @@ enum GlobalMapTimeline {
 }
 
 struct GlobalTimelinesState {
-    tenants: HashMap<TenantId, Arc<Tenant>>,
     timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
 
     // A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
@@ -52,16 +50,25 @@ struct GlobalTimelinesState {
     conf: Arc<SafeKeeperConf>,
     broker_active_set: Arc<TimelinesSet>,
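+    // Process-wide WAL advertisement state; HTTP handlers update it, and each
+    // timeline obtains its per-timeline handle from it via `World::load_timeline`.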
+    wal_advertisement: Arc<wal_advertiser::advmap::World>,
     global_rate_limiter: RateLimiter,
 }
 
 impl GlobalTimelinesState {
     /// Get dependencies for a timeline constructor.
-    fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
+    fn get_dependencies(
+        &self,
+    ) -> (
+        Arc<SafeKeeperConf>,
+        Arc<TimelinesSet>,
+        RateLimiter,
+        Arc<wal_advertiser::advmap::World>,
+    ) {
         (
             self.conf.clone(),
             self.broker_active_set.clone(),
             self.global_rate_limiter.clone(),
+            self.wal_advertisement.clone(),
         )
     }
@@ -93,11 +100,11 @@ impl GlobalTimelines {
     pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
         Self {
             state: Mutex::new(GlobalTimelinesState {
-                tenants: HashMap::new(),
                 timelines: HashMap::new(),
                 tombstones: HashMap::new(),
                 conf,
                 broker_active_set: Arc::new(TimelinesSet::default()),
+                wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()),
                 global_rate_limiter: RateLimiter::new(1, 1),
             }),
         }
@@ -154,24 +161,13 @@ impl GlobalTimelines {
     /// just lock and unlock it for each timeline -- this function is called
     /// during init when nothing else is running, so this is fine.
     async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
-        let (conf, broker_active_set, partial_backup_rate_limiter) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
             let state = self.state.lock().unwrap();
             state.get_dependencies()
         };
 
         let timelines_dir = get_tenant_dir(&conf, &tenant_id);
 
-        let tenant = match Tenant::load(conf.clone(), tenant_id) {
-            Ok(tenant) => {
-                let mut state = self.state.lock().unwrap();
-                state.tenants.insert(tenant_id, Arc::clone(&tenant));
-                tenant
-            }
-            Err(e) => {
-                todo!("should we fail the whole startup process?")
-            }
-        };
-
         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
             .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
         {
@@ -181,7 +177,8 @@ impl GlobalTimelines {
                 TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
             {
                 let ttid = TenantTimelineId::new(tenant_id, timeline_id);
-                match Timeline::load_timeline(conf.clone(), tenant, ttid) {
+                let wal_advertiser = wal_advertiser.load_timeline(ttid);
+                match Timeline::load_timeline(conf.clone(), ttid, wal_advertiser) {
                     Ok(tli) => {
                         let mut shared_state = tli.write_shared_state().await;
                         self.state
@@ -241,7 +238,7 @@ impl GlobalTimelines {
         start_lsn: Lsn,
         commit_lsn: Lsn,
     ) -> Result<Arc<Timeline>> {
-        let (conf, _, _) = {
+        let (conf, _, _, _) = {
             let state = self.state.lock().unwrap();
             if let Ok(timeline) = state.get(&ttid) {
                 // Timeline already exists, return it.
@@ -286,7 +283,7 @@ impl GlobalTimelines {
         check_tombstone: bool,
     ) -> Result<Arc<Timeline>> {
         // Check for existence and mark that we're creating it.
-        let (conf, broker_active_set, partial_backup_rate_limiter) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
             let mut state = self.state.lock().unwrap();
             match state.timelines.get(&ttid) {
                 Some(GlobalMapTimeline::CreationInProgress) => {
@@ -315,7 +312,10 @@ impl GlobalTimelines {
         };
 
         // Do the actual move and reflect the result in the map.
-        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
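+        // Obtain the per-timeline advmap handle up front so it can be threaded
+        // through install_temp_timeline() into the Timeline constructor.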
+        let wal_advertiser = wal_advertiser.load_timeline(ttid);
+        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, wal_advertiser, conf.clone())
+            .await
+        {
             Ok(timeline) => {
                 let mut timeline_shared_state = timeline.write_shared_state().await;
                 let mut state = self.state.lock().unwrap();
@@ -354,6 +354,7 @@ impl GlobalTimelines {
     async fn install_temp_timeline(
         ttid: TenantTimelineId,
         tmp_path: &Utf8PathBuf,
+        wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
         conf: Arc<SafeKeeperConf>,
     ) -> Result<Arc<Timeline>> {
         let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
@@ -396,7 +397,7 @@ impl GlobalTimelines {
         // Do the move.
         durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
 
-        Timeline::load_timeline(conf, ttid)
+        Timeline::load_timeline(conf, ttid, wal_advertiser)
     }
 
 /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
@@ -584,25 +585,8 @@ impl GlobalTimelines {
         Ok(deleted)
     }
 
-    pub async fn update_tenant_locations(
-        &self,
-        tenant_id: &TenantId,
-        locations: Vec<TenantShardPageserverLocation>,
-    ) -> anyhow::Result<()> {
-        let tenant = {
-            let mut state = self.state.lock().unwrap();
-            state.tenants.get(tenant_id).context("tenant not found")?
-        };
-        tenant.update_locations(locations).await?
-    }
-
-    pub fn get_tenant_locations(
-        &self,
-        tenant_id: &TenantId,
-    ) -> anyhow::Result<Vec<TenantShardPageserverLocation>> {
-        let state = self.state.lock().unwrap();
-        let tenant = state.tenants.get(tenant_id).context("tenant not found")?;
-        Ok(tenant.get_locations())
+    pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::advmap::World> {
+        self.state.lock().unwrap().wal_advertisement.clone()
     }
 
     pub fn housekeeping(&self, tombstone_ttl: &Duration) {
diff --git a/safekeeper/src/wal_advertiser.rs b/safekeeper/src/wal_advertiser.rs
index e0321dcc38..5908be3e5d 100644
--- a/safekeeper/src/wal_advertiser.rs
+++ b/safekeeper/src/wal_advertiser.rs
@@ -2,75 +2,19 @@
 
 use std::{collections::HashSet, sync::Arc, time::Duration};
 
-use desim::world::Node;
-
 use crate::{GlobalTimelines, SafeKeeperConf};
 
-mod advmap;
-
-pub struct GlobalState {
-    pub resolver: NodeIdToHostportMap,
-}
-
-#[derive(Default)]
-pub struct NodeIdToHostportMap {
-    inner: Arc<Mutex<HashMap<NodeId, String>>>,
-}
-
-impl NodeIdToHostportMap {
-    pub fn learn(&self, ) {
-        let mut inner = self.inner.lock().unwrap();
-        inner.insert(node_id, hostport);
-    }
-}
+pub(crate) mod advmap;
 
 pub(crate) async fn wal_advertiser_loop(
     conf: Arc<SafeKeeperConf>,
     global_timelines: Arc<GlobalTimelines>,
 ) -> anyhow::Result<()> {
-    let active_timelines_set = global_timelines.get_global_broker_active_set();
-
-    let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
-
-    let outbound = async_stream::stream! {
-        loop {
-            // Note: we lock runtime here and in timeline methods as GlobalTimelines
-            // is under plain mutex. That's ok, all this code is not performance
-            // sensitive and there is no risk of deadlock as we don't await while
-            // lock is held.
-            let now = Instant::now();
-            let all_tlis = active_timelines_set.get_all();
-            let mut n_pushed_tlis = 0;
-            for tli in &all_tlis {
-                let sk_info = tli.get_safekeeper_info(&conf).await;
-                yield sk_info;
-                BROKER_PUSHED_UPDATES.inc();
-                n_pushed_tlis += 1;
-            }
-            let elapsed = now.elapsed();
-
-            BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
-            BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
-
-            if elapsed > push_interval / 2 {
-                info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
-            }
-
-            sleep(push_interval).await;
-        }
-    };
-    client
-        .publish_safekeeper_info(Request::new(outbound))
-        .await?;
+    todo!();
+    node_loop().await;
     Ok(())
 }
 
 pub(crate) async fn node_loop() {
-
-
-    loop {
-
-    }
-
-
+    loop {}
 }
diff --git a/safekeeper/src/wal_advertiser/advmap.rs b/safekeeper/src/wal_advertiser/advmap.rs
index 611381cf54..07048c78d0 100644
--- a/safekeeper/src/wal_advertiser/advmap.rs
+++ b/safekeeper/src/wal_advertiser/advmap.rs
@@ -1,26 +1,78 @@
 //! The data structure that tracks advertisement state.
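+//!
+//! Hierarchy, as laid out below: a process-wide [`World`] tracks [`Pageserver`]s by node
+//! id; each pageserver tracks [`PageserverAttachment`]s (tenant shard plus generation);
+//! each attachment tracks a per-timeline remote_consistent_lsn via [`PageserverTimeline`].
+//! [`SafekeeperTimeline`] is the per-timeline handle held by `timeline::Timeline`.
+//!
+//! Intended usage, sketched from the call sites in this patch (method bodies are still
+//! `todo!()`, so this is the planned flow, not working code):
+//!
+//! ```ignore
+//! let world: Arc<World> = Arc::new(World::default());
+//! let sk_tli = world.load_timeline(ttid);
+//! let ps_tli = sk_tli.get_pageserver_timeline(ttid, shard_index, pageserver_generation);
+//! ps_tli.update_remote_consistent_lsn(lsn)?;
+//! ```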
 
-use std::sync::{Arc, RwLock};
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
 
+use serde::Serialize;
+use utils::{
+    generation::Generation,
+    id::{NodeId, TenantTimelineId, TimelineId},
+    lsn::Lsn,
+    shard::{ShardIndex, TenantShardId},
+};
+
+#[derive(Default)]
 pub struct World {
     pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
 }
 
 pub struct Pageserver {
     node_id: NodeId,
-    attachments: RwLock<HashMap<TenantShardId, Arc<Attachment>>>,
+    attachments: RwLock<HashMap<TenantShardId, Arc<PageserverAttachment>>>,
 }
 
-pub struct Attachment {
+pub struct PageserverAttachment {
     pageserver: NodeId,
     tenant_shard_id: TenantShardId,
     generation: Generation,
-    remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<Timeline>>>,
+    remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<PageserverTimeline>>>,
 }
 
-pub struct Timeline {
+pub struct PageserverTimeline {
+    pageserver: NodeId,
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
+    generation: Generation,
     remote_consistent_lsn: RwLock<Lsn>,
 }
 
+pub struct SafekeeperTimeline {}
+
+pub struct UpdatePageserverAttachmentsArg {
+    pub generation: Generation,
+    pub pageserver_node_id: NodeId,
+}
+
+impl World {
+    pub fn housekeeping(&self) {}
+    pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc<SafekeeperTimeline> {
+        todo!()
+    }
+    pub fn update_pageserver_attachments(
+        &self,
+        tenant_shard_id: TenantShardId,
+        arg: Vec<UpdatePageserverAttachmentsArg>,
+    ) -> anyhow::Result<()> {
+        todo!()
+    }
+}
+
+impl SafekeeperTimeline {
+    pub fn get_pageserver_timeline(
+        &self,
+        ttid: TenantTimelineId,
+        shard: ShardIndex,
+        pageserver_generation: Generation,
+    ) -> Arc<PageserverTimeline> {
+        assert!(!pageserver_generation.is_none());
+        todo!()
+    }
+}
+
+impl PageserverTimeline {
+    pub fn update_remote_consistent_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
+        todo!()
+    }
+}
diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs
index 6e007265b2..855fe2c288 100644
--- a/safekeeper/src/wal_service.rs
+++ b/safekeeper/src/wal_service.rs
@@ -18,7 +18,7 @@ use utils::measured_stream::MeasuredStream;
 
 use crate::handler::SafekeeperPostgresHandler;
 use crate::metrics::TrafficMetrics;
-use crate::{GlobalTimelines, SafeKeeperConf};
+use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
 
 /// Accept incoming TCP connections and spawn them into a background thread.
 ///
@@ -51,7 +51,7 @@ pub async fn task_main(
                     error!("connection handler exited: {}", err);
                 }
             }
-            .instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty)),
+            .instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty, pageserver_generation = field::Empty)),
         );
     }
 }