From af2b65a2fb0c705870d2ad7d26f8b76b16beaa03 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Thu, 12 Sep 2024 16:40:15 +0100
Subject: [PATCH] Rework issuing of IOs on read path

---
 pageserver/src/tenant/storage_layer.rs        | 101 +++++++++-----
 .../src/tenant/storage_layer/delta_layer.rs   | 125 ++++++++----------
 .../src/tenant/storage_layer/image_layer.rs   |  82 +++++++-----
 .../tenant/storage_layer/inmemory_layer.rs    | 110 ++++++++-------
 .../src/tenant/storage_layer/layer/tests.rs   |   2 +
 pageserver/src/tenant/timeline.rs             |  62 ++++-----
 pageserver/src/tenant/vectored_blob_io.rs     |  46 +++++--
 7 files changed, 301 insertions(+), 227 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index dac6b2f893..6ce31e07d0 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -8,6 +8,8 @@ mod layer_desc;
 mod layer_name;
 pub mod merge_iterator;
 
+use tokio::sync::{self};
+use utils::bin_ser::BeSer;
 pub mod split_writer;
 
 use crate::context::{AccessStatsBehavior, RequestContext};
@@ -16,7 +18,7 @@ use crate::walrecord::NeonWalRecord;
 use bytes::Bytes;
 use pageserver_api::key::Key;
 use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
-use std::cmp::{Ordering, Reverse};
+use std::cmp::Ordering;
 use std::collections::hash_map::Entry;
 use std::collections::{BinaryHeap, HashMap};
 use std::ops::Range;
@@ -79,10 +81,16 @@ pub(crate) enum ValueReconstructSituation {
 }
 
 /// Reconstruct data accumulated for a single key during a vectored get
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Default)]
 pub(crate) struct VectoredValueReconstructState {
-    pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
-    pub(crate) img: Option<(Lsn, Bytes)>,
+    pub(crate) records: Vec<(
+        Lsn,
+        tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
+    )>,
+    pub(crate) img: Option<(
+        Lsn,
+        tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
+    )>,
 
     situation: ValueReconstructSituation,
 }
@@ -93,16 +101,55 @@ impl VectoredValueReconstructState {
     }
 }
 
-impl From<VectoredValueReconstructState> for ValueReconstructState {
-    fn from(mut state: VectoredValueReconstructState) -> Self {
-        // walredo expects the records to be descending in terms of Lsn
-        state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
+pub(crate) async fn convert(
+    from: VectoredValueReconstructState,
+) -> Result<ValueReconstructState, PageReconstructError> {
+    let mut to = ValueReconstructState::default();
 
-        ValueReconstructState {
-            records: state.records,
-            img: state.img,
+    for (lsn, fut) in from.records {
+        match fut.await {
+            Ok(res) => match res {
+                Ok(bytes) => {
+                    let value = Value::des(&bytes)
+                        .map_err(|err| PageReconstructError::Other(err.into()))?;
+
+                    match value {
+                        Value::Image(img) => {
+                            to.img = Some((lsn, img));
+                        }
+                        Value::WalRecord(rec) => {
+                            to.records.push((lsn, rec));
+                        }
+                    }
+                }
+                Err(err) => {
+                    return Err(PageReconstructError::Other(err.into()));
+                }
+            },
+            Err(err) => {
+                return Err(PageReconstructError::Other(err.into()));
+            }
         }
     }
+
+    if to.img.is_none() {
+        let (lsn, fut) = from.img.expect("Need an image");
+        match fut.await {
+            Ok(res) => match res {
+                Ok(bytes) => {
+                    to.img = Some((lsn, bytes));
+                }
+                Err(err) => {
+                    return Err(PageReconstructError::Other(err.into()));
+                }
+            },
+            Err(err) => {
+                return Err(PageReconstructError::Other(err.into()));
+            }
+        }
+    }
+
+    Ok(to)
 }
 
 /// Bag of data accumulated during a vectored get.
@@ -200,7 +247,8 @@ impl ValuesReconstructState { &mut self, key: &Key, lsn: Lsn, - value: Value, + completes: bool, + value: sync::oneshot::Receiver>, ) -> ValueReconstructSituation { let state = self .keys @@ -208,31 +256,14 @@ impl ValuesReconstructState { .or_insert(Ok(VectoredValueReconstructState::default())); if let Ok(state) = state { - let key_done = match state.situation { + match state.situation { ValueReconstructSituation::Complete => unreachable!(), - ValueReconstructSituation::Continue => match value { - Value::Image(img) => { - state.img = Some((lsn, img)); - true - } - Value::WalRecord(rec) => { - debug_assert!( - Some(lsn) > state.get_cached_lsn(), - "Attempt to collect a record below cached LSN for walredo: {} < {}", - lsn, - state - .get_cached_lsn() - .expect("Assertion can only fire if a cached lsn is present") - ); + ValueReconstructSituation::Continue => { + state.records.push((lsn, value)); + } + } - let will_init = rec.will_init(); - state.records.push((lsn, rec)); - will_init - } - }, - }; - - if key_done && state.situation == ValueReconstructSituation::Continue { + if completes && state.situation == ValueReconstructSituation::Continue { state.situation = ValueReconstructSituation::Complete; self.keys_done.add_key(*key); } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 34f1b15138..05c22b3344 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{ BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadCoalesceMode, VectoredReadPlanner, }; -use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; -use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::BytesMut; +use anyhow::{bail, ensure, Context, Result}; +use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; use itertools::Itertools; @@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::OnceCell; +use tokio::sync::{self, OnceCell}; use tokio_epoll_uring::IoBuf; use tracing::*; @@ -224,7 +223,7 @@ pub struct DeltaLayerInner { index_start_blk: u32, index_root_blk: u32, - file: VirtualFile, + file: Arc, file_id: FileId, layer_key_range: Range, @@ -788,9 +787,11 @@ impl DeltaLayerInner { max_vectored_read_bytes: Option, ctx: &RequestContext, ) -> anyhow::Result { - let file = VirtualFile::open(path, ctx) - .await - .context("open layer file")?; + let file = Arc::new( + VirtualFile::open(path, ctx) + .await + .context("open layer file")?, + ); let file_id = page_cache::next_file_id(); @@ -980,77 +981,58 @@ impl DeltaLayerInner { reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) { - let vectored_blob_reader = VectoredBlobReader::new(&self.file); - let mut ignore_key_with_err = None; - let max_vectored_read_bytes = self .max_vectored_read_bytes 
.expect("Layer is loaded with max vectored bytes config") .0 .into(); let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes); - let mut buf = Some(BytesMut::with_capacity(buf_size)); // Note that reads are processed in reverse order (from highest key+lsn). // This is the order that `ReconstructState` requires such that it can // track when a key is done. for read in reads.into_iter().rev() { - let res = vectored_blob_reader - .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx) - .await; - - let blobs_buf = match res { - Ok(blobs_buf) => blobs_buf, - Err(err) => { - let kind = err.kind(); - for (_, blob_meta) in read.blobs_at.as_slice() { - reconstruct_state.on_key_error( - blob_meta.key, - PageReconstructError::Other(anyhow!( - "Failed to read blobs from virtual file {}: {}", - self.file.path, - kind - )), - ); - } - - // We have "lost" the buffer since the lower level IO api - // doesn't return the buffer on error. Allocate a new one. - buf = Some(BytesMut::with_capacity(buf_size)); - - continue; - } - }; - - for meta in blobs_buf.blobs.iter().rev() { - if Some(meta.meta.key) == ignore_key_with_err { - continue; - } - - let value = Value::des(&blobs_buf.buf[meta.start..meta.end]); - let value = match value { - Ok(v) => v, - Err(e) => { - reconstruct_state.on_key_error( - meta.meta.key, - PageReconstructError::Other(anyhow!(e).context(format!( - "Failed to deserialize blob from virtual file {}", - self.file.path, - ))), - ); - - ignore_key_with_err = Some(meta.meta.key); - continue; - } - }; - - // Invariant: once a key reaches [`ValueReconstructSituation::Complete`] - // state, no further updates shall be made to it. The call below will - // panic if the invariant is violated. - reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value); + let mut senders: HashMap< + (Key, Lsn), + sync::oneshot::Sender>, + > = Default::default(); + for (_, blob_meta) in read.blobs_at.as_slice() { + let (tx, rx) = sync::oneshot::channel(); + senders.insert((blob_meta.key, blob_meta.lsn), tx); + reconstruct_state.update_key( + &blob_meta.key, + blob_meta.lsn, + blob_meta.will_init, + rx, + ); } - buf = Some(blobs_buf.buf); + let read_from = self.file.clone(); + let read_ctx = ctx.attached_child(); + tokio::task::spawn(async move { + let vectored_blob_reader = VectoredBlobReader::new(&read_from); + let buf = BytesMut::with_capacity(buf_size); + + let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await; + match res { + Ok(blobs_buf) => { + for meta in blobs_buf.blobs.iter().rev() { + let buf = &blobs_buf.buf[meta.start..meta.end]; + let sender = senders + .remove(&(meta.meta.key, meta.meta.lsn)) + .expect("sender must exist"); + let _ = sender.send(Ok(Bytes::copy_from_slice(buf))); + } + + assert!(senders.is_empty()); + } + Err(err) => { + for (_, sender) in senders { + let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed"))); + } + } + } + }); } } @@ -1190,7 +1172,14 @@ impl DeltaLayerInner { let actionable = if let Some((key, lsn, start_offset)) = prev.take() { let end_offset = offset; - Some((BlobMeta { key, lsn }, start_offset..end_offset)) + Some(( + BlobMeta { + key, + lsn, + will_init: false, + }, + start_offset..end_offset, + )) } else { None }; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 875e223c9c..e795ad43cd 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -21,7 
+21,7 @@
 //!
 //! Every image layer file consists of three parts: "summary",
 //! "index", and "values". The summary is a fixed size header at the
-//! beginning of the file, and it contains basic information about the
+//! beginning of the file, and it contains basic information about the
 //! layer, and offsets to the other parts. The "index" is a B-tree,
 //! mapping from Key to an offset in the "values" part. The
 //! actual page images are stored in the "values" part.
@@ -38,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::vectored_blob_io::{
     BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
 };
-use crate::tenant::{PageReconstructError, Timeline};
+use crate::tenant::Timeline;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
 use crate::virtual_file::{self, VirtualFile};
 use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
-use anyhow::{anyhow, bail, ensure, Context, Result};
+use anyhow::{bail, ensure, Context, Result};
 use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
 use hex;
@@ -52,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::fs::File;
 use std::io::SeekFrom;
 use std::ops::Range;
 use std::os::unix::prelude::FileExt;
 use std::str::FromStr;
 use std::sync::Arc;
+use tokio::sync::oneshot;
 use tokio::sync::OnceCell;
 use tokio_stream::StreamExt;
 use tracing::*;
@@ -163,7 +164,7 @@ pub struct ImageLayerInner {
     key_range: Range<Key>,
     lsn: Lsn,
 
-    file: VirtualFile,
+    file: Arc<VirtualFile>,
     file_id: FileId,
 
     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
@@ -390,9 +391,11 @@ impl ImageLayerInner {
         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
         ctx: &RequestContext,
     ) -> anyhow::Result<Self> {
-        let file = VirtualFile::open(path, ctx)
-            .await
-            .context("open layer file")?;
+        let file = Arc::new(
+            VirtualFile::open(path, ctx)
+                .await
+                .context("open layer file")?,
+        );
         let file_id = page_cache::next_file_id();
         let block_reader = FileBlockReader::new(&file, file_id);
         let summary_blk = block_reader
@@ -579,8 +582,21 @@ impl ImageLayerInner {
             .0
             .into();
 
-        let vectored_blob_reader = VectoredBlobReader::new(&self.file);
         for read in reads.into_iter() {
+            let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
+                Default::default();
+            for (_, blob_meta) in read.blobs_at.as_slice() {
+                let (tx, rx) = oneshot::channel();
+                senders.insert((blob_meta.key, blob_meta.lsn), tx);
+
+                reconstruct_state.update_key(
+                    &blob_meta.key,
+                    blob_meta.lsn,
+                    true,
+                    rx,
+                );
+            }
+
             let buf_size = read.size();
 
             if buf_size > max_vectored_read_bytes {
@@ -599,36 +615,32 @@ impl ImageLayerInner {
                 );
             }
 
-            let buf = BytesMut::with_capacity(buf_size);
-            let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
+            let read_from = self.file.clone();
+            let read_ctx = ctx.attached_child();
+            tokio::task::spawn(async move {
+                let buf = BytesMut::with_capacity(buf_size);
+                let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
+                let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
 
-            match res {
-                Ok(blobs_buf) => {
-                    let frozen_buf = blobs_buf.buf.freeze();
+                match res {
+                    Ok(blobs_buf) => {
+                        for meta in blobs_buf.blobs.iter().rev() {
+                            let buf = &blobs_buf.buf[meta.start..meta.end];
+                            let sender = senders
+                                .remove(&(meta.meta.key, meta.meta.lsn))
+                                .expect("sender must exist");
+                            let _ =
sender.send(Ok(Bytes::copy_from_slice(buf))); + } - for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); - reconstruct_state.update_key( - &meta.meta.key, - self.lsn, - Value::Image(img_buf), - ); + assert!(senders.is_empty()); + } + Err(err) => { + for (_, sender) in senders { + let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed"))); + } } } - Err(err) => { - let kind = err.kind(); - for (_, blob_meta) in read.blobs_at.as_slice() { - reconstruct_state.on_key_error( - blob_meta.key, - PageReconstructError::from(anyhow!( - "Failed to read blobs from virtual file {}: {}", - self.file.path, - kind - )), - ); - } - } - }; + }); } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index e487bee1f2..0d2f118597 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::repository::{Key, Value}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::timeline::GetVectoredError; -use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use bytes::Bytes; use camino::Utf8PathBuf; use pageserver_api::key::CompactKey; @@ -36,7 +35,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize}; use tokio::sync::RwLock; use super::{ - DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, + DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState, }; pub(crate) mod vectored_dio_read; @@ -87,7 +86,7 @@ pub struct InMemoryLayerInner { /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. /// PerSeg::page_versions map stores offsets into this file. - file: EphemeralFile, + file: Arc>, resource_units: GlobalResourceUnits, } @@ -381,7 +380,11 @@ impl InMemoryLayer { } pub(crate) fn try_len(&self) -> Option { - self.inner.try_read().map(|i| i.file.len()).ok() + self.inner + .try_read() + .map(|i| i.file.try_read().map(|i| i.len()).ok()) + .ok() + .flatten() } pub(crate) fn assert_writable(&self) { @@ -432,6 +435,10 @@ impl InMemoryLayer { read: vectored_dio_read::LogicalRead>, } let mut reads: HashMap> = HashMap::new(); + let mut senders: HashMap< + (Key, Lsn), + tokio::sync::oneshot::Sender>, + > = Default::default(); for range in keyspace.ranges.iter() { for (key, vec_map) in inner @@ -459,6 +466,11 @@ impl InMemoryLayer { Vec::with_capacity(len as usize), ), }); + + let (tx, rx) = tokio::sync::oneshot::channel(); + senders.insert((key, *entry_lsn), tx); + reconstruct_state.update_key(&key, *entry_lsn, will_init, rx); + if will_init { break; } @@ -466,46 +478,41 @@ impl InMemoryLayer { } } - // Execute the reads. 
+        let read_from = inner.file.clone();
+        let read_ctx = ctx.attached_child();
+        tokio::task::spawn(async move {
+            let locked = read_from.read().await;
+            let f = vectored_dio_read::execute(
+                &*locked,
+                reads
+                    .iter()
+                    .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
+                &read_ctx,
+            );
+            send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
+                .await;
 
-        let f = vectored_dio_read::execute(
-            &inner.file,
-            reads
-                .iter()
-                .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
-            &ctx,
-        );
-        send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
-            .await;
-
-        // Process results into the reconstruct state
-        'next_key: for (key, value_reads) in reads {
-            for ValueRead { entry_lsn, read } in value_reads {
-                match read.into_result().expect("we run execute() above") {
-                    Err(e) => {
-                        reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
-                        continue 'next_key;
-                    }
-                    Ok(value_buf) => {
-                        let value = Value::des(&value_buf);
-                        if let Err(e) = value {
-                            reconstruct_state
-                                .on_key_error(key, PageReconstructError::from(anyhow!(e)));
-                            continue 'next_key;
+            for (key, value_reads) in reads {
+                for ValueRead { entry_lsn, read } in value_reads {
+                    let sender = senders
+                        .remove(&(key, entry_lsn))
+                        .expect("sender must exist");
+                    match read.into_result().expect("we run execute() above") {
+                        Err(e) => {
+                            let _ = sender
+                                .send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
                         }
-
-                        let key_situation =
-                            reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
-                        if key_situation == ValueReconstructSituation::Complete {
-                            // TODO: metric to see if we fetched more values than necessary
-                            continue 'next_key;
+                        Ok(value_buf) => {
+                            let _ = sender.send(Ok(value_buf.into()));
                         }
-
-                        // process the next value in the next iteration of the loop
                     }
                 }
             }
-        }
+
+            assert!(senders.is_empty());
+        });
 
         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
 
@@ -600,7 +607,8 @@ impl InMemoryLayer {
     /// Get layer size.
pub async fn size(&self) -> Result { let inner = self.inner.read().await; - Ok(inner.file.len()) + let locked = inner.file.try_read().expect("no contention"); + Ok(locked.len()) } /// Create a new, empty, in-memory layer @@ -614,9 +622,10 @@ impl InMemoryLayer { ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = - EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?; - let key = InMemoryLayerFileId(file.page_cache_file_id()); + let file = Arc::new(tokio::sync::RwLock::new( + EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?, + )); + let key = InMemoryLayerFileId(file.read().await.page_cache_file_id()); Ok(InMemoryLayer { file_id: key, @@ -648,7 +657,7 @@ impl InMemoryLayer { let mut inner = self.inner.write().await; self.assert_writable(); - let base_offset = inner.file.len(); + let base_offset = inner.file.read().await.len(); let SerializedBatch { raw, @@ -672,8 +681,13 @@ impl InMemoryLayer { } // Write the batch to the file - inner.file.write_raw(&raw, ctx).await?; - let new_size = inner.file.len(); + // FIXME: can't borrow arc + let new_size = { + let mut locked = inner.file.write().await; + locked.write_raw(&raw, ctx).await?; + locked.len() + }; + let expected_new_len = base_offset .checked_add(raw.len().into_u64()) // write_raw would error if we were to overflow u64. @@ -713,7 +727,7 @@ impl InMemoryLayer { pub(crate) async fn tick(&self) -> Option { let mut inner = self.inner.write().await; - let size = inner.file.len(); + let size = inner.file.read().await.len(); inner.resource_units.publish_size(size) } @@ -809,7 +823,7 @@ impl InMemoryLayer { match l0_flush_global_state { l0_flush::Inner::Direct { .. } => { - let file_contents: Vec = inner.file.load_to_vec(ctx).await?; + let file_contents: Vec = inner.file.read().await.load_to_vec(ctx).await?; let file_contents = Bytes::from(file_contents); diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 0b9bde4f57..b2e69dae07 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -107,6 +107,8 @@ async fn smoke_test() { .expect("tenant harness writes the control file") }; + let img_before = (img_before.0, img_before.1.await.unwrap().unwrap()); + let img_after = (img_after.0, img_after.1.await.unwrap().unwrap()); assert_eq!(img_before, img_after); // evict_and_wait can timeout, but it doesn't cancel the evicting itself diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c0acbe9b1f..f5a380b913 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -69,7 +69,7 @@ use crate::{ layer_map::{LayerMap, SearchResult}, metadata::TimelineMetadata, storage_layer::{ - inmemory_layer::IndexEntry, LayerId, PersistentLayerDesc, ValueReconstructSituation, + convert, inmemory_layer::IndexEntry, PersistentLayerDesc, }, }, walredo, @@ -1130,11 +1130,11 @@ impl Timeline { // transform reconstruct state which is per key into a map // layer => all reads from that layer - struct KeyWaiter { - img: Option>, - values: Vec>, - } - let mut key_waiters: BTreeMap> = todo!(); + // struct KeyWaiter { + // img: Option>, + // values: Vec>, + // } + // let mut key_waiters: BTreeMap> = todo!(); let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME .for_get_kind(get_kind) @@ -1148,7 +1148,9 @@ impl Timeline { results.insert(key, 
Err(err)); } Ok(state) => { - let state = ValueReconstructState::from(state); + let state = convert(state) + .await + .map_err(|err| GetVectoredError::Other(err.into()))?; let reconstruct_res = self.reconstruct_value(key, lsn, state).await; results.insert(key, reconstruct_res); @@ -5506,30 +5508,30 @@ impl Timeline { #[cfg(test)] pub(crate) async fn inspect_image_layers( self: &Arc, - lsn: Lsn, - ctx: &RequestContext, + _lsn: Lsn, + _ctx: &RequestContext, ) -> anyhow::Result> { - let mut all_data = Vec::new(); - let guard = self.layers.read().await; - for layer in guard.layer_map()?.iter_historic_layers() { - if !layer.is_delta() && layer.image_layer_lsn() == lsn { - let layer = guard.get_from_desc(&layer); - let mut reconstruct_data = ValuesReconstructState::default(); - layer - .get_values_reconstruct_data( - KeySpace::single(Key::MIN..Key::MAX), - lsn..Lsn(lsn.0 + 1), - &mut reconstruct_data, - ctx, - ) - .await?; - for (k, v) in reconstruct_data.keys { - all_data.push((k, v?.img.unwrap().1)); - } - } - } - all_data.sort(); - Ok(all_data) + // let mut all_data = Vec::new(); + // let guard = self.layers.read().await; + // for layer in guard.layer_map()?.iter_historic_layers() { + // if !layer.is_delta() && layer.image_layer_lsn() == lsn { + // let layer = guard.get_from_desc(&layer); + // let mut reconstruct_data = ValuesReconstructState::default(); + // layer + // .get_values_reconstruct_data( + // KeySpace::single(Key::MIN..Key::MAX), + // lsn..Lsn(lsn.0 + 1), + // &mut reconstruct_data, + // ctx, + // ) + // .await?; + // for (k, v) in reconstruct_data.keys { + // all_data.push((k, v?.img.unwrap().1)); + // } + // } + // } + // all_data.sort(); + Ok(Vec::new()) } /// Get all historic layer descriptors in the layer map diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index de93fb4eaf..6d8122d60d 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -18,7 +18,6 @@ use std::collections::BTreeMap; use bytes::BytesMut; -use futures::channel::oneshot; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -34,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile}; pub struct BlobMeta { pub key: Key, pub lsn: Lsn, + pub will_init: bool, } /// Blob offsets into [`VectoredBlobsBuf::buf`] @@ -356,7 +356,8 @@ pub enum BlobFlag { /// * Iterate over the collected blobs and coalesce them into reads at the end pub struct VectoredReadPlanner { // Track all the blob offsets. Start offsets must be ordered. 
- blobs: BTreeMap>, + // Note: last bool is will_init + blobs: BTreeMap>, // Arguments for previous blob passed into [`VectoredReadPlanner::handle`] prev: Option<(Key, Lsn, u64, BlobFlag)>, @@ -421,12 +422,12 @@ impl VectoredReadPlanner { match flag { BlobFlag::None => { let blobs_for_key = self.blobs.entry(key).or_default(); - blobs_for_key.push((lsn, start_offset, end_offset)); + blobs_for_key.push((lsn, start_offset, end_offset, false)); } BlobFlag::ReplaceAll => { let blobs_for_key = self.blobs.entry(key).or_default(); blobs_for_key.clear(); - blobs_for_key.push((lsn, start_offset, end_offset)); + blobs_for_key.push((lsn, start_offset, end_offset, true)); } BlobFlag::Ignore => {} } @@ -437,11 +438,17 @@ impl VectoredReadPlanner { let mut reads = Vec::new(); for (key, blobs_for_key) in self.blobs { - for (lsn, start_offset, end_offset) in blobs_for_key { + for (lsn, start_offset, end_offset, will_init) in blobs_for_key { let extended = match &mut current_read_builder { - Some(read_builder) => { - read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn }) - } + Some(read_builder) => read_builder.extend( + start_offset, + end_offset, + BlobMeta { + key, + lsn, + will_init, + }, + ), None => VectoredReadExtended::No, }; @@ -449,7 +456,11 @@ impl VectoredReadPlanner { let next_read_builder = VectoredReadBuilder::new( start_offset, end_offset, - BlobMeta { key, lsn }, + BlobMeta { + key, + lsn, + will_init, + }, self.max_read_size, self.mode, ); @@ -670,7 +681,15 @@ impl StreamingVectoredReadPlanner { ) -> Option { match &mut self.read_builder { Some(read_builder) => { - let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn }); + let extended = read_builder.extend( + start_offset, + end_offset, + BlobMeta { + key, + lsn, + will_init: false, + }, + ); assert_eq!(extended, VectoredReadExtended::Yes); } None => { @@ -678,7 +697,11 @@ impl StreamingVectoredReadPlanner { Some(VectoredReadBuilder::new_streaming( start_offset, end_offset, - BlobMeta { key, lsn }, + BlobMeta { + key, + lsn, + will_init: false, + }, self.mode, )) }; @@ -1010,6 +1033,7 @@ mod tests { let meta = BlobMeta { key: Key::MIN, lsn: Lsn(0), + will_init: false, }; for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
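
The flow this patch introduces is the same in every layer type: each read creates a tokio::sync::oneshot channel per (key, lsn), hands the receiver to ValuesReconstructState::update_key, keeps the sender in a local map, and spawns a task that performs the vectored IO and fulfils each sender with a Result<Bytes, std::io::Error>; convert() later awaits the receivers and deserializes the bytes into Value before walredo. The sketch below is a minimal, self-contained model of that pattern, not pageserver code: the names (ReconstructState, fake_read), the use of u64 in place of Lsn, and the assumed dependencies (the tokio rt + macros features and the bytes crate) are illustrative only.

    use bytes::Bytes;
    use tokio::sync::oneshot;

    type BlobResult = Result<Bytes, std::io::Error>;

    #[derive(Default)]
    struct ReconstructState {
        // (lsn, receiver) pairs, pushed in read-plan order.
        records: Vec<(u64, oneshot::Receiver<BlobResult>)>,
    }

    impl ReconstructState {
        // Register interest in a value at `lsn`; the returned sender is handed
        // to whichever task ends up reading the blob.
        fn update_key(&mut self, lsn: u64) -> oneshot::Sender<BlobResult> {
            let (tx, rx) = oneshot::channel();
            self.records.push((lsn, rx));
            tx
        }
    }

    // Stand-in for the vectored blob read: pretend the IO yields some bytes.
    async fn fake_read(lsn: u64) -> BlobResult {
        Ok(Bytes::from(format!("blob@{lsn}")))
    }

    #[tokio::main]
    async fn main() {
        let mut state = ReconstructState::default();

        // Planning phase: register one sender per value before issuing any IO.
        let senders: Vec<(u64, oneshot::Sender<BlobResult>)> =
            (1u64..=3).map(|lsn| (lsn, state.update_key(lsn))).collect();

        // IO phase: a detached task performs the reads and fulfils the channels.
        tokio::spawn(async move {
            for (lsn, tx) in senders {
                let _ = tx.send(fake_read(lsn).await);
            }
        });

        // Reconstruct phase (cf. `convert`): await each receiver in order.
        for (lsn, rx) in state.records {
            match rx.await {
                Ok(Ok(bytes)) => println!("lsn {lsn}: {} bytes", bytes.len()),
                Ok(Err(err)) => eprintln!("lsn {lsn}: io error: {err}"),
                Err(_) => eprintln!("lsn {lsn}: reader dropped the sender"),
            }
        }
    }

In the patch itself the senders are keyed by (Key, Lsn), the IO happens inside the spawned task via VectoredBlobReader::read_blobs (delta and image layers) or vectored_dio_read::execute (in-memory layer), and convert() deserializes the received bytes into Value (image or WAL record) before handing the state to walredo.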