From 1ebe92bcf9b30495d9484eb357bc12ca6dfc981a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 24 May 2023 17:53:12 +0200 Subject: [PATCH] make Tenant::timelines a tokio::sync::RwLock This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). The patch converts `Tenant::timelines` from `std::sync::Mutex` to `tokio::sync::Mutex`. We need this change because we want to switch `Timeline::layers` to an async RwLock. We need that because we hold `Timeline::layers` while calling `Layer::get_value_reconstruct_data`. So, if we want to make get_value_reconstruct_data async, we need to make `Timeline::layers` async first. --- pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 6 +- pageserver/src/page_service.rs | 8 +- pageserver/src/pgdatadir_mapping.rs | 8 +- pageserver/src/tenant.rs | 185 +++++++++++------- pageserver/src/tenant/mgr.rs | 3 +- .../src/tenant/remote_timeline_client.rs | 7 +- pageserver/src/tenant/size.rs | 2 +- .../walreceiver/connection_manager.rs | 1 + pageserver/src/walingest.rs | 8 +- 11 files changed, 149 insertions(+), 83 deletions(-) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index ca7b9650e8..8fd1d55501 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration( let mut tenant_resident_size = 0; // iterate through list of timelines in tenant - for timeline in tenant.list_timelines().iter() { + for timeline in tenant.list_timelines().await.iter() { // collect per-timeline metrics only for active timelines if timeline.is_active() { let timeline_written_size = u64::from(timeline.get_last_record_lsn()); diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f4a0f3f18e..7f8691d81e 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -508,7 +508,7 @@ async fn collect_eviction_candidates( // a little unfair to tenants during shutdown in such a situation is tolerable. let mut tenant_candidates = Vec::new(); let mut max_layer_size = 0; - for tl in tenant.list_timelines() { + for tl in tenant.list_timelines().await { if !tl.is_active() { continue; } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 59105f122c..e944d6b8ff 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -312,7 +312,7 @@ async fn timeline_list_handler(request: Request) -> Result, let response_data = async { let tenant = mgr::get_tenant(tenant_id, true).await?; - let timelines = tenant.list_timelines(); + let timelines = tenant.list_timelines().await; let mut response_data = Vec::with_capacity(timelines.len()); for timeline in timelines { @@ -351,6 +351,7 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result, ApiErro // Calculate total physical size of all timelines let mut current_physical_size = 0; - for timeline in tenant.list_timelines().iter() { + for timeline in tenant.list_timelines().await.iter() { current_physical_size += timeline.layer_size_sum(); } @@ -975,6 +976,7 @@ async fn active_timeline_of_active_tenant( let tenant = mgr::get_tenant(tenant_id, true).await?; tenant .get_timeline(timeline_id, true) + .await .map_err(ApiError::NotFound) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 886e0b2c35..03799553a0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -388,7 +388,7 @@ impl PageServerHandler { }; // Check that the timeline exists - let timeline = tenant.get_timeline(timeline_id, true)?; + let timeline = tenant.get_timeline(timeline_id, true).await?; // switch client to COPYBOTH pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; @@ -487,7 +487,9 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; - let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?; + let timeline = tenant + .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) + .await?; // TODO mark timeline as not ready until it reaches end_lsn. // We might have some wal to import as well, and we should prevent compute @@ -1201,6 +1203,6 @@ async fn get_active_tenant_timeline( ctx: &RequestContext, ) -> Result, GetActiveTenantError> { let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?; - let timeline = tenant.get_timeline(timeline_id, true)?; + let timeline = tenant.get_timeline(timeline_id, true).await?; Ok(timeline) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 186209dfcf..7857efd03c 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1594,13 +1594,15 @@ fn is_slru_block_key(key: Key) -> bool { } #[cfg(test)] -pub fn create_test_timeline( +pub async fn create_test_timeline( tenant: &crate::tenant::Tenant, timeline_id: utils::id::TimelineId, pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?; + let tline = tenant + .create_test_timeline(timeline_id, Lsn(8), pg_version, ctx) + .await?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; m.commit()?; @@ -1630,7 +1632,7 @@ mod tests { #[test] fn test_list_rels_drop() -> Result<()> { let repo = RepoHarness::create("test_list_rels_drop")?.load(); - let tline = create_empty_timeline(repo, TIMELINE_ID)?; + let tline = create_empty_timeline(repo, TIMELINE_ID).await?; const TESTDB: u32 = 111; // Import initial dummy checkpoint record, otherwise the get_timeline() call diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c713bc0ca2..15c4207018 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -40,8 +40,7 @@ use std::process::Stdio; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::MutexGuard; -use std::sync::{Mutex, RwLock}; +use std::sync::RwLock; use std::time::{Duration, Instant}; use self::config::TenantConf; @@ -137,7 +136,7 @@ pub struct Tenant { tenant_conf: Arc>, tenant_id: TenantId, - timelines: Mutex>>, + timelines: tokio::sync::Mutex>>, // This mutex prevents creation of new timelines during GC. // Adding yet another mutex (in addition to `timelines`) is needed because holding // `timelines` mutex during all GC iteration @@ -188,7 +187,7 @@ impl UninitializedTimeline<'_> { /// This function launches the flush loop if not already done. /// /// The caller is responsible for activating the timeline (function `.activate()`). - fn initialize_with_lock( + async fn initialize_with_lock( mut self, _ctx: &RequestContext, timelines: &mut HashMap>, @@ -264,8 +263,10 @@ impl UninitializedTimeline<'_> { // Initialize without loading the layer map. We started with an empty layer map, and already // updated it for the layers that we created during the import. - let mut timelines = self.owning_tenant.timelines.lock().unwrap(); - let tl = self.initialize_with_lock(ctx, &mut timelines, false)?; + let mut timelines = self.owning_tenant.timelines.lock().await; + let tl = self + .initialize_with_lock(ctx, &mut timelines, false) + .await?; tl.activate(broker_client, ctx); Ok(tl) } @@ -485,7 +486,7 @@ impl Tenant { let timeline = { // avoiding holding it across awaits - let mut timelines_accessor = self.timelines.lock().unwrap(); + let mut timelines_accessor = self.timelines.lock().await; if timelines_accessor.contains_key(&timeline_id) { anyhow::bail!( "Timeline {tenant_id}/{timeline_id} already exists in the tenant map" @@ -507,7 +508,10 @@ impl Tenant { // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote // But we shouldnt start walreceiver before we have all the data locally, because working walreceiver // will ingest data which may require looking at the layers which are not yet available locally - match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) { + match timeline + .initialize_with_lock(ctx, &mut timelines_accessor, true) + .await + { Ok(new_timeline) => new_timeline, Err(e) => { error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); @@ -619,7 +623,7 @@ impl Tenant { async move { let doit = async { tenant_clone.attach(&ctx).await?; - tenant_clone.activate(broker_client, &ctx)?; + tenant_clone.activate(broker_client, &ctx).await?; anyhow::Ok(()) }; match doit.await { @@ -764,7 +768,7 @@ impl Tenant { pub async fn get_remote_size(&self) -> anyhow::Result { let mut size = 0; - for timeline in self.list_timelines().iter() { + for timeline in self.list_timelines().await.iter() { if let Some(remote_client) = &timeline.remote_client { size += remote_client.get_remote_physical_size(); } @@ -790,7 +794,7 @@ impl Tenant { .context("Failed to create new timeline directory")?; let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() { - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else( || { anyhow::anyhow!( @@ -894,7 +898,7 @@ impl Tenant { async move { let doit = async { tenant_clone.load(&ctx).await?; - tenant_clone.activate(broker_client, &ctx)?; + tenant_clone.activate(broker_client, &ctx).await?; anyhow::Ok(()) }; match doit.await { @@ -1109,7 +1113,7 @@ impl Tenant { }; let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { - let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) + let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false).await .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; Some(ancestor_timeline) } else { @@ -1134,12 +1138,12 @@ impl Tenant { /// Get Timeline handle for given Neon timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline( + pub async fn get_timeline( &self, timeline_id: TimelineId, active_only: bool, ) -> anyhow::Result> { - let timelines_accessor = self.timelines.lock().unwrap(); + let timelines_accessor = self.timelines.lock().await; let timeline = timelines_accessor.get(&timeline_id).with_context(|| { format!("Timeline {}/{} was not found", self.tenant_id, timeline_id) })?; @@ -1158,10 +1162,10 @@ impl Tenant { /// Lists timelines the tenant contains. /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use. - pub fn list_timelines(&self) -> Vec> { + pub async fn list_timelines(&self) -> Vec> { self.timelines .lock() - .unwrap() + .await .values() .map(Arc::clone) .collect() @@ -1170,7 +1174,7 @@ impl Tenant { /// This is used to create the initial 'main' timeline during bootstrapping, /// or when importing a new base backup. The caller is expected to load an /// initial image of the datadir to the new timeline after this. - pub fn create_empty_timeline( + pub async fn create_empty_timeline( &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, @@ -1182,7 +1186,7 @@ impl Tenant { "Cannot create empty timelines on inactive tenant" ); - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; drop(timelines); @@ -1210,16 +1214,20 @@ impl Tenant { // This makes the various functions which anyhow::ensure! for Active state work in tests. // Our current tests don't need the background loops. #[cfg(test)] - pub fn create_test_timeline( + pub async fn create_test_timeline( &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?; - let mut timelines = self.timelines.lock().unwrap(); - let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?; + let uninit_tl = self + .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) + .await?; + let mut timelines = self.timelines.lock().await; + let tl = uninit_tl + .initialize_with_lock(ctx, &mut timelines, true) + .await?; // The non-test code would call tl.activate() here. tl.set_state(TimelineState::Active); Ok(tl) @@ -1246,7 +1254,7 @@ impl Tenant { "Cannot create timelines on inactive tenant" ); - if let Ok(existing) = self.get_timeline(new_timeline_id, false) { + if let Ok(existing) = self.get_timeline(new_timeline_id, false).await { debug!("timeline {new_timeline_id} already exists"); if let Some(remote_client) = existing.remote_client.as_ref() { @@ -1271,6 +1279,7 @@ impl Tenant { Some(ancestor_timeline_id) => { let ancestor_timeline = self .get_timeline(ancestor_timeline_id, false) + .await .context("Cannot branch off the timeline that's not present in pageserver")?; if let Some(lsn) = ancestor_start_lsn.as_mut() { @@ -1365,7 +1374,7 @@ impl Tenant { // compactions. We don't want to block everything else while the // compaction runs. let timelines_to_compact = { - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; let timelines_to_compact = timelines .iter() .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) @@ -1394,7 +1403,7 @@ impl Tenant { // flushing. We don't want to block everything else while the // flushing is performed. let timelines_to_flush = { - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; timelines .iter() .map(|(_id, timeline)| Arc::clone(timeline)) @@ -1419,7 +1428,7 @@ impl Tenant { // Transition the timeline into TimelineState::Stopping. // This should prevent new operations from starting. let timeline = { - let mut timelines = self.timelines.lock().unwrap(); + let mut timelines = self.timelines.lock().await; // Ensure that there are no child timelines **attached to that pageserver**, // because detach removes files, which will break child branches @@ -1569,7 +1578,7 @@ impl Tenant { }); // Remove the timeline from the map. - let mut timelines = self.timelines.lock().unwrap(); + let mut timelines = self.timelines.lock().await; let children_exist = timelines .iter() .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); @@ -1613,7 +1622,7 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate( + async fn activate( &self, broker_client: BrokerClientChannel, ctx: &RequestContext, @@ -1658,7 +1667,7 @@ impl Tenant { } if activating { - let timelines_accessor = self.timelines.lock().unwrap(); + let timelines_accessor = self.timelines.lock().await; let not_broken_timelines = timelines_accessor .values() .filter(|timeline| timeline.current_state() != TimelineState::Broken); @@ -1738,7 +1747,7 @@ impl Tenant { }); if stopping { - let timelines_accessor = self.timelines.lock().unwrap(); + let timelines_accessor = self.timelines.lock().await; let not_broken_timelines = timelines_accessor .values() .filter(|timeline| timeline.current_state() != TimelineState::Broken); @@ -1947,12 +1956,12 @@ impl Tenant { .or(self.conf.default_tenant_conf.min_resident_size_override) } - pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { + pub async fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { *self.tenant_conf.write().unwrap() = new_tenant_conf; // Don't hold self.timelines.lock() during the notifies. // There's no risk of deadlock right now, but there could be if we consolidate // mutexes in struct Timeline in the future. - let timelines = self.list_timelines(); + let timelines = self.list_timelines().await; for timeline in timelines { timeline.tenant_conf_updated(); } @@ -2030,7 +2039,7 @@ impl Tenant { // activation times. loading_started_at: Instant::now(), tenant_conf: Arc::new(RwLock::new(tenant_conf)), - timelines: Mutex::new(HashMap::new()), + timelines: tokio::sync::Mutex::new(HashMap::new()), gc_cs: tokio::sync::Mutex::new(()), walredo_mgr, remote_storage, @@ -2259,7 +2268,7 @@ impl Tenant { // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = { - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; let mut all_branchpoints = BTreeSet::new(); let timeline_ids = { if let Some(target_timeline_id) = target_timeline_id.as_ref() { @@ -2306,6 +2315,7 @@ impl Tenant { // Timeline is known to be local and loaded. let timeline = self .get_timeline(timeline_id, false) + .await .with_context(|| format!("Timeline {timeline_id} was not found"))?; // If target_timeline is specified, ignore all other timelines @@ -2391,7 +2401,7 @@ impl Tenant { // Create a placeholder for the new branch. This will error // out if the new timeline ID is already in use. let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; self.create_timeline_uninit_mark(dst_id, &timelines)? }; @@ -2458,7 +2468,7 @@ impl Tenant { ); let new_timeline = { - let mut timelines = self.timelines.lock().unwrap(); + let mut timelines = self.timelines.lock().await; self.prepare_timeline( dst_id, &metadata, @@ -2466,7 +2476,8 @@ impl Tenant { false, Some(Arc::clone(src_timeline)), )? - .initialize_with_lock(ctx, &mut timelines, true)? + .initialize_with_lock(ctx, &mut timelines, true) + .await? }; // Root timeline gets its layers during creation and uploads them along with the metadata. @@ -2496,7 +2507,7 @@ impl Tenant { ctx: &RequestContext, ) -> anyhow::Result> { let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); + let timelines = self.timelines.lock().await; self.create_timeline_uninit_mark(timeline_id, &timelines)? }; // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` @@ -2582,8 +2593,10 @@ impl Tenant { // Initialize the timeline without loading the layer map, because we already updated the layer // map above, when we imported the datadir. let timeline = { - let mut timelines = self.timelines.lock().unwrap(); - raw_timeline.initialize_with_lock(ctx, &mut timelines, false)? + let mut timelines = self.timelines.lock().await; + raw_timeline + .initialize_with_lock(ctx, &mut timelines, false) + .await? }; info!( @@ -2685,7 +2698,7 @@ impl Tenant { fn create_timeline_uninit_mark( &self, timeline_id: TimelineId, - timelines: &MutexGuard>>, + timelines: &tokio::sync::MutexGuard>>, ) -> anyhow::Result { let tenant_id = self.tenant_id; @@ -3229,7 +3242,7 @@ pub mod harness { .instrument(info_span!("try_load", tenant_id=%self.tenant_id)) .await?; tenant.state.send_replace(TenantState::Active); - for timeline in tenant.timelines.lock().unwrap().values() { + for timeline in tenant.timelines.lock().await.values() { timeline.set_state(TimelineState::Active); } Ok(tenant) @@ -3289,7 +3302,9 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -3322,9 +3337,14 @@ mod tests { let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")? .load() .await; - let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let _ = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; - match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) { + match tenant + .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await + { Ok(_) => panic!("duplicate timeline creation should fail"), Err(e) => assert_eq!( e.to_string(), @@ -3353,7 +3373,9 @@ mod tests { use std::str::from_utf8; let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let writer = tline.writer(); #[allow(non_snake_case)] @@ -3379,6 +3401,7 @@ mod tests { .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) + .await .expect("Should have a local timeline"); let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; @@ -3450,7 +3473,9 @@ mod tests { TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 @@ -3487,8 +3512,9 @@ mod tests { .load() .await; - let tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx) + .await?; // try to branch at lsn 0x25, should fail because initdb lsn is 0x50 match tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx) @@ -3517,7 +3543,7 @@ mod tests { RepoHarness::create("test_prohibit_get_for_garbage_collected_data")? .load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; + let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION).await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?; @@ -3537,7 +3563,9 @@ mod tests { TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant @@ -3545,6 +3573,7 @@ mod tests { .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) + .await .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; @@ -3585,7 +3614,9 @@ mod tests { TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant @@ -3593,6 +3624,7 @@ mod tests { .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) + .await .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 tenant @@ -3608,7 +3640,9 @@ mod tests { TenantHarness::create("test_parent_keeps_data_forever_after_branching")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant @@ -3616,6 +3650,7 @@ mod tests { .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) + .await .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; @@ -3640,14 +3675,16 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; { let (tenant, ctx) = harness.load().await; - let tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x8000)).await?; } let (tenant, _ctx) = harness.load().await; tenant .get_timeline(TIMELINE_ID, true) + .await .expect("cannot load timeline"); Ok(()) @@ -3660,8 +3697,9 @@ mod tests { // create two timelines { let (tenant, ctx) = harness.load().await; - let tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; @@ -3672,6 +3710,7 @@ mod tests { let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) + .await .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; @@ -3683,10 +3722,12 @@ mod tests { // check that both, child and ancestor are loaded let _child_tline = tenant .get_timeline(NEW_TIMELINE_ID, true) + .await .expect("cannot get child timeline loaded"); let _ancestor_tline = tenant .get_timeline(TIMELINE_ID, true) + .await .expect("cannot get ancestor timeline loaded"); Ok(()) @@ -3698,7 +3739,9 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; let (tenant, ctx) = harness.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; drop(tline); drop(tenant); @@ -3736,7 +3779,9 @@ mod tests { #[tokio::test] async fn test_images() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_images")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -3801,7 +3846,9 @@ mod tests { #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let mut lsn = Lsn(0x10); @@ -3843,7 +3890,9 @@ mod tests { #[tokio::test] async fn test_random_updates() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; const NUM_KEYS: usize = 1000; @@ -3916,8 +3965,9 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_traverse_branches")? .load() .await; - let mut tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let mut tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; const NUM_KEYS: usize = 1000; @@ -3954,6 +4004,7 @@ mod tests { .await?; tline = tenant .get_timeline(new_tline_id, true) + .await .expect("Should have the branched timeline"); for _ in 0..NUM_KEYS { @@ -3999,8 +4050,9 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")? .load() .await; - let mut tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let mut tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; const NUM_KEYS: usize = 100; const NUM_TLINES: usize = 50; @@ -4019,6 +4071,7 @@ mod tests { .await?; tline = tenant .get_timeline(new_tline_id, true) + .await .expect("Should have the branched timeline"); for _ in 0..NUM_KEYS { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a96364ba78..b3be6061b3 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -319,7 +319,7 @@ pub async fn set_new_tenant_config( new_tenant_conf, false, )?; - tenant.set_new_tenant_config(new_tenant_conf); + tenant.set_new_tenant_config(new_tenant_conf).await; Ok(()) } @@ -686,6 +686,7 @@ pub async fn immediate_compact( let timeline = tenant .get_timeline(timeline_id, true) + .await .map_err(ApiError::NotFound)?; // Run in task_mgr to avoid race with tenant_detach operation diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c4640307d0..121a4227b9 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1264,7 +1264,12 @@ mod tests { let harness = TenantHarness::create(test_name)?; let (tenant, ctx) = runtime.block_on(harness.load()); // create an empty timeline directory - let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let _ = runtime.block_on(tenant.create_test_timeline( + TIMELINE_ID, + Lsn(0), + DEFAULT_PG_VERSION, + &ctx, + ))?; let remote_fs_dir = harness.conf.workdir.join("remote_fs"); std::fs::create_dir_all(remote_fs_dir)?; diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index ffcbdc1f1d..d243137281 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -136,7 +136,7 @@ pub(super) async fn gather_inputs( .context("Failed to refresh gc_info before gathering inputs")?; // Collect information about all the timelines - let mut timelines = tenant.list_timelines(); + let mut timelines = tenant.list_timelines().await; if timelines.is_empty() { // perhaps the tenant has just been created, and as such doesn't have any data yet diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 6b65e1fd42..c25eea1b70 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1309,6 +1309,7 @@ mod tests { let (tenant, ctx) = harness.load().await; let timeline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx) + .await .expect("Failed to create an empty timeline for dummy wal connection manager"); ConnectionManagerState { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4b8e6aa515..eb74f75846 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1209,7 +1209,7 @@ mod tests { #[tokio::test] async fn test_relsize() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1428,7 +1428,7 @@ mod tests { #[tokio::test] async fn test_drop_extend() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1497,7 +1497,7 @@ mod tests { #[tokio::test] async fn test_truncate_extend() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; // Create a 20 MB relation (the size is arbitrary) @@ -1637,7 +1637,7 @@ mod tests { #[tokio::test] async fn test_large_rel() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; let mut lsn = 0x10;