From 3693d1f431e89eec8231ab3ffa44021f1778d4cf Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 13 Jun 2023 18:38:41 +0200
Subject: [PATCH] turn Timeline::layers into tokio::sync::RwLock (#4441)

This is preliminary work for/from #4220 (async
`Layer::get_value_reconstruct_data`).

# Full Stack Of Preliminary PRs

Thanks to the countless preliminary PRs, this conversion is relatively
straightforward.

1. Clean-ups
   * https://github.com/neondatabase/neon/pull/4316
   * https://github.com/neondatabase/neon/pull/4317
   * https://github.com/neondatabase/neon/pull/4318
   * https://github.com/neondatabase/neon/pull/4319
   * https://github.com/neondatabase/neon/pull/4321
   * Note: these were mostly to find an alternative to #4291, which I
     thought we'd need in my original plan, where we would have had to
     convert `Tenant::timelines` into an async locking primitive (#4333).
     In reviews, we walked away from that, but these cleanups were still
     quite useful.
2. https://github.com/neondatabase/neon/pull/4364
3. https://github.com/neondatabase/neon/pull/4472
4. https://github.com/neondatabase/neon/pull/4476
5. https://github.com/neondatabase/neon/pull/4477
6. https://github.com/neondatabase/neon/pull/4485

# Significant Changes In This PR

## `compact_level0_phase1` & `create_delta_layer`

This commit partially reverts "pgserver: spawn_blocking in compaction
(#4265)" 4e359db4c78e0fd3d3e8f6a69baac4fb5b80b752. Specifically, it
reverts the `spawn_blocking`-ification of `compact_level0_phase1`. If we
didn't revert it, we'd have to use `Timeline::layers.blocking_read()`
inside `compact_level0_phase1`. That would use up a thread in the
`spawn_blocking` thread pool, which is hard-capped.

I considered wrapping the code that follows the second
`layers.read().await` in `spawn_blocking`, but there are lifetime issues
with `deltas_to_compact`.

Also, this PR switches the `create_delta_layer` _function_ back to
async, and uses `spawn_blocking` inside it to run the code that does
sync IO, while keeping the code that needs to lock `Timeline::layers`
async.

## `LayerIter` and `LayerKeyIter` `Send` bounds

I had to add a `Send` bound on the `dyn` type that `LayerIter` and
`LayerKeyIter` wrap. Why? Because we now have the second
`layers.read().await` inside `compact_level0_phase1`, and these iterator
instances are held across that await point. More background:
https://github.com/neondatabase/neon/pull/4462#issuecomment-1587376960

## `DatadirModification::flush`

I had to replace the `HashMap::retain` call with a hand-rolled variant
because `TimelineWriter::put` is now async.
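To make the locking split concrete, here is a minimal sketch (assumed
names, not the actual pageserver code) of the pattern this PR adopts:
the `tokio::sync::RwLock` is only taken on the async side, and only the
synchronous IO moves into `spawn_blocking`:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct LayerMap; // stand-in for the real layer map

struct Timeline {
    layers: RwLock<LayerMap>,
}

impl Timeline {
    // Waiting on the async lock does not block an executor thread, so
    // read paths like layer_size_sum can simply become `async fn`.
    async fn layer_size_sum(&self) -> u64 {
        let _layers = self.layers.read().await;
        0 // summing historic layer file sizes would go here
    }

    // The create_delta_layer shape after this PR: sync IO (write +
    // fsync) runs inside spawn_blocking; `layers` is only locked
    // afterwards, back on the async side.
    async fn create_delta_layer(self: &Arc<Self>) -> anyhow::Result<()> {
        let this = Arc::clone(self);
        tokio::task::spawn_blocking(move || {
            // write the frozen layer to disk and fsync it here; note
            // that `this.layers` is never locked on this blocking thread
            let _ = &this;
            anyhow::Ok(())
        })
        .await??;
        let mut _layers = self.layers.write().await; // insert the new layer
        Ok(())
    }
}
```

Likewise, a small sketch of why the `Send` bound on the boxed iterators
is needed: an iterator held across an await point becomes part of the
future's state, so the future is only `Send` (and thus spawnable) if the
iterator is `Send` too.

```rust
type LayerKeyIter<'i> = Box<dyn Iterator<Item = u64> + 'i + Send>;

async fn uses_iter_across_await(iter: LayerKeyIter<'static>) {
    tokio::task::yield_now().await; // `iter` is live across this await
    for _key in iter {}
}

fn spawn_it(iter: LayerKeyIter<'static>) {
    // Without `+ Send` in the type alias above, this would not compile:
    // tokio::spawn requires the future to be Send.
    tokio::spawn(uses_iter_across_await(iter));
}
```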
--- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 11 +- pageserver/src/pgdatadir_mapping.rs | 21 +- pageserver/src/tenant.rs | 167 ++++++++++------ pageserver/src/tenant/storage_layer.rs | 4 +- .../tenant/storage_layer/inmemory_layer.rs | 2 +- pageserver/src/tenant/timeline.rs | 181 +++++++++--------- .../src/tenant/timeline/eviction_task.rs | 2 +- 8 files changed, 217 insertions(+), 173 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index ce5f81c44b..61cbd5066f 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -516,7 +516,7 @@ async fn collect_eviction_candidates( if !tl.is_active() { continue; } - let info = tl.get_local_layers_for_disk_usage_eviction(); + let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); tenant_candidates.extend( info.resident_layers diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ac230e5f4a..fc8da70cc0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -215,7 +215,7 @@ async fn build_timeline_info( ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; + let mut info = build_timeline_info_common(timeline, ctx).await?; if include_non_incremental_logical_size { // XXX we should be using spawn_ondemand_logical_size_calculation here. // Otherwise, if someone deletes the timeline / detaches the tenant while @@ -233,7 +233,7 @@ async fn build_timeline_info( Ok(info) } -fn build_timeline_info_common( +async fn build_timeline_info_common( timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { @@ -264,7 +264,7 @@ fn build_timeline_info_common( None } }; - let current_physical_size = Some(timeline.layer_size_sum()); + let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -330,6 +330,7 @@ async fn timeline_create_handler( Ok(Some(new_timeline)) => { // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -591,7 +592,7 @@ async fn tenant_status( // Calculate total physical size of all timelines let mut current_physical_size = 0; for timeline in tenant.list_timelines().iter() { - current_physical_size += timeline.layer_size_sum(); + current_physical_size += timeline.layer_size_sum().await; } let state = tenant.current_state(); @@ -701,7 +702,7 @@ async fn layer_map_info_handler( check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(reset); + let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 51cac43f50..86c84ec82f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1133,16 +1133,17 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; // Flush relation and SLRU data blocks, keep metadata. 
- let mut result: anyhow::Result<()> = Ok(()); - self.pending_updates.retain(|&key, value| { - if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { - result = writer.put(key, self.lsn, value); - false + let mut retained_pending_updates = HashMap::new(); + for (key, value) in self.pending_updates.drain() { + if is_rel_block_key(key) || is_slru_block_key(key) { + // This bails out on first error without modifying pending_updates. + // That's Ok, cf this function's doc comment. + writer.put(key, self.lsn, &value).await?; } else { - true + retained_pending_updates.insert(key, value); } - }); - result?; + } + self.pending_updates.extend(retained_pending_updates); if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1164,10 +1165,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value)?; + writer.put(key, lsn, &value).await?; } for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn)?; + writer.delete(key_range, lsn).await?; } writer.finish_write(lsn); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 32390c06cf..5603bcef84 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -519,6 +519,7 @@ impl Tenant { ); timeline .load_layer_map(new_disk_consistent_lsn) + .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; @@ -560,7 +561,7 @@ impl Tenant { || timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(), @@ -3582,12 +3583,16 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3656,13 +3661,21 @@ mod tests { let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap(); // Insert a value on the timeline - writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?; - writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?; + writer + .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20")) + .await?; + writer + .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) + .await?; writer.finish_write(Lsn(0x20)); - writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?; + writer + .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) + .await?; writer.finish_write(Lsn(0x30)); - writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?; + writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) + .await?; writer.finish_write(Lsn(0x40)); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -3675,7 +3688,9 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer().await; - new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; + new_writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) + .await?; new_writer.finish_write(Lsn(0x40)); // Check page contents on both branches @@ -3703,36 +3718,44 @@ mod tests { { let writer = tline.writer().await; // Create a relation on the timeline - 
writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; } tline.freeze_and_flush().await?; { let writer = tline.writer().await; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); } tline.freeze_and_flush().await @@ -4048,7 +4071,9 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -4056,7 +4081,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -4064,7 +4091,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; + writer + .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) + .await?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -4072,7 +4101,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; + writer + .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) + .await?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -4124,11 +4155,13 @@ mod tests { for _ in 0..10000 { test_key.field6 = blknum; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); @@ -4174,11 +4207,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4192,11 +4227,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; @@ -4249,11 +4286,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); 
test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4275,11 +4314,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; println!("updating {} at {}", blknum, lsn); writer.finish_write(lsn); drop(writer); @@ -4341,11 +4382,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), + ) + .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); writer.finish_write(lsn); drop(writer); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6ac4fd9470..0af3d4ce39 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -389,10 +389,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync { } /// Returned by [`Layer::iter`] -pub type LayerIter<'i> = Box> + 'i>; +pub type LayerIter<'i> = Box> + 'i + Send>; /// Returned by [`Layer::key_iter`] -pub type LayerKeyIter<'i> = Box + 'i>; +pub type LayerKeyIter<'i> = Box + 'i + Send>; /// A Layer contains all data in a "rectangle" consisting of a range of keys and /// range of LSNs. diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..78bcfdafc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -304,7 +304,7 @@ impl InMemoryLayer { Ok(()) } - pub fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys Ok(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ec434843d1..d642090996 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -125,7 +125,7 @@ pub struct Timeline { pub pg_version: u32, - pub(super) layers: RwLock>, + pub(crate) layers: tokio::sync::RwLock>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -597,8 +597,8 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. 
- pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_size_sum(&self) -> u64 { + let layer_map = self.layers.read().await; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -908,7 +908,7 @@ impl Timeline { pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1038,8 +1038,8 @@ impl Timeline { } } - pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1061,7 +1061,7 @@ impl Timeline { #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.remote_client.is_none() { return Ok(Some(false)); @@ -1074,7 +1074,7 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let remote_client = self .remote_client .as_ref() @@ -1159,7 +1159,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1417,7 +1417,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new(LayerMap::default()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1607,8 +1607,8 @@ impl Timeline { /// /// Scan the timeline directory to populate the layer map. /// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1737,7 +1737,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. 
- let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1890,7 +1890,7 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() + .await .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2263,8 +2263,8 @@ impl Timeline { } } - fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + async fn find_layer(&self, layer_file_name: &str) -> Option> { + for historic_layer in self.layers.read().await.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2481,7 +2481,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read().await; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2663,8 +2663,8 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + let mut layers = self.layers.write().await; ensure!(lsn.is_aligned()); @@ -2713,17 +2713,16 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); - let layer = self.get_layer_for_write(lsn)?; + let layer = self.get_layer_for_write(lsn).await?; layer.put_value(key, lsn, val)?; Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn)?; - layer.put_tombstone(key_range, lsn)?; - + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + let layer = self.get_layer_for_write(lsn).await?; + layer.put_tombstone(key_range, lsn).await?; Ok(()) } @@ -2742,7 +2741,7 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2780,7 +2779,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2896,16 +2895,7 @@ impl Timeline { } } // normal case, write out a L0 delta layer file. 
- let this = self.clone(); - let frozen_layer = frozen_layer.clone(); - let span = tracing::info_span!("blocking"); - let (delta_path, metadata) = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - this.create_delta_layer(&frozen_layer) - }) - .await - .context("create_delta_layer spawn_blocking") - .and_then(|res| res)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; HashMap::from([(delta_path, metadata)]) }; @@ -2914,7 +2904,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -3008,34 +2998,50 @@ impl Timeline { } // Write out the given frozen in-memory layer as a new L0 delta file - fn create_delta_layer( + async fn create_delta_layer( self: &Arc, - frozen_layer: &InMemoryLayer, + frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { - // Write it out - let new_delta = frozen_layer.write_to_disk()?; - let new_delta_path = new_delta.path(); - let new_delta_filename = new_delta.filename(); + let span = tracing::info_span!("blocking"); + let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let _g = span.entered(); + let self_clone = Arc::clone(self); + let frozen_layer = Arc::clone(frozen_layer); + move || { + // Write it out + let new_delta = frozen_layer.write_to_disk()?; + let new_delta_path = new_delta.path(); - // Sync it to disk. - // - // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable - // - // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. + // Sync it to disk. + // + // We must also fsync the timeline dir to ensure the directory entries for + // new layer files are durable + // + // TODO: If we're running inside 'flush_frozen_layers' and there are multiple + // files to flush, it might be better to first write them all, and then fsync + // them all in parallel. - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. - par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. - par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) - .context("fsync of timeline dir")?; + // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace + // this with a single fsync in future refactors. + par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + // Then sync the parent directory. 
+ par_fsync::par_fsync(&[self_clone + .conf + .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) + .context("fsync of timeline dir")?; + + let sz = new_delta_path.metadata()?.len(); + + anyhow::Ok((new_delta, sz)) + } + }) + .await + .context("spawn_blocking")??; + let new_delta_name = new_delta.filename(); // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -3046,14 +3052,12 @@ impl Timeline { batch_updates.flush(); // update the timeline's physical size - let sz = new_delta_path.metadata()?.len(); - self.metrics.resident_physical_size_gauge.add(sz); // update metrics self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); - Ok((new_delta_filename, LayerFileMetadata::new(sz))) + Ok((new_delta_name, LayerFileMetadata::new(sz))) } async fn repartition( @@ -3087,10 +3091,14 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut max_deltas = 0; { @@ -3185,7 +3193,7 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; start = img_range.end; - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn).await? { let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -3268,7 +3276,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -3330,13 +3338,13 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - fn compact_level0_phase1( + async fn compact_level0_phase1( &self, _layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); @@ -3459,7 +3467,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here? + let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? 
@@ -3673,21 +3681,12 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { - let this = self.clone(); - let ctx_inner = ctx.clone(); - let layer_removal_cs_inner = layer_removal_cs.clone(); - let span = tracing::info_span!("blocking"); let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner) - }) - .await - .context("compact_level0_phase1 spawn_blocking") - .map_err(CompactionError::Other) - .and_then(|res| res)?; + } = self + .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) + .await?; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do @@ -3705,7 +3704,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3964,7 +3963,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4264,7 +4263,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write().await; let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4422,7 +4421,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4524,8 +4523,8 @@ impl LocalLayerInfoForDiskUsageEviction { } impl Timeline { - pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().await; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4613,12 +4612,12 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value) + pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value).await } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn) + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + self.tl.put_tombstone(key_range, lsn).await } /// Track the end of the latest digested WAL record. diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 1040dff63d..80c5210211 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -197,7 +197,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. 
// So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() {