From 218af17c2949dde4d35c8b29045ca543249887e5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 18:49:39 +0200 Subject: [PATCH] turn Timeline::layers into tokio::sync::RwLock (cherry picked from commit 2001c31a14e723c25afef3afe00ba0111eb708a9) --- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 11 +- pageserver/src/page_service.rs | 4 +- pageserver/src/pgdatadir_mapping.rs | 10 +- pageserver/src/tenant.rs | 284 +++++++++++------- .../src/tenant/remote_timeline_client.rs | 7 +- .../tenant/storage_layer/inmemory_layer.rs | 2 +- pageserver/src/tenant/timeline.rs | 96 +++--- .../src/tenant/timeline/eviction_task.rs | 2 +- .../walreceiver/connection_manager.rs | 1 + 10 files changed, 258 insertions(+), 161 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f4a0f3f18e..6372178b2d 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -512,7 +512,7 @@ async fn collect_eviction_candidates( if !tl.is_active() { continue; } - let info = tl.get_local_layers_for_disk_usage_eviction(); + let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); tenant_candidates.extend( info.resident_layers diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ed0021d744..a4428b9f9b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -212,7 +212,7 @@ async fn build_timeline_info( ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; + let mut info = build_timeline_info_common(timeline, ctx).await?; if include_non_incremental_logical_size { // XXX we should be using spawn_ondemand_logical_size_calculation here. // Otherwise, if someone deletes the timeline / detaches the tenant while @@ -230,7 +230,7 @@ async fn build_timeline_info( Ok(info) } -fn build_timeline_info_common( +async fn build_timeline_info_common( timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { @@ -261,7 +261,7 @@ fn build_timeline_info_common( None } }; - let current_physical_size = Some(timeline.layer_size_sum()); + let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -321,6 +321,7 @@ async fn timeline_create_handler(mut request: Request) -> Result { // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -551,7 +552,7 @@ async fn tenant_status(request: Request) -> Result, ApiErro // Calculate total physical size of all timelines let mut current_physical_size = 0; for timeline in tenant.list_timelines().iter() { - current_physical_size += timeline.layer_size_sum(); + current_physical_size += timeline.layer_size_sum().await; } let state = tenant.current_state(); @@ -655,7 +656,7 @@ async fn layer_map_info_handler(request: Request) -> Result check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(reset); + let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9e9285a009..bf6fd1eeb1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -489,7 +489,9 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; - let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?; + let timeline = tenant + .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) + .await?; // TODO mark timeline as not ready until it reaches end_lsn. // We might have some wal to import as well, and we should prevent compute diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 188b5d15ce..b0355f86c5 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1118,7 +1118,7 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; - let mut layer_map = self.tline.layers.write().unwrap(); + let mut layer_map = self.tline.layers.write().await; // Flush relation and SLRU data blocks, keep metadata. let mut result: anyhow::Result<()> = Ok(()); @@ -1152,10 +1152,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value)?; + writer.put(key, lsn, &value).await?; } for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn)?; + writer.delete(key_range, lsn).await?; } writer.finish_write(lsn); @@ -1602,7 +1602,9 @@ pub async fn create_test_timeline( pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?; + let tline = tenant + .create_test_timeline(timeline_id, Lsn(8), pg_version, ctx) + .await?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; m.commit().await?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8aac7d869a..3367b2a4cb 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -505,6 +505,7 @@ impl Tenant { // "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn and cannot be initialized"); timeline .load_layer_map(new_disk_consistent_lsn) + .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; @@ -544,7 +545,7 @@ impl Tenant { || timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(), @@ -1166,7 +1167,7 @@ impl Tenant { /// This is used to create the initial 'main' timeline during bootstrapping, /// or when importing a new base backup. The caller is expected to load an /// initial image of the datadir to the new timeline after this. - pub fn create_empty_timeline( + pub async fn create_empty_timeline( &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, @@ -1178,9 +1179,10 @@ impl Tenant { "Cannot create empty timelines on inactive tenant" ); - let timelines = self.timelines.lock().unwrap(); - let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; - drop(timelines); + let timeline_uninit_mark = { + let timelines = self.timelines.lock().unwrap(); + self.create_timeline_uninit_mark(new_timeline_id, &timelines)? + }; let new_metadata = TimelineMetadata::new( Lsn(0), @@ -1198,6 +1200,7 @@ impl Tenant { true, None, ) + .await } /// Helper for unit tests to create an emtpy timeline. @@ -1206,19 +1209,22 @@ impl Tenant { // This makes the various functions which anyhow::ensure! for Active state work in tests. // Our current tests don't need the background loops. #[cfg(test)] - pub fn create_test_timeline( + pub async fn create_test_timeline( &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?; + let uninit_tl = self + .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) + .await?; let raw_timeline = uninit_tl.raw_timeline().unwrap(); raw_timeline .load_layer_map(raw_timeline.get_disk_consistent_lsn()) + .await .with_context(|| { format!( "Failed to load layermap for timeline {tenant_id}/{timeline_id}", @@ -2481,16 +2487,19 @@ impl Tenant { src_timeline.pg_version, ); - let uninit_timeline = self.prepare_timeline( - dst_id, - &metadata, - timeline_uninit_mark, - false, - Some(Arc::clone(src_timeline)), - )?; + let uninit_timeline = self + .prepare_timeline( + dst_id, + &metadata, + timeline_uninit_mark, + false, + Some(Arc::clone(src_timeline)), + ) + .await?; let raw_timeline = uninit_timeline.raw_timeline()?; raw_timeline .load_layer_map(raw_timeline.get_disk_consistent_lsn()) + .await .with_context(|| { format!( "Failed to load layermap for timeline {tenant_id}/{timeline_id}", @@ -2578,8 +2587,9 @@ impl Tenant { pgdata_lsn, pg_version, ); - let raw_timeline = - self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?; + let raw_timeline = self + .prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None) + .await?; let tenant_id = raw_timeline.owning_tenant.tenant_id; let unfinished_timeline = raw_timeline.raw_timeline()?; @@ -2632,7 +2642,7 @@ impl Tenant { /// Creates intermediate timeline structure and its files, without loading it into memory. /// It's up to the caller to import the necesary data and import the timeline into memory. - fn prepare_timeline( + async fn prepare_timeline( &self, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, @@ -2664,7 +2674,7 @@ impl Tenant { ) { Ok(new_timeline) => { if init_layers { - new_timeline.layers.write().unwrap().next_open_layer_at = + new_timeline.layers.write().await.next_open_layer_at = Some(new_timeline.initdb_lsn); } debug!( @@ -3324,15 +3334,21 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3357,9 +3373,14 @@ mod tests { let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")? .load() .await; - let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let _ = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; - match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) { + match tenant + .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await + { Ok(_) => panic!("duplicate timeline creation should fail"), Err(e) => assert_eq!( e.to_string(), @@ -3388,7 +3409,9 @@ mod tests { use std::str::from_utf8; let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let writer = tline.writer().await; #[allow(non_snake_case)] @@ -3397,13 +3420,21 @@ mod tests { let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap(); // Insert a value on the timeline - writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?; - writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?; + writer + .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20")) + .await?; + writer + .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) + .await?; writer.finish_write(Lsn(0x20)); - writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?; + writer + .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) + .await?; writer.finish_write(Lsn(0x30)); - writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?; + writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) + .await?; writer.finish_write(Lsn(0x40)); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -3416,7 +3447,9 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer().await; - new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; + new_writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) + .await?; new_writer.finish_write(Lsn(0x40)); // Check page contents on both branches @@ -3444,36 +3477,44 @@ mod tests { { let writer = tline.writer().await; // Create a relation on the timeline - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; } tline.freeze_and_flush().await?; { let writer = tline.writer().await; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); } tline.freeze_and_flush().await @@ -3485,7 +3526,9 @@ mod tests { TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 @@ -3522,8 +3565,9 @@ mod tests { .load() .await; - let tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx) + .await?; // try to branch at lsn 0x25, should fail because initdb lsn is 0x50 match tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx) @@ -3572,7 +3616,9 @@ mod tests { TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant @@ -3620,7 +3666,9 @@ mod tests { TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant @@ -3643,7 +3691,9 @@ mod tests { TenantHarness::create("test_parent_keeps_data_forever_after_branching")? .load() .await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant @@ -3675,8 +3725,9 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; { let (tenant, ctx) = harness.load().await; - let tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x8000)).await?; } @@ -3695,8 +3746,9 @@ mod tests { // create two timelines { let (tenant, ctx) = harness.load().await; - let tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; @@ -3733,7 +3785,9 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; let (tenant, ctx) = harness.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; drop(tline); drop(tenant); @@ -3771,10 +3825,14 @@ mod tests { #[tokio::test] async fn test_images() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_images")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -3782,7 +3840,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3790,7 +3850,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; + writer + .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) + .await?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -3798,7 +3860,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; + writer + .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) + .await?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -3836,7 +3900,9 @@ mod tests { #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; let mut lsn = Lsn(0x10); @@ -3848,11 +3914,13 @@ mod tests { for _ in 0..10000 { test_key.field6 = blknum; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); @@ -3878,7 +3946,9 @@ mod tests { #[tokio::test] async fn test_random_updates() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await; - let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; const NUM_KEYS: usize = 1000; @@ -3896,11 +3966,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -3914,11 +3986,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; @@ -3951,8 +4025,9 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_traverse_branches")? .load() .await; - let mut tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let mut tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; const NUM_KEYS: usize = 1000; @@ -3970,11 +4045,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -3996,11 +4073,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; println!("updating {} at {}", blknum, lsn); writer.finish_write(lsn); drop(writer); @@ -4034,8 +4113,9 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")? .load() .await; - let mut tline = - tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let mut tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) + .await?; const NUM_KEYS: usize = 100; const NUM_TLINES: usize = 50; @@ -4061,11 +4141,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), + ) + .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); writer.finish_write(lsn); drop(writer); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c4640307d0..121a4227b9 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1264,7 +1264,12 @@ mod tests { let harness = TenantHarness::create(test_name)?; let (tenant, ctx) = runtime.block_on(harness.load()); // create an empty timeline directory - let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let _ = runtime.block_on(tenant.create_test_timeline( + TIMELINE_ID, + Lsn(0), + DEFAULT_PG_VERSION, + &ctx, + ))?; let remote_fs_dir = harness.conf.workdir.join("remote_fs"); std::fs::create_dir_all(remote_fs_dir)?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..78bcfdafc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -304,7 +304,7 @@ impl InMemoryLayer { Ok(()) } - pub fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys Ok(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 45ed18aebd..24d11d0129 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -119,7 +119,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: RwLock>, + pub(crate) layers: tokio::sync::RwLock>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -238,7 +238,7 @@ pub struct Timeline { eviction_task_timeline_state: tokio::sync::Mutex, } -type LayerMapWriteLockGuard<'t> = std::sync::RwLockWriteGuard<'t, LayerMap>; +type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap>; /// Internal structure to hold all data needed for logical size calculation. /// @@ -574,8 +574,8 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. - pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_size_sum(&self) -> u64 { + let layer_map = self.layers.read().await; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -885,7 +885,7 @@ impl Timeline { /// safekeepers to regard pageserver as caught up and suspend activity. pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; if let Some(open_layer) = &layers.open_layer { let open_layer_size = open_layer.size()?; drop(layers); @@ -981,8 +981,8 @@ impl Timeline { } } - pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1004,7 +1004,7 @@ impl Timeline { #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.remote_client.is_none() { return Ok(Some(false)); @@ -1017,7 +1017,7 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let remote_client = self .remote_client .as_ref() @@ -1102,7 +1102,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1346,7 +1346,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new(LayerMap::default()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1519,8 +1519,8 @@ impl Timeline { /// Scan the timeline directory to populate the layer map. /// Returns all timeline-related files that were found and loaded. /// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1649,7 +1649,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1802,7 +1802,7 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() + .await .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2154,8 +2154,8 @@ impl Timeline { } } - fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + async fn find_layer(&self, layer_file_name: &str) -> Option> { + for historic_layer in self.layers.read().await.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2362,7 +2362,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read().await; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2541,8 +2541,8 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + let mut layers = self.layers.write().await; self.get_layer_for_write_locked(lsn, &mut layers) } @@ -2593,9 +2593,9 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); - let layer = self.get_layer_for_write(lsn)?; + let layer = self.get_layer_for_write(lsn).await?; layer.put_value(key, lsn, val)?; Ok(()) } @@ -2613,9 +2613,9 @@ impl Timeline { Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn)?; - layer.put_tombstone(key_range, lsn)?; + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + let layer = self.get_layer_for_write(lsn).await?; + layer.put_tombstone(key_range, lsn).await?; Ok(()) } @@ -2635,7 +2635,7 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2673,7 +2673,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2765,7 +2765,7 @@ impl Timeline { .await? } else { // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; HashMap::from([(delta_path, metadata)]) }; @@ -2774,7 +2774,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -2868,7 +2868,7 @@ impl Timeline { } // Write out the given frozen in-memory layer as a new L0 delta file - fn create_delta_layer( + async fn create_delta_layer( &self, frozen_layer: &InMemoryLayer, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { @@ -2892,7 +2892,7 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -2944,10 +2944,14 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut max_deltas = 0; { @@ -3042,7 +3046,7 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; start = img_range.end; - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn).await? { let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -3120,7 +3124,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); for l in image_layers { @@ -3187,7 +3191,7 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut level0_deltas = layers.get_level0_deltas()?; // Only compact if enough layers have accumulated. @@ -3545,7 +3549,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3805,7 +3809,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4101,7 +4105,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write().await; let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4259,7 +4263,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4361,8 +4365,8 @@ impl LocalLayerInfoForDiskUsageEviction { } impl Timeline { - pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().await; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4450,8 +4454,8 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value) + pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value).await } pub fn put_locked( @@ -4465,8 +4469,8 @@ impl<'a> TimelineWriter<'a> { .put_value_locked(key, lsn, value, pre_locked_layer_map) } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn) + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + self.tl.put_tombstone(key_range, lsn).await } /// Track the end of the latest digested WAL record. diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 558600692e..a7f24c52ed 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -185,7 +185,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 6b65e1fd42..c25eea1b70 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1309,6 +1309,7 @@ mod tests { let (tenant, ctx) = harness.load().await; let timeline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx) + .await .expect("Failed to create an empty timeline for dummy wal connection manager"); ConnectionManagerState {