From dc6a3828731b787f71cf172e5276ed341c6489c8 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 6 Jun 2023 15:16:54 +0400 Subject: [PATCH 1/9] Increase timeouts on compute -> sk connections. context: https://github.com/neondatabase/neon/issues/4414 And improve messages/comments here and there. --- pgxn/neon/walproposer.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index a99be40955..64d980d2e4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -254,20 +254,20 @@ nwp_register_gucs(void) DefineCustomIntVariable( "neon.safekeeper_reconnect_timeout", - "Timeout for reconnecting to offline wal acceptor.", + "Walproposer reconnects to offline safekeepers once in this interval.", NULL, &wal_acceptor_reconnect_timeout, - 1000, 0, INT_MAX, /* default, min, max */ + 5000, 0, INT_MAX, /* default, min, max */ PGC_SIGHUP, /* context */ GUC_UNIT_MS, /* flags */ NULL, NULL, NULL); DefineCustomIntVariable( "neon.safekeeper_connect_timeout", - "Timeout for connection establishement and it's maintenance against safekeeper", + "Connection or connection attempt to safekeeper is terminated if no message is received (or connection attempt doesn't finish) within this period.", NULL, &wal_acceptor_connection_timeout, - 5000, 0, INT_MAX, + 10000, 0, INT_MAX, PGC_SIGHUP, GUC_UNIT_MS, NULL, NULL, NULL); @@ -441,7 +441,7 @@ WalProposerPoll(void) if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, wal_acceptor_connection_timeout)) { - elog(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms", + elog(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout); ShutdownConnection(sk); } @@ -1035,9 +1035,16 @@ RecvAcceptorGreeting(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->greetResponse)) return; + elog(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); + /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; + /* + * Note: it would be better to track the counter on per safekeeper basis, + * but at worst walproposer would restart with 'term rejected', so leave as + * is for now. + */ ++n_connected; if (n_connected <= quorum) { From c058e1cec2df70505263b19ad4e1f337d9643285 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 6 Jun 2023 15:27:17 +0400 Subject: [PATCH 2/9] Quick exit in truncate_wal if nothing to do. ref https://github.com/neondatabase/neon/issues/4414 --- safekeeper/src/wal_storage.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 1b82bd754e..644c956fc1 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -379,6 +379,12 @@ impl Storage for PhysicalStorage { ); } + // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on + // disk (this happens on each connect). + if end_pos == self.write_lsn { + return Ok(()); + } + // Close previously opened file, if any if let Some(mut unflushed_file) = self.file.take() { self.fdatasync_file(&mut unflushed_file)?; From 6b3c020cd90aab8e1e8c015ece39ebf8b6898da2 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 6 Jun 2023 15:30:26 +0400 Subject: [PATCH 3/9] Don't warn on system id = 0 in walproposer greeting. sync-safekeepers doesn't know it and sends 0. --- safekeeper/src/safekeeper.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 33da0c8e5a..eb434136d4 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -634,7 +634,8 @@ where } // system_id will be updated on mismatch - if self.state.server.system_id != msg.system_id { + // sync-safekeepers doesn't know sysid and sends 0, ignore it + if self.state.server.system_id != msg.system_id && msg.system_id != 0 { if self.state.server.system_id != 0 { warn!( "unexpected system ID arrived, got {}, expected {}", From 88f0cfc5755cd226c7cfd40440cb03130a28b432 Mon Sep 17 00:00:00 2001 From: Vadim Kharitonov Date: Wed, 7 Jun 2023 11:41:53 +0200 Subject: [PATCH 4/9] Fix `pgx_ulid` extension (#4431) The issue was in the wrong `control` file name --- Dockerfile.compute-node | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.compute-node b/Dockerfile.compute-node index f8429e72b8..44e13a6c73 100644 --- a/Dockerfile.compute-node +++ b/Dockerfile.compute-node @@ -531,7 +531,7 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.0.tar.gz - mkdir pgx_ulid-src && cd pgx_ulid-src && tar xvzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \ sed -i 's/pgx = "=0.7.3"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \ cargo pgx install --release && \ - echo "trusted = true" >> /usr/local/pgsql/share/extension/pgx_ulid.control + echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control ######################################################################################### # From 5761190e0d61618ab805dd3b6dff3ef7fb768aff Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 7 Jun 2023 14:29:23 +0300 Subject: [PATCH 5/9] feat: three phased startup order (#4399) Initial logical size calculation could still hinder our fast startup efforts in #4397. See #4183. In deployment of 2023-06-06 about a 200 initial logical sizes were calculated on hosts which took the longest to complete initial load (12s). Implements the three step/tier initialization ordering described in #4397: 1. load local tenants 2. do initial logical sizes per walreceivers for 10s 3. background tasks Ordering is controlled by: - waiting on `utils::completion::Barrier`s on background tasks - having one attempt for each Timeline to do initial logical size calculation - `pageserver/src/bin/pageserver.rs` releasing background jobs after timeout or completion of initial logical size calculation The timeout is there just to safeguard in case a legitimate non-broken timeline initial logical size calculation goes long. The timeout is configurable, by default 10s, which I think would be fine for production systems. In the test cases I've been looking at, it seems that these steps are completed as fast as possible. Co-authored-by: Christian Schwarz --- pageserver/src/bin/pageserver.rs | 121 ++++++++++++++++-- pageserver/src/config.rs | 34 ++++- pageserver/src/disk_usage_eviction_task.rs | 21 ++- pageserver/src/lib.rs | 23 ++++ pageserver/src/tenant.rs | 58 +++++---- pageserver/src/tenant/mgr.rs | 13 +- pageserver/src/tenant/tasks.rs | 34 +++-- pageserver/src/tenant/timeline.rs | 46 ++++++- .../src/tenant/timeline/eviction_task.rs | 16 ++- .../regress/test_disk_usage_eviction.py | 6 + 10 files changed, 295 insertions(+), 77 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index e0731ba79b..1fa5e4ab3b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -337,33 +337,114 @@ fn start_pageserver( // Startup staging or optimizing: // - // (init_done_tx, init_done_rx) are used to control when do background loops start. This is to - // avoid starving out the BACKGROUND_RUNTIME async worker threads doing heavy work, like - // initial repartitioning while we still have Loading tenants. + // We want to minimize downtime for `page_service` connections, and trying not to overload + // BACKGROUND_RUNTIME by doing initial compactions and initial logical sizes at the same time. // - // init_done_rx is a barrier which stops waiting once all init_done_tx clones are dropped. + // init_done_rx will notify when all initial load operations have completed. + // + // background_jobs_can_start (same name used to hold off background jobs from starting at + // consumer side) will be dropped once we can start the background jobs. Currently it is behind + // completing all initial logical size calculations (init_logical_size_done_rx) and a timeout + // (background_task_maximum_delay). let (init_done_tx, init_done_rx) = utils::completion::channel(); + let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel(); + + let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel(); + + let order = pageserver::InitializationOrder { + initial_tenant_load: Some(init_done_tx), + initial_logical_size_can_start: init_done_rx.clone(), + initial_logical_size_attempt: init_logical_size_done_tx, + background_jobs_can_start: background_jobs_barrier.clone(), + }; + // Scan the local 'tenants/' directory and start loading the tenants let init_started_at = std::time::Instant::now(); + let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); + BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, broker_client.clone(), remote_storage.clone(), - (init_done_tx, init_done_rx.clone()), + order, ))?; BACKGROUND_RUNTIME.spawn({ - let init_done_rx = init_done_rx.clone(); - async move { - init_done_rx.wait().await; + let init_done_rx = init_done_rx; + let shutdown_pageserver = shutdown_pageserver.clone(); + let drive_init = async move { + // NOTE: unlike many futures in pageserver, this one is cancellation-safe + let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed")); - let elapsed = init_started_at.elapsed(); + init_done_rx.wait().await; + // initial logical sizes can now start, as they were waiting on init_done_rx. + + scopeguard::ScopeGuard::into_inner(guard); + + let init_done = std::time::Instant::now(); + let elapsed = init_done - init_started_at; tracing::info!( elapsed_millis = elapsed.as_millis(), - "Initial load completed." + "Initial load completed" ); + + let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait()); + + let timeout = conf.background_task_maximum_delay; + + let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); + + let init_sizes_done = tokio::select! { + _ = &mut init_sizes_done => { + let now = std::time::Instant::now(); + tracing::info!( + from_init_done_millis = (now - init_done).as_millis(), + from_init_millis = (now - init_started_at).as_millis(), + "Initial logical sizes completed" + ); + None + } + _ = tokio::time::sleep(timeout) => { + tracing::info!( + timeout_millis = timeout.as_millis(), + "Initial logical size timeout elapsed; starting background jobs" + ); + Some(init_sizes_done) + } + }; + + scopeguard::ScopeGuard::into_inner(guard); + + // allow background jobs to start + drop(background_jobs_can_start); + + if let Some(init_sizes_done) = init_sizes_done { + // ending up here is not a bug; at the latest logical sizes will be queried by + // consumption metrics. + let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); + init_sizes_done.await; + + scopeguard::ScopeGuard::into_inner(guard); + + let now = std::time::Instant::now(); + tracing::info!( + from_init_done_millis = (now - init_done).as_millis(), + from_init_millis = (now - init_started_at).as_millis(), + "Initial logical sizes completed after timeout (background jobs already started)" + ); + + } + }; + + async move { + let mut drive_init = std::pin::pin!(drive_init); + // just race these tasks + tokio::select! { + _ = shutdown_pageserver.cancelled() => {}, + _ = &mut drive_init => {}, + } } }); @@ -378,7 +459,7 @@ fn start_pageserver( conf, remote_storage.clone(), disk_usage_eviction_state.clone(), - init_done_rx.clone(), + background_jobs_barrier.clone(), )?; } @@ -416,7 +497,7 @@ fn start_pageserver( ); if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { - let init_done_rx = init_done_rx; + let background_jobs_barrier = background_jobs_barrier; let metrics_ctx = RequestContext::todo_child( TaskKind::MetricsCollection, // This task itself shouldn't download anything. @@ -432,12 +513,17 @@ fn start_pageserver( "consumption metrics collection", true, async move { - // first wait for initial load to complete before first iteration. + // first wait until background jobs are cleared to launch. // // this is because we only process active tenants and timelines, and the // Timeline::get_current_logical_size will spawn the logical size calculation, // which will not be rate-limited. - init_done_rx.wait().await; + let cancel = task_mgr::shutdown_token(); + + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = background_jobs_barrier.wait() => {} + }; pageserver::consumption_metrics::collect_metrics( metric_collection_endpoint, @@ -487,6 +573,8 @@ fn start_pageserver( ); } + let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard()); + // All started up! Now just sit and wait for shutdown signal. ShutdownSignals::handle(|signal| match signal { Signal::Quit => { @@ -502,6 +590,11 @@ fn start_pageserver( "Got {}. Terminating gracefully in fast shutdown mode", signal.name() ); + + // This cancels the `shutdown_pageserver` cancellation tree. + // Right now that tree doesn't reach very far, and `task_mgr` is used instead. + // The plan is to change that over time. + shutdown_pageserver.take(); BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0)); unreachable!() } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 02763c9b7d..17e6e3fb2a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -63,6 +63,7 @@ pub mod defaults { pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour"; pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option = None; pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min"; + pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s"; /// /// Default built-in configuration file. @@ -91,9 +92,10 @@ pub mod defaults { #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}' #synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}' - #disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}} +#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' + # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -187,6 +189,15 @@ pub struct PageServerConf { pub test_remote_failures: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, + + /// How long will background tasks be delayed at most after initial load of tenants. + /// + /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works + /// as we now isolate initial loading, initial logical size calculation and background tasks. + /// Smaller nodes will have background tasks "not running" for this long unless every timeline + /// has it's initial logical size calculated. Not running background tasks for some seconds is + /// not terrible. + pub background_task_maximum_delay: Duration, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -259,6 +270,8 @@ struct PageServerConfigBuilder { test_remote_failures: BuilderValue, ondemand_download_behavior_treat_error_as_warn: BuilderValue, + + background_task_maximum_delay: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -316,6 +329,11 @@ impl Default for PageServerConfigBuilder { test_remote_failures: Set(0), ondemand_download_behavior_treat_error_as_warn: Set(false), + + background_task_maximum_delay: Set(humantime::parse_duration( + DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY, + ) + .unwrap()), } } } @@ -440,6 +458,10 @@ impl PageServerConfigBuilder { BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn); } + pub fn background_task_maximum_delay(&mut self, delay: Duration) { + self.background_task_maximum_delay = BuilderValue::Set(delay); + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries @@ -522,6 +544,9 @@ impl PageServerConfigBuilder { .ok_or(anyhow!( "missing ondemand_download_behavior_treat_error_as_warn" ))?, + background_task_maximum_delay: self + .background_task_maximum_delay + .ok_or(anyhow!("missing background_task_maximum_delay"))?, }) } } @@ -710,6 +735,7 @@ impl PageServerConf { ) }, "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?), + "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -877,6 +903,7 @@ impl PageServerConf { disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, + background_task_maximum_delay: Duration::ZERO, } } } @@ -1036,6 +1063,7 @@ metric_collection_endpoint = 'http://localhost:80/metrics' synthetic_size_calculation_interval = '333 s' log_format = 'json' +background_task_maximum_delay = '334 s' "#; @@ -1094,6 +1122,9 @@ log_format = 'json' disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, + background_task_maximum_delay: humantime::parse_duration( + defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY + )?, }, "Correct defaults should be used when no config values are provided" ); @@ -1148,6 +1179,7 @@ log_format = 'json' disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, + background_task_maximum_delay: Duration::from_secs(334), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 1a8886935c..ce5f81c44b 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -83,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, state: Arc, - init_done: completion::Barrier, + background_jobs_barrier: completion::Barrier, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -100,17 +100,16 @@ pub fn launch_disk_usage_global_eviction_task( "disk usage based eviction", false, async move { - // wait until initial load is complete, because we cannot evict from loading tenants. - init_done.wait().await; + let cancel = task_mgr::shutdown_token(); - disk_usage_eviction_task( - &state, - task_config, - storage, - &conf.tenants_path(), - task_mgr::shutdown_token(), - ) - .await; + // wait until initial load is complete, because we cannot evict from loading tenants. + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = background_jobs_barrier.wait() => { } + }; + + disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel) + .await; info!("disk usage based eviction task finishing"); Ok(()) }, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 40a672bee3..5831091098 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -132,6 +132,29 @@ pub fn is_uninit_mark(path: &Path) -> bool { } } +/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by +/// blocking. +/// +/// The instances of this value exist only during startup, otherwise `None` is provided, meaning no +/// delaying is needed. +#[derive(Clone)] +pub struct InitializationOrder { + /// Each initial tenant load task carries this until completion. + pub initial_tenant_load: Option, + + /// Barrier for when we can start initial logical size calculations. + pub initial_logical_size_can_start: utils::completion::Barrier, + + /// Each timeline owns a clone of this to be consumed on the initial logical size calculation + /// attempt. It is important to drop this once the attempt has completed. + pub initial_logical_size_attempt: utils::completion::Completion, + + /// Barrier for when we can start any background jobs. + /// + /// This can be broken up later on, but right now there is just one class of a background job. + pub background_jobs_can_start: utils::completion::Barrier, +} + #[cfg(test)] mod backoff_defaults_tests { use super::*; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7ce0ed81bc..29086cae86 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -65,6 +65,7 @@ use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError; use crate::tenant::storage_layer::DeltaLayer; use crate::tenant::storage_layer::ImageLayer; use crate::tenant::storage_layer::Layer; +use crate::InitializationOrder; use crate::virtual_file::VirtualFile; use crate::walredo::PostgresRedoManager; @@ -510,6 +511,7 @@ impl Tenant { local_metadata: Option, ancestor: Option>, first_save: bool, + init_order: Option<&InitializationOrder>, ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_id; @@ -535,6 +537,7 @@ impl Tenant { up_to_date_metadata, ancestor.clone(), remote_client, + init_order, )?; let timeline = UninitializedTimeline { @@ -560,6 +563,7 @@ impl Tenant { up_to_date_metadata, ancestor.clone(), None, + None, ) .with_context(|| { format!("creating broken timeline data for {tenant_id}/{timeline_id}") @@ -858,6 +862,7 @@ impl Tenant { local_metadata, ancestor, true, + None, ctx, ) .await @@ -892,16 +897,13 @@ impl Tenant { /// /// If the loading fails for some reason, the Tenant will go into Broken /// state. - /// - /// `init_done` is an optional channel used during initial load to delay background task - /// start. It is not used later. #[instrument(skip_all, fields(tenant_id=%tenant_id))] pub fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: Option<(completion::Completion, completion::Barrier)>, + init_order: Option, ctx: &RequestContext, ) -> Arc { debug_assert_current_span_has_tenant_id(); @@ -937,17 +939,17 @@ impl Tenant { "initial tenant load", false, async move { - // keep the sender alive as long as we have the initial load ongoing; it will be - // None for loads spawned after init_tenant_mgr. - let (_tx, rx) = if let Some((tx, rx)) = init_done { - (Some(tx), Some(rx)) - } else { - (None, None) - }; - match tenant_clone.load(&ctx).await { + let mut init_order = init_order; + + // take the completion because initial tenant loading will complete when all of + // these tasks complete. + let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take()); + + match tenant_clone.load(init_order.as_ref(), &ctx).await { Ok(()) => { debug!("load finished, activating"); - tenant_clone.activate(broker_client, rx.as_ref(), &ctx); + let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); + tenant_clone.activate(broker_client, background_jobs_can_start, &ctx); } Err(err) => { error!("load failed, setting tenant state to Broken: {err:?}"); @@ -974,7 +976,11 @@ impl Tenant { /// files on disk. Used at pageserver startup. /// /// No background tasks are started as part of this routine. - async fn load(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { + async fn load( + self: &Arc, + init_order: Option<&InitializationOrder>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); debug!("loading tenant task"); @@ -1094,7 +1100,7 @@ impl Tenant { // 1. "Timeline has no ancestor and no layer files" for (timeline_id, local_metadata) in sorted_timelines { - self.load_local_timeline(timeline_id, local_metadata, ctx) + self.load_local_timeline(timeline_id, local_metadata, init_order, ctx) .await .with_context(|| format!("load local timeline {timeline_id}"))?; } @@ -1112,6 +1118,7 @@ impl Tenant { &self, timeline_id: TimelineId, local_metadata: TimelineMetadata, + init_order: Option<&InitializationOrder>, ctx: &RequestContext, ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); @@ -1181,6 +1188,7 @@ impl Tenant { Some(local_metadata), ancestor, false, + init_order, ctx, ) .await @@ -1724,12 +1732,12 @@ impl Tenant { /// Changes tenant status to active, unless shutdown was already requested. /// - /// `init_done` is an optional channel used during initial load to delay background task - /// start. It is not used later. + /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup + /// to delay background jobs. Background jobs can be started right away when None is given. fn activate( self: &Arc, broker_client: BrokerClientChannel, - init_done: Option<&completion::Barrier>, + background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { debug_assert_current_span_has_tenant_id(); @@ -1762,12 +1770,12 @@ impl Tenant { // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. - tasks::start_background_loops(self, init_done); + tasks::start_background_loops(self, background_jobs_can_start); let mut activated_timelines = 0; for timeline in not_broken_timelines { - timeline.activate(broker_client.clone(), init_done, ctx); + timeline.activate(broker_client.clone(), background_jobs_can_start, ctx); activated_timelines += 1; } @@ -2158,6 +2166,7 @@ impl Tenant { new_metadata: &TimelineMetadata, ancestor: Option>, remote_client: Option, + init_order: Option<&InitializationOrder>, ) -> anyhow::Result> { if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() { anyhow::ensure!( @@ -2166,6 +2175,9 @@ impl Tenant { ) } + let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start); + let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt); + let pg_version = new_metadata.pg_version(); Ok(Timeline::new( self.conf, @@ -2177,6 +2189,8 @@ impl Tenant { Arc::clone(&self.walredo_mgr), remote_client, pg_version, + initial_logical_size_can_start.cloned(), + initial_logical_size_attempt.cloned(), )) } @@ -2852,7 +2866,7 @@ impl Tenant { remote_client: Option, ) -> anyhow::Result> { let timeline_data = self - .create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client) + .create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client, None) .context("Failed to create timeline data structure")?; crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?; @@ -3420,7 +3434,7 @@ pub mod harness { timelines_to_load.insert(timeline_id, timeline_metadata); } tenant - .load(ctx) + .load(None, ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_id)) .await?; tenant.state.send_replace(TenantState::Active); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 740f9621b6..a1638e4a95 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -21,9 +21,8 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; -use crate::IGNORED_TENANT_FILE_NAME; +use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME}; -use utils::completion; use utils::fs_ext::PathExt; use utils::id::{TenantId, TimelineId}; @@ -65,7 +64,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: (completion::Completion, completion::Barrier), + init_order: InitializationOrder, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); @@ -122,7 +121,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), - Some(init_done.clone()), + Some(init_order.clone()), &ctx, ) { Ok(tenant) => { @@ -153,14 +152,12 @@ pub async fn init_tenant_mgr( Ok(()) } -/// `init_done` is an optional channel used during initial load to delay background task -/// start. It is not used later. pub fn schedule_local_tenant_processing( conf: &'static PageServerConf, tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: Option<(completion::Completion, completion::Barrier)>, + init_order: Option, ctx: &RequestContext, ) -> anyhow::Result> { anyhow::ensure!( @@ -219,7 +216,7 @@ pub fn schedule_local_tenant_processing( tenant_id, broker_client, remote_storage, - init_done, + init_order, ctx, ) }; diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 1bbc1b1c08..360818b5a7 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -15,10 +15,10 @@ use tracing::*; use utils::completion; /// Start per tenant background loops: compaction and gc. -/// -/// `init_done` is an optional channel used during initial load to delay background task -/// start. It is not used later. -pub fn start_background_loops(tenant: &Arc, init_done: Option<&completion::Barrier>) { +pub fn start_background_loops( + tenant: &Arc, + background_jobs_can_start: Option<&completion::Barrier>, +) { let tenant_id = tenant.tenant_id; task_mgr::spawn( BACKGROUND_RUNTIME.handle(), @@ -29,10 +29,14 @@ pub fn start_background_loops(tenant: &Arc, init_done: Option<&completio false, { let tenant = Arc::clone(tenant); - let init_done = init_done.cloned(); + let background_jobs_can_start = background_jobs_can_start.cloned(); async move { - completion::Barrier::maybe_wait(init_done).await; - compaction_loop(tenant) + let cancel = task_mgr::shutdown_token(); + tokio::select! { + _ = cancel.cancelled() => { return Ok(()) }, + _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + }; + compaction_loop(tenant, cancel) .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) .await; Ok(()) @@ -48,10 +52,14 @@ pub fn start_background_loops(tenant: &Arc, init_done: Option<&completio false, { let tenant = Arc::clone(tenant); - let init_done = init_done.cloned(); + let background_jobs_can_start = background_jobs_can_start.cloned(); async move { - completion::Barrier::maybe_wait(init_done).await; - gc_loop(tenant) + let cancel = task_mgr::shutdown_token(); + tokio::select! { + _ = cancel.cancelled() => { return Ok(()) }, + _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + }; + gc_loop(tenant, cancel) .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) .await; Ok(()) @@ -63,12 +71,11 @@ pub fn start_background_loops(tenant: &Arc, init_done: Option<&completio /// /// Compaction task's main loop /// -async fn compaction_loop(tenant: Arc) { +async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); async { - let cancel = task_mgr::shutdown_token(); let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download); let mut first = true; loop { @@ -133,12 +140,11 @@ async fn compaction_loop(tenant: Arc) { /// /// GC task's main loop /// -async fn gc_loop(tenant: Arc) { +async fn gc_loop(tenant: Arc, cancel: CancellationToken) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); async { - let cancel = task_mgr::shutdown_token(); // GC might require downloading, to find the cutoff LSN that corresponds to the // cutoff specified as time. let ctx = diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fdaad58e16..507f0de4f3 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -242,6 +242,13 @@ pub struct Timeline { pub delete_lock: tokio::sync::Mutex, eviction_task_timeline_state: tokio::sync::Mutex, + + /// Barrier to wait before doing initial logical size calculation. Used only during startup. + initial_logical_size_can_start: Option, + + /// Completion shared between all timelines loaded during startup; used to delay heavier + /// background tasks until some logical sizes have been calculated. + initial_logical_size_attempt: Mutex>, } /// Internal structure to hold all data needed for logical size calculation. @@ -932,12 +939,12 @@ impl Timeline { pub fn activate( self: &Arc, broker_client: BrokerClientChannel, - init_done: Option<&completion::Barrier>, + background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); - self.launch_eviction_task(init_done); + self.launch_eviction_task(background_jobs_can_start); } pub fn set_state(&self, new_state: TimelineState) { @@ -955,6 +962,14 @@ impl Timeline { error!("Not activating a Stopping timeline"); } (_, new_state) => { + if matches!(new_state, TimelineState::Stopping | TimelineState::Broken) { + // drop the copmletion guard, if any; it might be holding off the completion + // forever needlessly + self.initial_logical_size_attempt + .lock() + .unwrap_or_else(|e| e.into_inner()) + .take(); + } self.state.send_replace(new_state); } } @@ -1345,6 +1360,8 @@ impl Timeline { walredo_mgr: Arc, remote_client: Option, pg_version: u32, + initial_logical_size_can_start: Option, + initial_logical_size_attempt: Option, ) -> Arc { let disk_consistent_lsn = metadata.disk_consistent_lsn(); let (state, _) = watch::channel(TimelineState::Loading); @@ -1439,6 +1456,9 @@ impl Timeline { EvictionTaskTimelineState::default(), ), delete_lock: tokio::sync::Mutex::new(false), + + initial_logical_size_can_start, + initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -1927,7 +1947,27 @@ impl Timeline { false, // NB: don't log errors here, task_mgr will do that. async move { - // no cancellation here, because nothing really waits for this to complete compared + + let cancel = task_mgr::shutdown_token(); + + // in case we were created during pageserver initialization, wait for + // initialization to complete before proceeding. startup time init runs on the same + // runtime. + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {} + }; + + // hold off background tasks from starting until all timelines get to try at least + // once initial logical size calculation; though retry will rarely be useful. + // holding off is done because heavier tasks execute blockingly on the same + // runtime. + // + // dropping this at every outcome is probably better than trying to cling on to it, + // delay will be terminated by a timeout regardless. + let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() }; + + // no extra cancellation here, because nothing really waits for this to complete compared // to spawn_ondemand_logical_size_calculation. let cancel = CancellationToken::new(); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 7029d75d63..1040dff63d 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -49,9 +49,12 @@ pub struct EvictionTaskTenantState { } impl Timeline { - pub(super) fn launch_eviction_task(self: &Arc, init_done: Option<&completion::Barrier>) { + pub(super) fn launch_eviction_task( + self: &Arc, + background_tasks_can_start: Option<&completion::Barrier>, + ) { let self_clone = Arc::clone(self); - let init_done = init_done.cloned(); + let background_tasks_can_start = background_tasks_can_start.cloned(); task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Eviction, @@ -60,8 +63,13 @@ impl Timeline { &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id), false, async move { - completion::Barrier::maybe_wait(init_done).await; - self_clone.eviction_task(task_mgr::shutdown_token()).await; + let cancel = task_mgr::shutdown_token(); + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); } + _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {} + }; + + self_clone.eviction_task(cancel).await; info!("eviction task finishing"); Ok(()) }, diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index ab67518092..0ec023b9e1 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -110,6 +110,12 @@ class EvictionEnv: overrides=( "--pageserver-config-override=disk_usage_based_eviction=" + enc.dump_inline_table(disk_usage_config).replace("\n", " "), + # Disk usage based eviction runs as a background task. + # But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup. + # But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages. + # But, we only have a 10-second-timeout in this test. + # So, disable the delay for this test. + "--pageserver-config-override=background_task_maximum_delay='0s'", ), ) From 37bf2cac4f6416ff22f30e5bdacf190363915e58 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 7 Jun 2023 12:53:27 +0400 Subject: [PATCH 6/9] Persist safekeeper control file once in a while. It should make remote_consistent_lsn commonly up-to-date on non actively writing projects, which removes spike or pageserver -> safekeeper reconnections on storage nodes restart. --- safekeeper/src/control_file.rs | 12 ++++++++++++ safekeeper/src/remove_wal.rs | 3 +++ safekeeper/src/safekeeper.rs | 32 ++++++++++++++++++++++++++++---- safekeeper/src/timeline.rs | 12 +++++++++++- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index ba5e453e41..b1b0c032d7 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -7,6 +7,7 @@ use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::time::Instant; use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; @@ -28,6 +29,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); pub trait Storage: Deref { /// Persist safekeeper state on disk and update internal state. fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + + /// Timestamp of last persist. + fn last_persist_at(&self) -> Instant; } #[derive(Debug)] @@ -38,6 +42,8 @@ pub struct FileStorage { /// Last state persisted to disk. state: SafeKeeperState, + /// Not preserved across restarts. + last_persist_at: Instant, } impl FileStorage { @@ -51,6 +57,7 @@ impl FileStorage { timeline_dir, conf: conf.clone(), state, + last_persist_at: Instant::now(), }) } @@ -66,6 +73,7 @@ impl FileStorage { timeline_dir, conf: conf.clone(), state, + last_persist_at: Instant::now(), }; Ok(store) @@ -216,6 +224,10 @@ impl Storage for FileStorage { self.state = s.clone(); Ok(()) } + + fn last_persist_at(&self) -> Instant { + self.last_persist_at + } } #[cfg(test)] diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index b6d497f34e..ad9d655fae 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -17,6 +17,9 @@ pub fn thread_main(conf: SafeKeeperConf) { let ttid = tli.ttid; let _enter = info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); + if let Err(e) = tli.maybe_pesist_control_file() { + warn!("failed to persist control file: {e}"); + } if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { warn!("failed to remove WAL: {}", e); } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index eb434136d4..7378ccb994 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -10,6 +10,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; +use std::time::Duration; use storage_broker::proto::SafekeeperTimelineInfo; use tracing::*; @@ -837,6 +838,26 @@ where self.state.persist(&state) } + /// Persist control file if there is something to save and enough time + /// passed after the last save. + pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { + const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); + if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL { + return Ok(()); + } + let need_persist = self.inmem.commit_lsn > self.state.commit_lsn + || self.inmem.backup_lsn > self.state.backup_lsn + || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn + || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn; + if need_persist { + let mut state = self.state.clone(); + state.remote_consistent_lsn = inmem_remote_consistent_lsn; + self.persist_control_file(state)?; + trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); + } + Ok(()) + } + /// Handle request to append WAL. #[allow(clippy::comparison_chain)] fn handle_append_request( @@ -949,9 +970,8 @@ where if sync_control_file { let mut state = self.state.clone(); - // Note: we do not persist remote_consistent_lsn in other paths of - // persisting cf -- that is not much needed currently. We could do - // that by storing Arc to walsenders in Safekeeper. + // Note: we could make remote_consistent_lsn update in cf common by + // storing Arc to walsenders in Safekeeper. state.remote_consistent_lsn = new_remote_consistent_lsn; self.persist_control_file(state)?; } @@ -981,7 +1001,7 @@ mod tests { use super::*; use crate::wal_storage::Storage; - use std::ops::Deref; + use std::{ops::Deref, time::Instant}; // fake storage for tests struct InMemoryState { @@ -993,6 +1013,10 @@ mod tests { self.persisted_state = s.clone(); Ok(()) } + + fn last_persist_at(&self) -> Instant { + Instant::now() + } } impl Deref for InMemoryState { diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2dbf215998..941f8dae54 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -234,7 +234,6 @@ impl SharedState { flush_lsn: self.sk.wal_store.flush_lsn().0, // note: this value is not flushed to control file yet and can be lost commit_lsn: self.sk.inmem.commit_lsn.0, - // TODO: rework feedbacks to avoid max here remote_consistent_lsn: remote_consistent_lsn.0, peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0, safekeeper_connstr: conf.listen_pg_addr.clone(), @@ -673,6 +672,17 @@ impl Timeline { Ok(()) } + /// Persist control file if there is something to save and enough time + /// passed after the last save. This helps to keep remote_consistent_lsn up + /// to date so that storage nodes restart doesn't cause many pageserver -> + /// safekeeper reconnections. + pub fn maybe_pesist_control_file(&self) -> Result<()> { + let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn(); + self.write_shared_state() + .sk + .maybe_persist_control_file(remote_consistent_lsn) + } + /// Returns full timeline info, required for the metrics. If the timeline is /// not active, returns None instead. pub fn info_for_metrics(&self) -> Option { From 1c200bd15f34d240d5b2bf55c31414229f10e70e Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 7 Jun 2023 10:51:13 -0400 Subject: [PATCH 7/9] fix: break dev dependencies between wal_craft and pg_ffi (#4424) ## Problem close https://github.com/neondatabase/neon/issues/4266 ## Summary of changes With this PR, rust-analyzer should be able to give lints and auto complete in `mod tests`, and this makes writing tests easier. Previously, rust-analyzer cannot do auto completion. --------- Signed-off-by: Alex Chi --- Cargo.lock | 3 +- Cargo.toml | 15 +- libs/postgres_ffi/Cargo.toml | 1 - libs/postgres_ffi/src/lib.rs | 12 +- .../postgres_ffi/src/wal_craft_test_export.rs | 6 + libs/postgres_ffi/src/xlog_utils.rs | 218 +---------------- libs/postgres_ffi/wal_craft/Cargo.toml | 4 + libs/postgres_ffi/wal_craft/src/lib.rs | 14 ++ .../wal_craft/src/xlog_utils_test.rs | 219 ++++++++++++++++++ 9 files changed, 270 insertions(+), 222 deletions(-) create mode 100644 libs/postgres_ffi/src/wal_craft_test_export.rs create mode 100644 libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs diff --git a/Cargo.lock b/Cargo.lock index d390df94e0..c078510129 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2874,7 +2874,6 @@ dependencies = [ "serde", "thiserror", "utils", - "wal_craft", "workspace_hack", ] @@ -4894,7 +4893,9 @@ dependencies = [ "once_cell", "postgres", "postgres_ffi", + "regex", "tempfile", + "utils", "workspace_hack", ] diff --git a/Cargo.toml b/Cargo.toml index 1cb8d65948..d7bffe67e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,20 @@ members = [ "storage_broker", "workspace_hack", "trace", - "libs/*", + "libs/compute_api", + "libs/pageserver_api", + "libs/postgres_ffi", + "libs/safekeeper_api", + "libs/utils", + "libs/consumption_metrics", + "libs/postgres_backend", + "libs/pq_proto", + "libs/tenant_size_model", + "libs/metrics", + "libs/postgres_connection", + "libs/remote_storage", + "libs/tracing-utils", + "libs/postgres_ffi/wal_craft", ] [workspace.package] diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 159fc5946d..86e72f6bdd 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -24,7 +24,6 @@ workspace_hack.workspace = true [dev-dependencies] env_logger.workspace = true postgres.workspace = true -wal_craft = { path = "wal_craft" } [build-dependencies] anyhow.workspace = true diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index b8eb469cb0..cc115664d5 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -33,6 +33,7 @@ macro_rules! postgres_ffi { } pub mod controlfile_utils; pub mod nonrelfile_utils; + pub mod wal_craft_test_export; pub mod waldecoder_handler; pub mod xlog_utils; @@ -45,8 +46,15 @@ macro_rules! postgres_ffi { }; } -postgres_ffi!(v14); -postgres_ffi!(v15); +#[macro_export] +macro_rules! for_all_postgres_versions { + ($macro:tt) => { + $macro!(v14); + $macro!(v15); + }; +} + +for_all_postgres_versions! { postgres_ffi } pub mod pg_constants; pub mod relfile_utils; diff --git a/libs/postgres_ffi/src/wal_craft_test_export.rs b/libs/postgres_ffi/src/wal_craft_test_export.rs new file mode 100644 index 0000000000..147567c442 --- /dev/null +++ b/libs/postgres_ffi/src/wal_craft_test_export.rs @@ -0,0 +1,6 @@ +//! This module is for WAL craft to test with postgres_ffi. Should not import any thing in normal usage. + +pub use super::PG_MAJORVERSION; +pub use super::xlog_utils::*; +pub use super::bindings::*; +pub use crate::WAL_SEGMENT_SIZE; diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 4d7bb61883..61a9c38a84 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -481,220 +481,4 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { wal } -#[cfg(test)] -mod tests { - use super::super::PG_MAJORVERSION; - use super::*; - use regex::Regex; - use std::cmp::min; - use std::fs; - use std::{env, str::FromStr}; - use utils::const_assert; - - fn init_logging() { - let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or( - format!("wal_craft=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"), - )) - .is_test(true) - .try_init(); - } - - fn test_end_of_wal(test_name: &str) { - use wal_craft::*; - - let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); - - // Craft some WAL - let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("..") - .join(".."); - let cfg = Conf { - pg_version, - pg_distrib_dir: top_path.join("pg_install"), - datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), - }; - if cfg.datadir.exists() { - fs::remove_dir_all(&cfg.datadir).unwrap(); - } - cfg.initdb().unwrap(); - let srv = cfg.start_server().unwrap(); - let (intermediate_lsns, expected_end_of_wal_partial) = - C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap(); - let intermediate_lsns: Vec = intermediate_lsns - .iter() - .map(|&lsn| u64::from(lsn).into()) - .collect(); - let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); - srv.kill(); - - // Check find_end_of_wal on the initial WAL - let last_segment = cfg - .wal_dir() - .read_dir() - .unwrap() - .map(|f| f.unwrap().file_name().into_string().unwrap()) - .filter(|fname| IsXLogFileName(fname)) - .max() - .unwrap(); - check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); - for start_lsn in intermediate_lsns - .iter() - .chain(std::iter::once(&expected_end_of_wal)) - { - // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. - // We assume that `start_lsn` is non-decreasing. - info!( - "Checking with start_lsn={}, erasing WAL before it", - start_lsn - ); - for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { - let fname = file.file_name().into_string().unwrap(); - if !IsXLogFileName(&fname) { - continue; - } - let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); - let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); - if seg_start_lsn > u64::from(*start_lsn) { - continue; - } - let mut f = File::options().write(true).open(file.path()).unwrap(); - const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; - f.write_all( - &ZEROS[0..min( - WAL_SEGMENT_SIZE, - (u64::from(*start_lsn) - seg_start_lsn) as usize, - )], - ) - .unwrap(); - } - check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); - } - } - - fn check_pg_waldump_end_of_wal( - cfg: &wal_craft::Conf, - last_segment: &str, - expected_end_of_wal: Lsn, - ) { - // Get the actual end of WAL by pg_waldump - let waldump_output = cfg - .pg_waldump("000000010000000000000001", last_segment) - .unwrap() - .stderr; - let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); - let caps = match Regex::new(r"invalid record length at (.+):") - .unwrap() - .captures(waldump_output) - { - Some(caps) => caps, - None => { - error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output); - panic!(); - } - }; - let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); - info!( - "waldump erred on {}, expected wal end at {}", - waldump_wal_end, expected_end_of_wal - ); - assert_eq!(waldump_wal_end, expected_end_of_wal); - } - - fn check_end_of_wal( - cfg: &wal_craft::Conf, - last_segment: &str, - start_lsn: Lsn, - expected_end_of_wal: Lsn, - ) { - // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) - // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); - // info!( - // "find_end_of_wal returned wal_end={} with non-partial WAL segment", - // wal_end - // ); - // assert_eq!(wal_end, expected_end_of_wal_non_partial); - - // Rename file to partial to actually find last valid lsn, then rename it back. - fs::rename( - cfg.wal_dir().join(last_segment), - cfg.wal_dir().join(format!("{}.partial", last_segment)), - ) - .unwrap(); - let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); - info!( - "find_end_of_wal returned wal_end={} with partial WAL segment", - wal_end - ); - assert_eq!(wal_end, expected_end_of_wal); - fs::rename( - cfg.wal_dir().join(format!("{}.partial", last_segment)), - cfg.wal_dir().join(last_segment), - ) - .unwrap(); - } - - const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024); - - #[test] - pub fn test_find_end_of_wal_simple() { - init_logging(); - test_end_of_wal::("test_find_end_of_wal_simple"); - } - - #[test] - pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { - init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_crossing_segment_followed_by_small_one", - ); - } - - #[test] - pub fn test_find_end_of_wal_last_crossing_segment() { - init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_last_crossing_segment", - ); - } - - /// Check the math in update_next_xid - /// - /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, - /// currently 1024. - #[test] - pub fn test_update_next_xid() { - let checkpoint_buf = [0u8; std::mem::size_of::()]; - let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); - - checkpoint.nextXid = FullTransactionId { value: 10 }; - assert_eq!(checkpoint.nextXid.value, 10); - - // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL - // boundary - checkpoint.update_next_xid(100); - assert_eq!(checkpoint.nextXid.value, 1024); - - // No change - checkpoint.update_next_xid(500); - assert_eq!(checkpoint.nextXid.value, 1024); - checkpoint.update_next_xid(1023); - assert_eq!(checkpoint.nextXid.value, 1024); - - // The function returns the *next* XID, given the highest XID seen so - // far. So when we pass 1024, the nextXid gets bumped up to the next - // XID_CHECKPOINT_INTERVAL boundary. - checkpoint.update_next_xid(1024); - assert_eq!(checkpoint.nextXid.value, 2048); - } - - #[test] - pub fn test_encode_logical_message() { - let expected = [ - 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, - 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, - 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, - ]; - let actual = encode_logical_message("prefix", "message"); - assert_eq!(expected, actual[..]); - } -} +// If you need to craft WAL and write tests for this module, put it at wal_craft crate. diff --git a/libs/postgres_ffi/wal_craft/Cargo.toml b/libs/postgres_ffi/wal_craft/Cargo.toml index 992bf7460b..bea888b23e 100644 --- a/libs/postgres_ffi/wal_craft/Cargo.toml +++ b/libs/postgres_ffi/wal_craft/Cargo.toml @@ -15,3 +15,7 @@ postgres_ffi.workspace = true tempfile.workspace = true workspace_hack.workspace = true + +[dev-dependencies] +regex.workspace = true +utils.workspace = true diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index 9f3f4dc20d..d4aed88048 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -10,6 +10,20 @@ use std::process::Command; use std::time::{Duration, Instant}; use tempfile::{tempdir, TempDir}; +macro_rules! xlog_utils_test { + ($version:ident) => { + #[path = "."] + mod $version { + pub use postgres_ffi::$version::wal_craft_test_export::*; + #[allow(clippy::duplicate_mod)] + #[cfg(test)] + mod xlog_utils_test; + } + }; +} + +postgres_ffi::for_all_postgres_versions! { xlog_utils_test } + #[derive(Debug, Clone, PartialEq, Eq)] pub struct Conf { pub pg_version: u32, diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs new file mode 100644 index 0000000000..6ff4c563b2 --- /dev/null +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -0,0 +1,219 @@ +//! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency. + +use super::*; +use crate::{error, info}; +use regex::Regex; +use std::cmp::min; +use std::fs::{self, File}; +use std::io::Write; +use std::{env, str::FromStr}; +use utils::const_assert; +use utils::lsn::Lsn; + +fn init_logging() { + let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or( + format!("crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"), + )) + .is_test(true) + .try_init(); +} + +fn test_end_of_wal(test_name: &str) { + use crate::*; + + let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); + + // Craft some WAL + let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("..") + .join("..") + .join(".."); + let cfg = Conf { + pg_version, + pg_distrib_dir: top_path.join("pg_install"), + datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), + }; + if cfg.datadir.exists() { + fs::remove_dir_all(&cfg.datadir).unwrap(); + } + cfg.initdb().unwrap(); + let srv = cfg.start_server().unwrap(); + let (intermediate_lsns, expected_end_of_wal_partial) = + C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap(); + let intermediate_lsns: Vec = intermediate_lsns + .iter() + .map(|&lsn| u64::from(lsn).into()) + .collect(); + let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); + srv.kill(); + + // Check find_end_of_wal on the initial WAL + let last_segment = cfg + .wal_dir() + .read_dir() + .unwrap() + .map(|f| f.unwrap().file_name().into_string().unwrap()) + .filter(|fname| IsXLogFileName(fname)) + .max() + .unwrap(); + check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); + for start_lsn in intermediate_lsns + .iter() + .chain(std::iter::once(&expected_end_of_wal)) + { + // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. + // We assume that `start_lsn` is non-decreasing. + info!( + "Checking with start_lsn={}, erasing WAL before it", + start_lsn + ); + for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { + let fname = file.file_name().into_string().unwrap(); + if !IsXLogFileName(&fname) { + continue; + } + let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); + let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); + if seg_start_lsn > u64::from(*start_lsn) { + continue; + } + let mut f = File::options().write(true).open(file.path()).unwrap(); + const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; + f.write_all( + &ZEROS[0..min( + WAL_SEGMENT_SIZE, + (u64::from(*start_lsn) - seg_start_lsn) as usize, + )], + ) + .unwrap(); + } + check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); + } +} + +fn check_pg_waldump_end_of_wal( + cfg: &crate::Conf, + last_segment: &str, + expected_end_of_wal: Lsn, +) { + // Get the actual end of WAL by pg_waldump + let waldump_output = cfg + .pg_waldump("000000010000000000000001", last_segment) + .unwrap() + .stderr; + let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); + let caps = match Regex::new(r"invalid record length at (.+):") + .unwrap() + .captures(waldump_output) + { + Some(caps) => caps, + None => { + error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output); + panic!(); + } + }; + let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); + info!( + "waldump erred on {}, expected wal end at {}", + waldump_wal_end, expected_end_of_wal + ); + assert_eq!(waldump_wal_end, expected_end_of_wal); +} + +fn check_end_of_wal( + cfg: &crate::Conf, + last_segment: &str, + start_lsn: Lsn, + expected_end_of_wal: Lsn, +) { + // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) + // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + // info!( + // "find_end_of_wal returned wal_end={} with non-partial WAL segment", + // wal_end + // ); + // assert_eq!(wal_end, expected_end_of_wal_non_partial); + + // Rename file to partial to actually find last valid lsn, then rename it back. + fs::rename( + cfg.wal_dir().join(last_segment), + cfg.wal_dir().join(format!("{}.partial", last_segment)), + ) + .unwrap(); + let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + info!( + "find_end_of_wal returned wal_end={} with partial WAL segment", + wal_end + ); + assert_eq!(wal_end, expected_end_of_wal); + fs::rename( + cfg.wal_dir().join(format!("{}.partial", last_segment)), + cfg.wal_dir().join(last_segment), + ) + .unwrap(); +} + +const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024); + +#[test] +pub fn test_find_end_of_wal_simple() { + init_logging(); + test_end_of_wal::("test_find_end_of_wal_simple"); +} + +#[test] +pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { + init_logging(); + test_end_of_wal::( + "test_find_end_of_wal_crossing_segment_followed_by_small_one", + ); +} + +#[test] +pub fn test_find_end_of_wal_last_crossing_segment() { + init_logging(); + test_end_of_wal::( + "test_find_end_of_wal_last_crossing_segment", + ); +} + +/// Check the math in update_next_xid +/// +/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, +/// currently 1024. +#[test] +pub fn test_update_next_xid() { + let checkpoint_buf = [0u8; std::mem::size_of::()]; + let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); + + checkpoint.nextXid = FullTransactionId { value: 10 }; + assert_eq!(checkpoint.nextXid.value, 10); + + // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL + // boundary + checkpoint.update_next_xid(100); + assert_eq!(checkpoint.nextXid.value, 1024); + + // No change + checkpoint.update_next_xid(500); + assert_eq!(checkpoint.nextXid.value, 1024); + checkpoint.update_next_xid(1023); + assert_eq!(checkpoint.nextXid.value, 1024); + + // The function returns the *next* XID, given the highest XID seen so + // far. So when we pass 1024, the nextXid gets bumped up to the next + // XID_CHECKPOINT_INTERVAL boundary. + checkpoint.update_next_xid(1024); + assert_eq!(checkpoint.nextXid.value, 2048); +} + +#[test] +pub fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, + 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, + 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix", "message"); + assert_eq!(expected, actual[..]); +} From 1a1019990a59aa69675e4329d1e5e1e08bbddb71 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 7 Jun 2023 18:25:30 +0300 Subject: [PATCH 8/9] map TenantState::Broken to TenantAttachmentStatus::Failed (#4371) ## Problem Attach failures are not reported in public part of the api (in `attachment_status` field of TenantInfo). ## Summary of changes Expose TenantState::Broken as TenantAttachmentStatus::Failed In the way its written Failed status will be reported even if no attachment happened. (I e if tenant become broken on startup). This is in line with other members. I e Active will be resolved to Attached even if no actual attach took place. This can be tweaked if needed. At the current stage it would be overengineering without clear motivation resolves #4344 --- libs/pageserver_api/src/models.rs | 22 ++++++++++++-------- pageserver/src/http/openapi_spec.yml | 24 ++++++++++++++++++---- test_runner/fixtures/pageserver/utils.py | 6 +++--- test_runner/regress/test_remote_storage.py | 7 ++++++- test_runner/regress/test_tenant_detach.py | 2 +- 5 files changed, 43 insertions(+), 18 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 162bf6b294..ddce82324c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -110,12 +110,11 @@ impl TenantState { Self::Active => Attached, // If the (initial or resumed) attach procedure fails, the tenant becomes Broken. // However, it also becomes Broken if the regular load fails. - // We would need a separate TenantState variant to distinguish these cases. - // However, there's no practical difference from Console's perspective. - // It will run a Postgres-level health check as soon as it observes Attached. - // That will fail on Broken tenants. - // Console can then rollback the attach, or, wait for operator to fix the Broken tenant. - Self::Broken { .. } => Attached, + // From Console's perspective there's no practical difference + // because attachment_status is polled by console only during attach operation execution. + Self::Broken { reason, .. } => Failed { + reason: reason.to_owned(), + }, // Why is Stopping a Maybe case? Because, during pageserver shutdown, // we set the Stopping state irrespective of whether the tenant // has finished attaching or not. @@ -312,10 +311,11 @@ impl std::ops::Deref for TenantAttachConfig { /// See [`TenantState::attachment_status`] and the OpenAPI docs for context. #[derive(Serialize, Deserialize, Clone)] -#[serde(rename_all = "snake_case")] +#[serde(tag = "slug", content = "data", rename_all = "snake_case")] pub enum TenantAttachmentStatus { Maybe, Attached, + Failed { reason: String }, } #[serde_as] @@ -809,7 +809,9 @@ mod tests { "slug": "Active", }, "current_physical_size": 42, - "attachment_status": "attached", + "attachment_status": { + "slug":"attached", + } }); let original_broken = TenantInfo { @@ -831,7 +833,9 @@ mod tests { } }, "current_physical_size": 42, - "attachment_status": "attached", + "attachment_status": { + "slug":"attached", + } }); assert_eq!( diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 0d912c95e0..1f8298ca3e 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -928,12 +928,28 @@ components: writing to the tenant's S3 state, so, DO NOT ATTACH the tenant to any other pageserver, or we risk split-brain. - `attached` means that the attach operation has completed, - maybe successfully, maybe not. Perform a health check at - the Postgres level to determine healthiness of the tenant. + successfully + - `failed` means that attach has failed. For reason check corresponding `reason` failed. + `failed` is the terminal state, retrying attach call wont resolve the issue. + For example this can be caused by s3 being unreachable. The retry may be implemented + with call to detach, though it would be better to not automate it and inspec failed state + manually before proceeding with a retry. See the tenant `/attach` endpoint for more information. - type: string - enum: [ "maybe", "attached" ] + type: object + required: + - slug + - data + properties: + slug: + type: string + enum: [ "maybe", "attached", "failed" ] + data: + - type: object + properties: + reason: + type: string + TenantCreateRequest: allOf: - $ref: '#/components/schemas/TenantConfig' diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index c558387413..d7ffa633fd 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,5 +1,5 @@ import time -from typing import Optional +from typing import Any, Dict, Optional from fixtures.log_helper import log from fixtures.pageserver.http import PageserverHttpClient @@ -72,7 +72,7 @@ def wait_until_tenant_state( expected_state: str, iterations: int, period: float = 1.0, -) -> bool: +) -> Dict[str, Any]: """ Does not use `wait_until` for debugging purposes """ @@ -81,7 +81,7 @@ def wait_until_tenant_state( tenant = pageserver_http.tenant_status(tenant_id=tenant_id) log.debug(f"Tenant {tenant_id} data: {tenant}") if tenant["state"]["slug"] == expected_state: - return True + return tenant except Exception as e: log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index baef8ecacc..742dbfff95 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -147,7 +147,12 @@ def test_remote_storage_backup_and_restore( # listing the remote timelines will fail because of the failpoint, # and the tenant will be marked as Broken. client.tenant_attach(tenant_id) - wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15) + + tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15) + assert tenant_info["attachment_status"] == { + "slug": "failed", + "data": {"reason": "storage-sync-list-remote-timelines"}, + } # Ensure that even though the tenant is broken, we can't attach it again. with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"): diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 9d0fdcfaf8..2a015d5d17 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -532,7 +532,7 @@ def test_ignored_tenant_reattach( ): neon_env_builder.enable_remote_storage( remote_storage_kind=remote_storage_kind, - test_name="test_remote_storage_backup_and_restore", + test_name="test_ignored_tenant_reattach", ) env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() From 2e687bca5b4b541bdd483d6b0448367d547059c7 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 7 Jun 2023 11:28:18 -0400 Subject: [PATCH 9/9] refactor: use LayerDesc in layer map (part 1) (#4408) ## Problem part of https://github.com/neondatabase/neon/issues/4392 ## Summary of changes This PR adds a new HashMap that maps persistent layer desc to the layer object *inside* LayerMap. Originally I directly went towards adding such layer cache in Timeline, but the changes are too many and cannot be reviewed as a reasonably-sized PR. Therefore, we take this intermediate step to change part of the codebase to use persistent layer desc, and come up with other PRs to move this hash map of layer desc to the timeline struct. Also, file_size is now part of the layer desc. --------- Signed-off-by: Alex Chi Co-authored-by: bojanserafimov --- pageserver/benches/bench_layer_map.rs | 4 +- pageserver/src/tenant/layer_map.rs | 163 +++++++++++++----- pageserver/src/tenant/storage_layer.rs | 20 ++- .../src/tenant/storage_layer/delta_layer.rs | 16 +- .../src/tenant/storage_layer/image_layer.rs | 32 ++-- .../src/tenant/storage_layer/layer_desc.rs | 69 +++++++- .../src/tenant/storage_layer/remote_layer.rs | 6 +- pageserver/src/tenant/timeline.rs | 27 +-- 8 files changed, 247 insertions(+), 90 deletions(-) diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index ee5980212e..45dc9fad4a 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { min_lsn = min(min_lsn, lsn_range.start); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); } println!("min: {min_lsn}, max: {max_lsn}"); @@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) { is_incremental: false, short_id: format!("Layer {}", i), }; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); } updates.flush(); println!("Finished layer map init in {:?}", now.elapsed()); diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 8d06ccd565..ca1a71b623 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -51,7 +51,9 @@ use crate::keyspace::KeyPartitioning; use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::Layer; +use anyhow::Context; use anyhow::Result; +use std::collections::HashMap; use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; @@ -61,6 +63,8 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage; pub use historic_layer_coverage::Replacement; use super::storage_layer::range_eq; +use super::storage_layer::PersistentLayerDesc; +use super::storage_layer::PersistentLayerKey; /// /// LayerMap tracks what layers exist on a timeline. @@ -86,11 +90,16 @@ pub struct LayerMap { pub frozen_layers: VecDeque>, /// Index of the historic layers optimized for search - historic: BufferedHistoricLayerCoverage>, + historic: BufferedHistoricLayerCoverage>, /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. - l0_delta_layers: Vec>, + l0_delta_layers: Vec>, + + /// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and + /// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and + /// RemoteLayer will be removed. + mapping: HashMap>, } impl Default for LayerMap { @@ -101,6 +110,7 @@ impl Default for LayerMap { frozen_layers: VecDeque::default(), l0_delta_layers: Vec::default(), historic: BufferedHistoricLayerCoverage::default(), + mapping: HashMap::default(), } } } @@ -125,8 +135,9 @@ where /// /// Insert an on-disk layer. /// - pub fn insert_historic(&mut self, layer: Arc) { - self.layer_map.insert_historic_noflush(layer) + // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` + pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { + self.layer_map.insert_historic_noflush(layer_desc, layer) } /// @@ -134,8 +145,8 @@ where /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer: Arc) { - self.layer_map.remove_historic_noflush(layer) + pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { + self.layer_map.remove_historic_noflush(layer_desc, layer) } /// Replaces existing layer iff it is the `expected`. @@ -150,12 +161,15 @@ where /// that we can replace values only by updating a hashmap. pub fn replace_historic( &mut self, + expected_desc: PersistentLayerDesc, expected: &Arc, + new_desc: PersistentLayerDesc, new: Arc, ) -> anyhow::Result>> { fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound)); - self.layer_map.replace_historic_noflush(expected, new) + self.layer_map + .replace_historic_noflush(expected_desc, expected, new_desc, new) } // We will flush on drop anyway, but this method makes it @@ -230,6 +244,7 @@ where (None, None) => None, (None, Some(image)) => { let lsn_floor = image.get_lsn_range().start; + let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor, @@ -237,6 +252,7 @@ where } (Some(delta), None) => { let lsn_floor = delta.get_lsn_range().start; + let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -247,6 +263,7 @@ where let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_exact_match = img_lsn + 1 == end_lsn; if image_is_newer || image_exact_match { + let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor: img_lsn, @@ -254,6 +271,7 @@ where } else { let lsn_floor = std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); + let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -273,16 +291,33 @@ where /// /// Helper function for BatchedUpdates::insert_historic /// - pub(self) fn insert_historic_noflush(&mut self, layer: Arc) { + /// TODO(chi): remove L generic so that we do not need to pass layer object. + pub(self) fn insert_historic_noflush( + &mut self, + layer_desc: PersistentLayerDesc, + layer: Arc, + ) { + self.mapping.insert(layer_desc.key(), layer.clone()); + // TODO: See #3869, resulting #4088, attempted fix and repro #4094 - self.historic.insert( - historic_layer_coverage::LayerKey::from(&*layer), - Arc::clone(&layer), - ); if Self::is_l0(&layer) { - self.l0_delta_layers.push(layer); + self.l0_delta_layers.push(layer_desc.clone().into()); } + + self.historic.insert( + historic_layer_coverage::LayerKey::from(&*layer), + layer_desc.into(), + ); + } + + fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc { + let layer = self + .mapping + .get(key) + .with_context(|| format!("{key:?}")) + .expect("inconsistent layer mapping"); + layer } /// @@ -290,14 +325,16 @@ where /// /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic_noflush(&mut self, layer: Arc) { + pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { self.historic .remove(historic_layer_coverage::LayerKey::from(&*layer)); - if Self::is_l0(&layer) { let len_before = self.l0_delta_layers.len(); - self.l0_delta_layers - .retain(|other| !Self::compare_arced_layers(other, &layer)); + let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); + l0_delta_layers.retain(|other| { + !Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer) + }); + self.l0_delta_layers = l0_delta_layers; // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, // there's a chance that the comparison fails at runtime due to it comparing (pointer, // vtable) pairs. @@ -307,11 +344,14 @@ where "failed to locate removed historic layer from l0_delta_layers" ); } + self.mapping.remove(&layer_desc.key()); } pub(self) fn replace_historic_noflush( &mut self, + expected_desc: PersistentLayerDesc, expected: &Arc, + new_desc: PersistentLayerDesc, new: Arc, ) -> anyhow::Result>> { let key = historic_layer_coverage::LayerKey::from(&**expected); @@ -332,10 +372,9 @@ where let l0_index = if expected_l0 { // find the index in case replace worked, we need to replace that as well - let pos = self - .l0_delta_layers - .iter() - .position(|slot| Self::compare_arced_layers(slot, expected)); + let pos = self.l0_delta_layers.iter().position(|slot| { + Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected) + }); if pos.is_none() { return Ok(Replacement::NotFound); @@ -345,16 +384,28 @@ where None }; - let replaced = self.historic.replace(&key, new.clone(), |existing| { - Self::compare_arced_layers(existing, expected) + let new_desc = Arc::new(new_desc); + let replaced = self.historic.replace(&key, new_desc.clone(), |existing| { + **existing == expected_desc }); if let Replacement::Replaced { .. } = &replaced { + self.mapping.remove(&expected_desc.key()); + self.mapping.insert(new_desc.key(), new); if let Some(index) = l0_index { - self.l0_delta_layers[index] = new; + self.l0_delta_layers[index] = new_desc; } } + let replaced = match replaced { + Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered }, + Replacement::NotFound => Replacement::NotFound, + Replacement::RemovalBuffered => Replacement::RemovalBuffered, + Replacement::Unexpected(x) => { + Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone()) + } + }; + Ok(replaced) } @@ -383,7 +434,7 @@ where let start = key.start.to_i128(); let end = key.end.to_i128(); - let layer_covers = |layer: Option>| match layer { + let layer_covers = |layer: Option>| match layer { Some(layer) => layer.get_lsn_range().start >= lsn.start, None => false, }; @@ -404,7 +455,9 @@ where } pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { - self.historic.iter() + self.historic + .iter() + .map(|x| self.get_layer_from_mapping(&x.key()).clone()) } /// @@ -436,14 +489,24 @@ where // Loop through the change events and push intervals for (change_key, change_val) in version.image_coverage.range(start..end) { let kr = Key::from_i128(current_key)..Key::from_i128(change_key); - coverage.push((kr, current_val.take())); + coverage.push(( + kr, + current_val + .take() + .map(|l| self.get_layer_from_mapping(&l.key()).clone()), + )); current_key = change_key; current_val = change_val.clone(); } // Add the final interval let kr = Key::from_i128(current_key)..Key::from_i128(end); - coverage.push((kr, current_val.take())); + coverage.push(( + kr, + current_val + .take() + .map(|l| self.get_layer_from_mapping(&l.key()).clone()), + )); Ok(coverage) } @@ -532,7 +595,9 @@ where let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(&val, key) as usize; + let base_count = + Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) + as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; @@ -555,7 +620,9 @@ where let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(&val, key) as usize; + let base_count = + Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) + as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; max_stacked_deltas = std::cmp::max( @@ -706,7 +773,11 @@ where /// Return all L0 delta layers pub fn get_level0_deltas(&self) -> Result>> { - Ok(self.l0_delta_layers.clone()) + Ok(self + .l0_delta_layers + .iter() + .map(|x| self.get_layer_from_mapping(&x.key()).clone()) + .collect()) } /// debugging function to print out the contents of the layer map @@ -809,12 +880,17 @@ mod tests { let layer = LayerDescriptor::from(layer); // same skeletan construction; see scenario below - let not_found: Arc = Arc::new(layer.clone()); - let new_version: Arc = Arc::new(layer); + let not_found = Arc::new(layer.clone()); + let new_version = Arc::new(layer); let mut map = LayerMap::default(); - let res = map.batch_update().replace_historic(¬_found, new_version); + let res = map.batch_update().replace_historic( + not_found.get_persistent_layer_desc(), + ¬_found, + new_version.get_persistent_layer_desc(), + new_version, + ); assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}"); } @@ -823,8 +899,8 @@ mod tests { let name = LayerFileName::from_str(layer_name).unwrap(); let skeleton = LayerDescriptor::from(name); - let remote: Arc = Arc::new(skeleton.clone()); - let downloaded: Arc = Arc::new(skeleton); + let remote = Arc::new(skeleton.clone()); + let downloaded = Arc::new(skeleton); let mut map = LayerMap::default(); @@ -834,12 +910,18 @@ mod tests { let expected_in_counts = (1, usize::from(expected_l0)); - map.batch_update().insert_historic(remote.clone()); + map.batch_update() + .insert_historic(remote.get_persistent_layer_desc(), remote.clone()); assert_eq!(count_layer_in(&map, &remote), expected_in_counts); let replaced = map .batch_update() - .replace_historic(&remote, downloaded.clone()) + .replace_historic( + remote.get_persistent_layer_desc(), + &remote, + downloaded.get_persistent_layer_desc(), + downloaded.clone(), + ) .expect("name derived attributes are the same"); assert!( matches!(replaced, Replacement::Replaced { .. }), @@ -847,11 +929,12 @@ mod tests { ); assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts); - map.batch_update().remove_historic(downloaded.clone()); + map.batch_update() + .remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone()); assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); } - fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { + fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { let historic = map .iter_historic_layers() .filter(|x| LayerMap::compare_arced_layers(x, layer)) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 7c071463de..6ac4fd9470 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -38,7 +38,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter}; pub use filename::{DeltaFileName, ImageFileName, LayerFileName}; pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; -pub use layer_desc::PersistentLayerDesc; +pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; use super::layer_map::BatchedUpdates; @@ -454,7 +454,9 @@ pub trait PersistentLayer: Layer { /// /// Should not change over the lifetime of the layer object because /// current_physical_size is computed as the som of this value. - fn file_size(&self) -> u64; + fn file_size(&self) -> u64 { + self.layer_desc().file_size + } fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo; @@ -483,6 +485,20 @@ pub struct LayerDescriptor { pub short_id: String, } +impl LayerDescriptor { + /// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta, + /// and the tenant / timeline id does not matter. + pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc { + PersistentLayerDesc::new_delta( + TenantId::from_array([0; 16]), + TimelineId::from_array([0; 16]), + self.key.clone(), + self.lsn.clone(), + 233, + ) + } +} + impl Layer for LayerDescriptor { fn get_key_range(&self) -> Range { self.key.clone() diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 5f2fb1ebea..624fe8dac4 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -182,8 +182,6 @@ pub struct DeltaLayer { pub desc: PersistentLayerDesc, - pub file_size: u64, - access_stats: LayerAccessStats, inner: RwLock, @@ -196,7 +194,7 @@ impl std::fmt::Debug for DeltaLayer { f.debug_struct("DeltaLayer") .field("key_range", &RangeDisplayDebug(&self.desc.key_range)) .field("lsn_range", &self.desc.lsn_range) - .field("file_size", &self.file_size) + .field("file_size", &self.desc.file_size) .field("inner", &self.inner) .finish() } @@ -439,10 +437,6 @@ impl PersistentLayer for DeltaLayer { Ok(()) } - fn file_size(&self) -> u64 { - self.file_size - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -451,7 +445,7 @@ impl PersistentLayer for DeltaLayer { HistoricLayerInfo::Delta { layer_file_name, - layer_file_size: self.file_size, + layer_file_size: self.desc.file_size, lsn_start: lsn_range.start, lsn_end: lsn_range.end, remote: false, @@ -602,8 +596,8 @@ impl DeltaLayer { timeline_id, filename.key_range.clone(), filename.lsn_range.clone(), + file_size, ), - file_size, access_stats, inner: RwLock::new(DeltaLayerInner { loaded: false, @@ -634,8 +628,8 @@ impl DeltaLayer { summary.timeline_id, summary.key_range, summary.lsn_range, + metadata.len(), ), - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(DeltaLayerInner { loaded: false, @@ -803,8 +797,8 @@ impl DeltaLayerWriterInner { self.timeline_id, self.key_start..key_end, self.lsn_range.clone(), + metadata.len(), ), - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(DeltaLayerInner { loaded: false, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b55dd08a6d..07a16a7de2 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -109,8 +109,6 @@ pub struct ImageLayer { // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn pub lsn: Lsn, - pub file_size: u64, - access_stats: LayerAccessStats, inner: RwLock, @@ -122,7 +120,7 @@ impl std::fmt::Debug for ImageLayer { f.debug_struct("ImageLayer") .field("key_range", &RangeDisplayDebug(&self.desc.key_range)) - .field("file_size", &self.file_size) + .field("file_size", &self.desc.file_size) .field("lsn", &self.lsn) .field("inner", &self.inner) .finish() @@ -258,17 +256,13 @@ impl PersistentLayer for ImageLayer { Ok(()) } - fn file_size(&self) -> u64 { - self.file_size - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); HistoricLayerInfo::Image { layer_file_name, - layer_file_size: self.file_size, + layer_file_size: self.desc.file_size, lsn_start: lsn_range.start, remote: false, access_stats: self.access_stats.as_api_model(reset), @@ -411,9 +405,9 @@ impl ImageLayer { filename.key_range.clone(), filename.lsn, false, + file_size, ), // Now we assume image layer ALWAYS covers the full range. This may change in the future. lsn: filename.lsn, - file_size, access_stats, inner: RwLock::new(ImageLayerInner { loaded: false, @@ -443,9 +437,9 @@ impl ImageLayer { summary.key_range, summary.lsn, false, + metadata.len(), ), // Now we assume image layer ALWAYS covers the full range. This may change in the future. lsn: summary.lsn, - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(ImageLayerInner { file: None, @@ -578,14 +572,6 @@ impl ImageLayerWriterInner { file.write_all(buf.as_ref())?; } - let desc = PersistentLayerDesc::new_img( - self.tenant_id, - self.timeline_id, - self.key_range.clone(), - self.lsn, - self.is_incremental, // for now, image layer ALWAYS covers the full range - ); - // Fill in the summary on blk 0 let summary = Summary { magic: IMAGE_FILE_MAGIC, @@ -604,6 +590,15 @@ impl ImageLayerWriterInner { .metadata() .context("get metadata to determine file size")?; + let desc = PersistentLayerDesc::new_img( + self.tenant_id, + self.timeline_id, + self.key_range.clone(), + self.lsn, + self.is_incremental, // for now, image layer ALWAYS covers the full range + metadata.len(), + ); + // Note: Because we open the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't // set inner.file here. The first read will have to re-open it. @@ -611,7 +606,6 @@ impl ImageLayerWriterInner { path_or_conf: PathOrConf::Conf(self.conf), desc, lsn: self.lsn, - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(ImageLayerInner { loaded: false, diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index a9859681d3..d1cef70253 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -1,10 +1,11 @@ +use anyhow::Result; use std::ops::Range; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, }; -use crate::repository::Key; +use crate::{context::RequestContext, repository::Key}; use super::{DeltaFileName, ImageFileName, LayerFileName}; @@ -24,9 +25,27 @@ pub struct PersistentLayerDesc { /// always be equal to `is_delta`. If we land the partial image layer PR someday, image layer could also be /// incremental. pub is_incremental: bool, + /// File size + pub file_size: u64, +} + +/// A unique identifier of a persistent layer within the context of one timeline. +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +pub struct PersistentLayerKey { + pub key_range: Range, + pub lsn_range: Range, + pub is_delta: bool, } impl PersistentLayerDesc { + pub fn key(&self) -> PersistentLayerKey { + PersistentLayerKey { + key_range: self.key_range.clone(), + lsn_range: self.lsn_range.clone(), + is_delta: self.is_delta, + } + } + pub fn short_id(&self) -> String { self.filename().file_name() } @@ -37,6 +56,7 @@ impl PersistentLayerDesc { key_range: Range, lsn: Lsn, is_incremental: bool, + file_size: u64, ) -> Self { Self { tenant_id, @@ -45,6 +65,7 @@ impl PersistentLayerDesc { lsn_range: Self::image_layer_lsn_range(lsn), is_delta: false, is_incremental, + file_size, } } @@ -53,6 +74,7 @@ impl PersistentLayerDesc { timeline_id: TimelineId, key_range: Range, lsn_range: Range, + file_size: u64, ) -> Self { Self { tenant_id, @@ -61,6 +83,7 @@ impl PersistentLayerDesc { lsn_range, is_delta: true, is_incremental: true, + file_size, } } @@ -106,4 +129,48 @@ impl PersistentLayerDesc { self.image_file_name().into() } } + + // TODO: remove this in the future once we refactor timeline APIs. + + pub fn get_lsn_range(&self) -> Range { + self.lsn_range.clone() + } + + pub fn get_key_range(&self) -> Range { + self.key_range.clone() + } + + pub fn get_timeline_id(&self) -> TimelineId { + self.timeline_id + } + + pub fn get_tenant_id(&self) -> TenantId { + self.tenant_id + } + + pub fn is_incremental(&self) -> bool { + self.is_incremental + } + + pub fn is_delta(&self) -> bool { + self.is_delta + } + + pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { + println!( + "----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----", + self.tenant_id, + self.timeline_id, + self.key_range.start, + self.key_range.end, + self.lsn_range.start, + self.lsn_range.end + ); + + Ok(()) + } + + pub fn file_size(&self) -> u64 { + self.file_size + } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index ff0f44da92..387bae5b1f 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -142,10 +142,6 @@ impl PersistentLayer for RemoteLayer { true } - fn file_size(&self) -> u64 { - self.layer_metadata.file_size() - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -190,6 +186,7 @@ impl RemoteLayer { fname.key_range.clone(), fname.lsn, false, + layer_metadata.file_size(), ), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), @@ -211,6 +208,7 @@ impl RemoteLayer { timelineid, fname.key_range.clone(), fname.lsn_range.clone(), + layer_metadata.file_size(), ), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 507f0de4f3..2a50a26a23 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1211,7 +1211,12 @@ impl Timeline { ), }); - let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? { + let replaced = match batch_updates.replace_historic( + local_layer.layer_desc().clone(), + local_layer, + new_remote_layer.layer_desc().clone(), + new_remote_layer, + )? { Replacement::Replaced { .. } => { if let Err(e) = local_layer.delete_resident_layer_file() { error!("failed to remove layer file on evict after replacement: {e:#?}"); @@ -1607,7 +1612,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. @@ -1639,7 +1644,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1738,7 +1743,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - updates.remove_historic(local_layer); + updates.remove_historic(local_layer.layer_desc().clone(), local_layer); // fall-through to adding the remote layer } } else { @@ -1777,7 +1782,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1804,7 +1809,7 @@ impl Timeline { ), ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); } } } @@ -2252,7 +2257,7 @@ impl Timeline { // won't be needed for page reconstruction for this timeline, // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. - updates.remove_historic(layer); + updates.remove_historic(layer.layer_desc().clone(), layer); Ok(()) } @@ -2962,7 +2967,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - batch_updates.insert_historic(l); + batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.flush(); // update the timeline's physical size @@ -3210,7 +3215,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(l); + updates.insert_historic(l.layer_desc().clone(), l); } updates.flush(); drop(layers); @@ -3657,7 +3662,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(x); + updates.insert_historic(x.layer_desc().clone(), x); } // Now that we have reshuffled the data to set of new delta layers, we can @@ -4192,7 +4197,7 @@ impl Timeline { { use crate::tenant::layer_map::Replacement; let l: Arc = remote_layer.clone(); - let failure = match updates.replace_historic(&l, new_layer) { + let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) { Ok(Replacement::Replaced { .. }) => false, Ok(Replacement::NotFound) => { // TODO: the downloaded file should probably be removed, otherwise