From ce7efbe48a4dbffdd34cb0aeee03dcd367707301 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Mon, 14 Aug 2023 17:20:37 +0200
Subject: [PATCH] Turn BlockCursor::{read_blob,read_blob_into_buf} async fn (#4905)

## Problem

The `BlockCursor::read_blob` and `BlockCursor::read_blob_into_buf` functions are calling `read_blk` internally, so if we want to make that function async fn, they need to be async themselves.

## Summary of changes

* We first turn `ValueRef::load` into an async fn.
* Then, we switch the `RwLock` implementation in `InMemoryLayer` to use the one from `tokio`.
* Last, we convert the `read_blob` and `read_blob_into_buf` functions into async fn.

In three instances we use `Handle::block_on` (a minimal sketch of this bridging pattern follows the diff):

* one use is in compaction code, which currently isn't async. We put the entire loop into an `async` block to prevent the potentially hot loop from doing cross-thread operations.
* one use is in dumping code for `DeltaLayer`. The "proper" way to address this would be to enable the visit function to take async closures, but then we'd need to be generic over async vs non-async, which [isn't supported by Rust right now](https://blog.rust-lang.org/inside-rust/2022/07/27/keyword-generics.html). The other alternative would be to do a first pass where we cache the data into memory, and only then dump it.
* the third use is in writing code, inside a loop that copies from one file to another. It is synchronous and we'd like to keep it that way (for now?).

Part of #4743
---
 pageserver/ctl/src/layers.rs | 2 +-
 pageserver/src/tenant/blob_io.rs | 6 +-
 pageserver/src/tenant/ephemeral_file.rs | 23 ++-
 .../src/tenant/storage_layer/delta_layer.rs | 24 ++-
 .../src/tenant/storage_layer/image_layer.rs | 1 +
 .../tenant/storage_layer/inmemory_layer.rs | 28 +--
 pageserver/src/tenant/timeline.rs | 193 ++++++++++--------
 .../src/tenant/timeline/layer_manager.rs | 4 +-
 8 files changed, 154 insertions(+), 127 deletions(-)

diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index e3ddefc661..2d178a42bd 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -72,7 +72,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> { .await?; let cursor = BlockCursor::new(&file); for (k, v) in all { - let value = cursor.read_blob(v.pos())?; + let value = cursor.read_blob(v.pos()).await?; println!("key:{} value_len:{}", k, value.len()); } // TODO(chi): special handling for last key? diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 7dd53407e7..fd211e8276 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -21,14 +21,14 @@ where R: BlockReader, { /// Read a blob into a new buffer. - pub fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> { + pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> { let mut buf = Vec::new(); - self.read_blob_into_buf(offset, &mut buf)?; + self.read_blob_into_buf(offset, &mut buf).await?; Ok(buf) } /// Read blob into the given buffer. Any previous contents in the buffer /// are overwritten.
- pub fn read_blob_into_buf( + pub async fn read_blob_into_buf( &self, offset: u64, dstbuf: &mut Vec, diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index b088a3b602..38a491f931 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -401,17 +401,26 @@ mod tests { Ok(()) } - #[test] - fn test_ephemeral_blobs() -> Result<(), io::Error> { + #[tokio::test] + async fn test_ephemeral_blobs() -> Result<(), io::Error> { let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; let pos_foo = file.write_blob(b"foo")?; - assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice()); + assert_eq!( + b"foo", + file.block_cursor().read_blob(pos_foo).await?.as_slice() + ); let pos_bar = file.write_blob(b"bar")?; - assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice()); - assert_eq!(b"bar", file.block_cursor().read_blob(pos_bar)?.as_slice()); + assert_eq!( + b"foo", + file.block_cursor().read_blob(pos_foo).await?.as_slice() + ); + assert_eq!( + b"bar", + file.block_cursor().read_blob(pos_bar).await?.as_slice() + ); let mut blobs = Vec::new(); for i in 0..10000 { @@ -428,7 +437,7 @@ mod tests { let cursor = BlockCursor::new(&file); for (pos, expected) in blobs { - let actual = cursor.read_blob(pos)?; + let actual = cursor.read_blob(pos).await?; assert_eq!(actual, expected); } @@ -437,7 +446,7 @@ mod tests { large_data.resize(20000, 0); thread_rng().fill_bytes(&mut large_data); let pos_large = file.write_blob(&large_data)?; - let result = file.block_cursor().read_blob(pos_large)?; + let result = file.block_cursor().read_blob(pos_large).await?; assert_eq!(result, large_data); Ok(()) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 9a7108c4c8..a0562ddc10 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -51,6 +51,7 @@ use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::Arc; +use tokio::runtime::Handle; use tokio::sync::OnceCell; use tracing::*; @@ -280,7 +281,8 @@ impl Layer for DeltaLayer { // A subroutine to dump a single blob let dump_blob = |blob_ref: BlobRef| -> anyhow::Result { - let buf = cursor.read_blob(blob_ref.pos())?; + // TODO this is not ideal, but on the other hand we are in dumping code... 
+ let buf = Handle::current().block_on(cursor.read_blob(blob_ref.pos()))?; let val = Value::des(&buf)?; let desc = match val { Value::Image(img) => { @@ -335,7 +337,6 @@ impl Layer for DeltaLayer { let inner = self .load(LayerAccessKind::GetValueReconstructData, ctx) .await?; - inner .get_value_reconstruct_data(key, lsn_range, reconstruct_state) .await @@ -912,12 +913,15 @@ impl DeltaLayerInner { let cursor = file.block_cursor(); let mut buf = Vec::new(); for (entry_lsn, pos) in offsets { - cursor.read_blob_into_buf(pos, &mut buf).with_context(|| { - format!( - "Failed to read blob from virtual file {}", - file.file.path.display() - ) - })?; + cursor + .read_blob_into_buf(pos, &mut buf) + .await + .with_context(|| { + format!( + "Failed to read blob from virtual file {}", + file.file.path.display() + ) + })?; let val = Value::des(&buf).with_context(|| { format!( "Failed to deserialize file blob from virtual file {}", @@ -1026,9 +1030,9 @@ pub struct ValueRef> { impl> ValueRef { /// Loads the value from disk - pub fn load(&self) -> Result { + pub async fn load(&self) -> Result { // theoretically we *could* record an access time for each, but it does not really matter - let buf = self.reader.read_blob(self.blob_ref.pos())?; + let buf = self.reader.read_blob(self.blob_ref.pos()).await?; let val = Value::des(&buf)?; Ok(val) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2d5ef402ae..3fe600e5ff 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -470,6 +470,7 @@ impl ImageLayerInner { let blob = file .block_cursor() .read_blob(offset) + .await .with_context(|| format!("failed to read value from offset {}", offset))?; let value = Bytes::from(blob); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 5b825eee6c..aa9d0884e0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -28,7 +28,7 @@ use utils::{ // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::ops::Range; -use std::sync::RwLock; +use tokio::sync::RwLock; use super::{DeltaLayer, DeltaLayerWriter, Layer}; @@ -125,7 +125,7 @@ impl Layer for InMemoryLayer { /// debugging function to print out the contents of the layer async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read().await; let end_str = self.end_lsn_or_max(); @@ -143,7 +143,7 @@ impl Layer for InMemoryLayer { for (key, vec_map) in inner.index.iter() { for (lsn, pos) in vec_map.as_slice() { let mut desc = String::new(); - cursor.read_blob_into_buf(*pos, &mut buf)?; + cursor.read_blob_into_buf(*pos, &mut buf).await?; let val = Value::des(&buf); match val { Ok(Value::Image(img)) => { @@ -181,7 +181,7 @@ impl Layer for InMemoryLayer { ensure!(lsn_range.start >= self.start_lsn); let mut need_image = true; - let inner = self.inner.read().unwrap(); + let inner = self.inner.read().await; let reader = inner.file.block_cursor(); @@ -189,7 +189,7 @@ impl Layer for InMemoryLayer { if let Some(vec_map) = inner.index.get(&key) { let slice = vec_map.slice_range(lsn_range); for (entry_lsn, pos) in slice.iter().rev() { - let buf = reader.read_blob(*pos)?; + let buf = reader.read_blob(*pos).await?; let value = Value::des(&buf)?; match value { Value::Image(img) => { @@ -232,8 
+232,8 @@ impl InMemoryLayer { /// /// Get layer size on the disk /// - pub fn size(&self) -> Result { - let inner = self.inner.read().unwrap(); + pub async fn size(&self) -> Result { + let inner = self.inner.read().await; Ok(inner.file.size) } @@ -267,9 +267,9 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { + pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write().await; self.assert_writable(); let off = { @@ -301,8 +301,8 @@ impl InMemoryLayer { /// Make the layer non-writeable. Only call once. /// Records the end_lsn for non-dropped layers. /// `end_lsn` is exclusive - pub fn freeze(&self, end_lsn: Lsn) { - let inner = self.inner.write().unwrap(); + pub async fn freeze(&self, end_lsn: Lsn) { + let inner = self.inner.write().await; assert!(self.start_lsn < end_lsn); self.end_lsn.set(end_lsn).expect("end_lsn set only once"); @@ -317,7 +317,7 @@ impl InMemoryLayer { /// Write this frozen in-memory layer to disk. /// /// Returns a new delta layer with all the same data as this in-memory layer - pub fn write_to_disk(&self) -> Result { + pub async fn write_to_disk(&self) -> Result { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the // write lock on it, so we shouldn't block anyone. There's one exception @@ -327,7 +327,7 @@ impl InMemoryLayer { // lock, it will see that it's not writeable anymore and retry, but it // would have to wait until we release it. That race condition is very // rare though, so we just accept the potential latency hit for now. - let inner = self.inner.read().unwrap(); + let inner = self.inner.read().await; let end_lsn = *self.end_lsn.get().unwrap(); @@ -350,7 +350,7 @@ impl InMemoryLayer { let key = **key; // Write all page versions for (lsn, pos) in vec_map.as_slice() { - cursor.read_blob_into_buf(*pos, &mut buf)?; + cursor.read_blob_into_buf(*pos, &mut buf).await?; let will_init = Value::des(&buf)?.will_init(); delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?; } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 95e86c6bcc..2deeacdc64 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -882,7 +882,7 @@ impl Timeline { let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; - open_layer.size()? + open_layer.size().await? }; let last_freeze_at = self.last_freeze_at.load(); let last_freeze_ts = *(self.last_freeze_ts.read().unwrap()); @@ -2654,7 +2654,7 @@ impl Timeline { async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); let layer = self.get_layer_for_write(lsn).await?; - layer.put_value(key, lsn, val)?; + layer.put_value(key, lsn, val).await?; Ok(()) } @@ -2680,7 +2680,9 @@ impl Timeline { Some(self.write_lock.lock().await) }; let mut guard = self.layers.write().await; - guard.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at); + guard + .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at) + .await; } /// Layer flusher task's main loop. 
@@ -2962,7 +2964,11 @@ impl Timeline { let frozen_layer = Arc::clone(frozen_layer); move || { // Write it out - let new_delta = frozen_layer.write_to_disk()?; + // Keep this inside `spawn_blocking` and `Handle::current` + // as long as the write path is still sync and the read impl + // is still not fully async. Otherwise executor threads would + // be blocked. + let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?; let new_delta_path = new_delta.path(); // Sync it to disk. @@ -3653,98 +3659,105 @@ impl Timeline { let mut key_values_total_size = 0u64; let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key - for (key, lsn, value_ref) in all_values_iter { - let value = value_ref.load()?; - let same_key = prev_key.map_or(false, |prev_key| prev_key == key); - // We need to check key boundaries once we reach next key or end of layer with the same key - if !same_key || lsn == dup_end_lsn { - let mut next_key_size = 0u64; - let is_dup_layer = dup_end_lsn.is_valid(); - dup_start_lsn = Lsn::INVALID; - if !same_key { - dup_end_lsn = Lsn::INVALID; + + // TODO remove this block_on wrapper once we fully go async + Handle::current().block_on(async { + for (key, lsn, value_ref) in all_values_iter { + let value = value_ref.load().await?; + let same_key = prev_key.map_or(false, |prev_key| prev_key == key); + // We need to check key boundaries once we reach next key or end of layer with the same key + if !same_key || lsn == dup_end_lsn { + let mut next_key_size = 0u64; + let is_dup_layer = dup_end_lsn.is_valid(); + dup_start_lsn = Lsn::INVALID; + if !same_key { + dup_end_lsn = Lsn::INVALID; + } + // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size + for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { + next_key_size = next_size; + if key != next_key { + if dup_end_lsn.is_valid() { + // We are writting segment with duplicates: + // place all remaining values of this key in separate segment + dup_start_lsn = dup_end_lsn; // new segments starts where old stops + dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range + } + break; + } + key_values_total_size += next_size; + // Check if it is time to split segment: if total keys size is larger than target file size. + // We need to avoid generation of empty segments if next_size > target_file_size. + if key_values_total_size > target_file_size && lsn != next_lsn { + // Split key between multiple layers: such layer can contain only single key + dup_start_lsn = if dup_end_lsn.is_valid() { + dup_end_lsn // new segment with duplicates starts where old one stops + } else { + lsn // start with the first LSN for this key + }; + dup_end_lsn = next_lsn; // upper LSN boundary is exclusive + break; + } + } + // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set. + if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { + dup_start_lsn = dup_end_lsn; + dup_end_lsn = lsn_range.end; + } + if writer.is_some() { + let written_size = writer.as_mut().unwrap().size(); + let contains_hole = + next_hole < holes.len() && key >= holes[next_hole].key_range.end; + // check if key cause layer overflow or contains hole... + if is_dup_layer + || dup_end_lsn.is_valid() + || written_size + key_values_total_size > target_file_size + || contains_hole + { + // ... 
if so, flush previous layer and prepare to write new one + new_layers.push(Arc::new( + writer.take().unwrap().finish(prev_key.unwrap().next())?, + )); + writer = None; + + if contains_hole { + // skip hole + next_hole += 1; + } + } + } + // Remember size of key value because at next iteration we will access next item + key_values_total_size = next_key_size; } - // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size - for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { - next_key_size = next_size; - if key != next_key { + if writer.is_none() { + // Create writer if not initiaized yet + writer = Some(DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + key, if dup_end_lsn.is_valid() { - // We are writting segment with duplicates: - // place all remaining values of this key in separate segment - dup_start_lsn = dup_end_lsn; // new segments starts where old stops - dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range - } - break; - } - key_values_total_size += next_size; - // Check if it is time to split segment: if total keys size is larger than target file size. - // We need to avoid generation of empty segments if next_size > target_file_size. - if key_values_total_size > target_file_size && lsn != next_lsn { - // Split key between multiple layers: such layer can contain only single key - dup_start_lsn = if dup_end_lsn.is_valid() { - dup_end_lsn // new segment with duplicates starts where old one stops + // this is a layer containing slice of values of the same key + debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); + dup_start_lsn..dup_end_lsn } else { - lsn // start with the first LSN for this key - }; - dup_end_lsn = next_lsn; // upper LSN boundary is exclusive - break; - } + debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); + lsn_range.clone() + }, + )?); } - // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set. - if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { - dup_start_lsn = dup_end_lsn; - dup_end_lsn = lsn_range.end; - } - if writer.is_some() { - let written_size = writer.as_mut().unwrap().size(); - let contains_hole = - next_hole < holes.len() && key >= holes[next_hole].key_range.end; - // check if key cause layer overflow or contains hole... - if is_dup_layer - || dup_end_lsn.is_valid() - || written_size + key_values_total_size > target_file_size - || contains_hole - { - // ... 
if so, flush previous layer and prepare to write new one - new_layers.push(Arc::new( - writer.take().unwrap().finish(prev_key.unwrap().next())?, - )); - writer = None; - if contains_hole { - // skip hole - next_hole += 1; - } - } - } - // Remember size of key value because at next iteration we will access next item - key_values_total_size = next_key_size; + fail_point!("delta-layer-writer-fail-before-finish", |_| { + Result::<_>::Err(anyhow::anyhow!( + "failpoint delta-layer-writer-fail-before-finish" + )) + }); + + writer.as_mut().unwrap().put_value(key, lsn, value)?; + prev_key = Some(key); } - if writer.is_none() { - // Create writer if not initiaized yet - writer = Some(DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_id, - key, - if dup_end_lsn.is_valid() { - // this is a layer containing slice of values of the same key - debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); - dup_start_lsn..dup_end_lsn - } else { - debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); - lsn_range.clone() - }, - )?); - } - - fail_point!("delta-layer-writer-fail-before-finish", |_| { - Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into()) - }); - - writer.as_mut().unwrap().put_value(key, lsn, value)?; - prev_key = Some(key); - } + Ok(()) + })?; if let Some(writer) = writer { new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?)); } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 824d869bec..40212265cf 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -163,7 +163,7 @@ impl LayerManager { } /// Called from `freeze_inmem_layer`, returns true if successfully frozen. - pub fn try_freeze_in_memory_layer( + pub async fn try_freeze_in_memory_layer( &mut self, Lsn(last_record_lsn): Lsn, last_freeze_at: &AtomicLsn, @@ -173,7 +173,7 @@ impl LayerManager { if let Some(open_layer) = &self.layer_map.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? - open_layer.freeze(end_lsn); + open_layer.freeze(end_lsn).await; // The layer is no longer open, update the layer map to reflect this. // We will replace it with on-disk historics below.
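For context on the `Handle::block_on` bridging pattern this patch relies on, here is a minimal, self-contained sketch (not part of the patch; hypothetical names, assuming a multi-threaded `tokio` runtime with the `rt-multi-thread` and `macros` features): a synchronous code path, running on a blocking thread, drives an async read such as the new `BlockCursor::read_blob` to completion.

```rust
use tokio::runtime::Handle;

// Stand-in for an async read such as `BlockCursor::read_blob(offset)`.
async fn read_record(offset: u64) -> std::io::Result<Vec<u8>> {
    Ok(offset.to_le_bytes().to_vec())
}

// Synchronous caller, e.g. code running inside `spawn_blocking`: it drives the
// async fn to completion via `Handle::block_on`. This must not be called from
// an async execution context (a runtime worker thread), or tokio will panic.
fn read_record_sync(handle: &Handle, offset: u64) -> std::io::Result<Vec<u8>> {
    handle.block_on(read_record(offset))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let handle = Handle::current();
    // Hop onto a blocking thread so that calling `block_on` there is legal.
    let buf = tokio::task::spawn_blocking(move || read_record_sync(&handle, 42))
        .await
        .expect("blocking task panicked")?;
    println!("read {} bytes", buf.len());
    Ok(())
}
```

This mirrors the patch's own trade-off: synchronous write and compaction paths stay synchronous for now, and only the points that need the async read path block on it from non-async threads.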