diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 606ba9ad8c..8a2e2ed3be 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -12,6 +12,9 @@ testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "
 fuzz-read-path = ["testing"]
 
+# Enables benchmarking-only APIs
+benchmarking = []
+
 [dependencies]
 anyhow.workspace = true
 arc-swap.workspace = true
@@ -127,6 +130,7 @@ harness = false
 [[bench]]
 name = "bench_ingest"
 harness = false
+required-features = ["benchmarking"]
 
 [[bench]]
 name = "upload_queue"
diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs
index 681d135e09..438c6e235e 100644
--- a/pageserver/benches/bench_ingest.rs
+++ b/pageserver/benches/bench_ingest.rs
@@ -1,22 +1,29 @@
 use std::env;
 use std::num::NonZeroUsize;
+use std::sync::Arc;
 
 use bytes::Bytes;
 use camino::Utf8PathBuf;
 use criterion::{Criterion, criterion_group, criterion_main};
+use futures::stream::FuturesUnordered;
 use pageserver::config::PageServerConf;
 use pageserver::context::{DownloadBehavior, RequestContext};
+use pageserver::keyspace::KeySpace;
 use pageserver::l0_flush::{L0FlushConfig, L0FlushGlobalState};
 use pageserver::task_mgr::TaskKind;
-use pageserver::tenant::storage_layer::InMemoryLayer;
+use pageserver::tenant::storage_layer::IoConcurrency;
+use pageserver::tenant::storage_layer::{InMemoryLayer, ValuesReconstructState};
 use pageserver::{page_cache, virtual_file};
+use pageserver_api::config::GetVectoredConcurrentIo;
 use pageserver_api::key::Key;
 use pageserver_api::models::virtual_file::IoMode;
 use pageserver_api::shard::TenantShardId;
-use strum::IntoEnumIterator;
+use tokio_stream::StreamExt;
 use tokio_util::sync::CancellationToken;
 use utils::bin_ser::BeSer;
 use utils::id::{TenantId, TimelineId};
+use utils::lsn::Lsn;
+use utils::sync::gate::Gate;
 use wal_decoder::models::value::Value;
 use wal_decoder::serialized_batch::SerializedValueBatch;
 
@@ -30,7 +37,7 @@ fn murmurhash32(mut h: u32) -> u32 {
     h
 }
 
-#[derive(serde::Serialize, Clone, Copy, Debug)]
+#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
 enum KeyLayout {
     /// Sequential unique keys
     Sequential,
@@ -40,19 +47,30 @@ enum KeyLayout {
     RandomReuse(u32),
 }
 
-#[derive(serde::Serialize, Clone, Copy, Debug)]
+#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
 enum WriteDelta {
     Yes,
     No,
 }
 
+#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
+enum ConcurrentReads {
+    Yes,
+    No,
+}
+
 async fn ingest(
     conf: &'static PageServerConf,
     put_size: usize,
     put_count: usize,
     key_layout: KeyLayout,
     write_delta: WriteDelta,
+    concurrent_reads: ConcurrentReads,
 ) -> anyhow::Result<()> {
+    if concurrent_reads == ConcurrentReads::Yes {
+        assert_eq!(key_layout, KeyLayout::Sequential);
+    }
+
     let mut lsn = utils::lsn::Lsn(1000);
     let mut key = Key::from_i128(0x0);
 
@@ -68,16 +86,18 @@ async fn ingest(
     let gate = utils::sync::gate::Gate::default();
     let cancel = CancellationToken::new();
 
-    let layer = InMemoryLayer::create(
-        conf,
-        timeline_id,
-        tenant_shard_id,
-        lsn,
-        &gate,
-        &cancel,
-        &ctx,
-    )
-    .await?;
+    let layer = Arc::new(
+        InMemoryLayer::create(
+            conf,
+            timeline_id,
+            tenant_shard_id,
+            lsn,
+            &gate,
+            &cancel,
+            &ctx,
+        )
+        .await?,
+    );
 
     let data = Value::Image(Bytes::from(vec![0u8; put_size]));
     let data_ser_size = data.serialized_size().unwrap() as usize;
@@ -86,6 +106,61 @@ async fn ingest(
         pageserver::context::DownloadBehavior::Download,
     );
 
+    const READ_BATCH_SIZE: u32 = 32;
+    let (tx, mut rx) = tokio::sync::watch::channel::<Option<Key>>(None);
+    let reader_cancel = CancellationToken::new();
+    let reader_handle = if concurrent_reads == ConcurrentReads::Yes {
+        Some(tokio::task::spawn({
+            let cancel = reader_cancel.clone();
+            let layer = layer.clone();
+            let ctx = ctx.attached_child();
+            async move {
+                let gate = Gate::default();
+                let gate_guard = gate.enter().unwrap();
+                let io_concurrency = IoConcurrency::spawn_from_conf(
+                    GetVectoredConcurrentIo::SidecarTask,
+                    gate_guard,
+                );
+
+                rx.wait_for(|key| key.is_some()).await.unwrap();
+
+                while !cancel.is_cancelled() {
+                    let key = match *rx.borrow() {
+                        Some(some) => some,
+                        None => unreachable!(),
+                    };
+
+                    let mut start_key = key;
+                    start_key.field6 = key.field6.saturating_sub(READ_BATCH_SIZE);
+                    let key_range = start_key..key.next();
+
+                    let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
+
+                    layer
+                        .get_values_reconstruct_data(
+                            KeySpace::single(key_range),
+                            Lsn(1)..Lsn(u64::MAX),
+                            &mut reconstruct_state,
+                            &ctx,
+                        )
+                        .await
+                        .unwrap();
+
+                    let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
+                        .into_values()
+                        .map(|state| state.sink_pending_ios())
+                        .collect::<FuturesUnordered<_>>();
+                    while collect_futs.next().await.is_some() {}
+                }
+
+                drop(io_concurrency);
+                gate.close().await;
+            }
+        }))
+    } else {
+        None
+    };
+
     const BATCH_SIZE: usize = 16;
     let mut batch = Vec::new();
@@ -113,19 +188,27 @@ async fn ingest(
         batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
 
         if batch.len() >= BATCH_SIZE {
+            let last_key = Key::from_compact(batch.last().unwrap().0);
+
             let this_batch = std::mem::take(&mut batch);
             let serialized = SerializedValueBatch::from_values(this_batch);
             layer.put_batch(serialized, &ctx).await?;
+
+            tx.send(Some(last_key)).unwrap();
         }
     }
     if !batch.is_empty() {
+        let last_key = Key::from_compact(batch.last().unwrap().0);
+
         let this_batch = std::mem::take(&mut batch);
         let serialized = SerializedValueBatch::from_values(this_batch);
         layer.put_batch(serialized, &ctx).await?;
+
+        tx.send(Some(last_key)).unwrap();
     }
     layer.freeze(lsn + 1).await;
 
-    if matches!(write_delta, WriteDelta::Yes) {
+    if write_delta == WriteDelta::Yes {
         let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
             max_concurrency: NonZeroUsize::new(1).unwrap(),
         });
@@ -136,6 +219,11 @@ async fn ingest(
         tokio::fs::remove_file(path).await?;
     }
 
+    reader_cancel.cancel();
+    if let Some(handle) = reader_handle {
+        handle.await.unwrap();
+    }
+
     Ok(())
 }
 
@@ -147,6 +235,7 @@ fn ingest_main(
     put_count: usize,
     key_layout: KeyLayout,
     write_delta: WriteDelta,
+    concurrent_reads: ConcurrentReads,
 ) {
     pageserver::virtual_file::set_io_mode(io_mode);
 
@@ -156,7 +245,15 @@ fn ingest_main(
         .unwrap();
 
     runtime.block_on(async move {
-        let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
+        let r = ingest(
+            conf,
+            put_size,
+            put_count,
+            key_layout,
+            write_delta,
+            concurrent_reads,
+        )
+        .await;
         if let Err(e) = r {
             panic!("{e:?}");
         }
@@ -195,6 +292,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         key_size: usize,
         key_layout: KeyLayout,
         write_delta: WriteDelta,
+        concurrent_reads: ConcurrentReads,
     }
     #[derive(Clone)]
     struct HandPickedParameters {
@@ -245,7 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     ];
     let exploded_parameters = {
        let mut out = Vec::new();
-        for io_mode in IoMode::iter() {
+        for concurrent_reads in [ConcurrentReads::Yes, ConcurrentReads::No] {
            for param in expect.clone() {
                let HandPickedParameters {
                    volume_mib,
@@ -253,12 +351,18 @@ fn criterion_benchmark(c: &mut Criterion) {
                    key_layout,
                    write_delta,
                } = param;
+
+                if key_layout != KeyLayout::Sequential && concurrent_reads == ConcurrentReads::Yes {
+                    continue;
+                }
+
                out.push(ExplodedParameters {
-                    io_mode,
+                    io_mode: IoMode::DirectRw,
                    volume_mib,
                    key_size,
                    key_layout,
                    write_delta,
+                    concurrent_reads,
                });
            }
        }
@@ -272,9 +376,10 @@ fn criterion_benchmark(c: &mut Criterion) {
            key_size,
            key_layout,
            write_delta,
+            concurrent_reads,
        } = self;
        format!(
-            "io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
+            "io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?} concurrent_reads={concurrent_reads:?}"
        )
    }
 }
@@ -287,12 +392,23 @@ fn criterion_benchmark(c: &mut Criterion) {
            key_size,
            key_layout,
            write_delta,
+            concurrent_reads,
        } = params;
        let put_count = volume_mib * 1024 * 1024 / key_size;
        group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
        group.sample_size(10);
        group.bench_function(id, |b| {
-            b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
+            b.iter(|| {
+                ingest_main(
+                    conf,
+                    io_mode,
+                    key_size,
+                    put_count,
+                    key_layout,
+                    write_delta,
+                    concurrent_reads,
+                )
+            })
        });
    }
 }
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index bf54614baa..8d6d342cf9 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -3426,7 +3426,7 @@ impl TimelineMetrics {
     pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
         assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
         let labels = self.make_frozen_layer_labels(layer);
-        let size = layer.try_len().expect("frozen layer should have no writer");
+        let size = layer.len();
         TIMELINE_LAYER_COUNT
             .get_metric_with_label_values(&labels)
             .unwrap()
@@ -3441,7 +3441,7 @@ impl TimelineMetrics {
     pub fn inc_frozen_layer(&self, layer: &InMemoryLayer) {
         assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
         let labels = self.make_frozen_layer_labels(layer);
-        let size = layer.try_len().expect("frozen layer should have no writer");
+        let size = layer.len();
         TIMELINE_LAYER_COUNT
             .get_metric_with_label_values(&labels)
             .unwrap()
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 2edf22e9fd..203b5bf592 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -3,7 +3,7 @@
 use std::io;
 use std::sync::Arc;
-use std::sync::atomic::AtomicU64;
+use std::sync::atomic::{AtomicU64, Ordering};
 
 use camino::Utf8PathBuf;
 use num_traits::Num;
@@ -18,6 +18,7 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache;
+use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
 use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
 use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
 use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
@@ -30,9 +31,13 @@ pub struct EphemeralFile {
     _tenant_shard_id: TenantShardId,
     _timeline_id: TimelineId,
     page_cache_file_id: page_cache::FileId,
-    bytes_written: u64,
     file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
-    buffered_writer: BufferedWriter,
+
+    buffered_writer: tokio::sync::RwLock<BufferedWriter>,
+
+    bytes_written: AtomicU64,
+
+    resource_units: std::sync::Mutex<GlobalResourceUnits>,
 }
 
 type BufferedWriter = owned_buffers_io::write::BufferedWriter<
@@ -94,9 +99,8 @@ impl EphemeralFile {
             _tenant_shard_id: tenant_shard_id,
             _timeline_id: timeline_id,
             page_cache_file_id,
-            bytes_written: 0,
             file: file.clone(),
-            buffered_writer: BufferedWriter::new(
+            buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
                 file,
                 0,
                 || IoBufferMut::with_capacity(TAIL_SZ),
@@ -104,7 +108,9 @@ impl EphemeralFile {
                 cancel.child_token(),
                 ctx,
                 info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
-            ),
+            )),
+            bytes_written: AtomicU64::new(0),
+            resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
         })
     }
 }
@@ -151,15 +157,17 @@ impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
 
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum EphemeralFileWriteError {
-    #[error("{0}")]
-    TooLong(String),
     #[error("cancelled")]
     Cancelled,
 }
 
 impl EphemeralFile {
     pub(crate) fn len(&self) -> u64 {
-        self.bytes_written
+        // TODO(vlad): The value returned here is not always correct if
+        // we have more than one concurrent writer. Writes are always
+        // sequenced, but we could grab the buffered writer lock if we wanted
+        // to.
+        self.bytes_written.load(Ordering::Acquire)
     }
 
     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
@@ -186,7 +194,7 @@ impl EphemeralFile {
     /// Panics if the write is short because there's no way we can recover from that.
     /// TODO: make upstack handle this as an error.
     pub(crate) async fn write_raw(
-        &mut self,
+        &self,
         srcbuf: &[u8],
         ctx: &RequestContext,
     ) -> Result<u64, EphemeralFileWriteError> {
@@ -198,22 +206,13 @@ impl EphemeralFile {
     }
 
     async fn write_raw_controlled(
-        &mut self,
+        &self,
         srcbuf: &[u8],
         ctx: &RequestContext,
     ) -> Result<(u64, Option), EphemeralFileWriteError> {
-        let pos = self.bytes_written;
+        let mut writer = self.buffered_writer.write().await;
 
-        let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
-            EphemeralFileWriteError::TooLong(format!(
-                "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
-                srcbuf_len = srcbuf.len(),
-            ))
-        })?;
-
-        // Write the payload
-        let (nwritten, control) = self
-            .buffered_writer
+        let (nwritten, control) = writer
             .write_buffered_borrowed_controlled(srcbuf, ctx)
             .await
             .map_err(|e| match e {
@@ -225,43 +224,69 @@ impl EphemeralFile {
             "buffered writer has no short writes"
         );
 
-        self.bytes_written = new_bytes_written;
+        // There's no realistic risk of overflow here. We won't have exabyte-sized files on disk.
+        let pos = self
+            .bytes_written
+            .fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
+
+        let mut resource_units = self.resource_units.lock().unwrap();
+        resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
 
         Ok((pos, control))
     }
+
+    pub(crate) fn tick(&self) -> Option<u64> {
+        let mut resource_units = self.resource_units.lock().unwrap();
+        let len = self.bytes_written.load(Ordering::Relaxed);
+        resource_units.publish_size(len)
+    }
 }
 
 impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
     async fn read_exact_at_eof_ok(
         &self,
         start: u64,
-        dst: tokio_epoll_uring::Slice,
+        mut dst: tokio_epoll_uring::Slice,
         ctx: &RequestContext,
     ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> {
-        let submitted_offset = self.buffered_writer.bytes_submitted();
+        // We will fill the slice in back to front. Hence, we need
+        // the slice to be fully initialized.
+        // TODO(vlad): Is there a nicer way of doing this?
+        dst.as_mut_rust_slice_full_zeroed();
 
-        let mutable = match self.buffered_writer.inspect_mutable() {
-            Some(mutable) => &mutable[0..mutable.pending()],
-            None => {
-                // Timeline::cancel and hence buffered writer flush was cancelled.
-                // Remain read-available while timeline is shutting down.
-                &[]
-            }
-        };
+        let writer = self.buffered_writer.read().await;
 
-        let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
+        // Read bytes written while under lock. This is a hack to deal with concurrent
+        // writes updating the number of bytes written. `bytes_written` is not DIO aligned
+        // but we may end the read there.
+        //
+        // TODO(vlad): Feels like there's a nicer path where we align the end if it
+        // shoots over the end of the file.
+        let bytes_written = self.bytes_written.load(Ordering::Acquire);
 
         let dst_cap = dst.bytes_total().into_u64();
         let end = {
             // saturating_add is correct here because the max file size is u64::MAX, so,
             // if start + dst.len() > u64::MAX, then we know it will be a short read
             let mut end: u64 = start.saturating_add(dst_cap);
-            if end > self.bytes_written {
-                end = self.bytes_written;
+            if end > bytes_written {
+                end = bytes_written;
             }
             end
         };
 
+        let submitted_offset = writer.bytes_submitted();
+        let maybe_flushed = writer.inspect_maybe_flushed();
+
+        let mutable = match writer.inspect_mutable() {
+            Some(mutable) => &mutable[0..mutable.pending()],
+            None => {
+                // Timeline::cancel and hence buffered writer flush was cancelled.
+                // Remain read-available while timeline is shutting down.
+                &[]
+            }
+        };
+
         // inclusive, exclusive
         #[derive(Debug)]
         struct Range(N, N);
@@ -306,13 +331,33 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
 
         let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
 
-        let dst = if written_range.len() > 0 {
+        // There are three sources from which we might have to read data:
+        // 1. The file itself
+        // 2. The buffer which contains changes currently being flushed
+        // 3. The buffer which contains changes yet to be flushed
+        //
+        // For better concurrency, we do them in reverse order: perform the in-memory
+        // reads while holding the writer lock, drop the writer lock and read from the
+        // file if required.
+
+        let dst = if mutable_range.len() > 0 {
+            let offset_in_buffer = mutable_range
+                .0
+                .checked_sub(submitted_offset)
+                .unwrap()
+                .into_usize();
+            let to_copy =
+                &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
             let bounds = dst.bounds();
-            let slice = self
-                .file
-                .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
-                .await?;
-            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
+            let mut view = dst.slice({
+                let start =
+                    written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
+                let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
+                start..end
+            });
+            view.as_mut_rust_slice_full_zeroed()
+                .copy_from_slice(to_copy);
+            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
         } else {
             dst
         };
@@ -342,24 +387,15 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
             dst
         };
 
-        let dst = if mutable_range.len() > 0 {
-            let offset_in_buffer = mutable_range
-                .0
-                .checked_sub(submitted_offset)
-                .unwrap()
-                .into_usize();
-            let to_copy =
-                &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
+        drop(writer);
+
+        let dst = if written_range.len() > 0 {
             let bounds = dst.bounds();
-            let mut view = dst.slice({
-                let start =
-                    written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
-                let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
-                start..end
-            });
-            view.as_mut_rust_slice_full_zeroed()
-                .copy_from_slice(to_copy);
-            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
+            let slice = self
+                .file
+                .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
+                .await?;
+            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
         } else {
             dst
         };
@@ -460,13 +496,15 @@ mod tests {
         let gate = utils::sync::gate::Gate::default();
         let cancel = CancellationToken::new();
 
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
+        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
             .await
             .unwrap();
 
-        let mutable = file.buffered_writer.mutable();
+        let writer = file.buffered_writer.read().await;
+        let mutable = writer.mutable();
         let cap = mutable.capacity();
         let align = mutable.align();
+        drop(writer);
 
         let write_nbytes = cap * 2 + cap / 2;
 
@@ -504,10 +542,11 @@ mod tests {
         let file_contents = std::fs::read(file.file.path()).unwrap();
         assert!(file_contents == content[0..cap * 2]);
 
-        let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
+        let writer = file.buffered_writer.read().await;
+        let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
         assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
 
-        let mutable_buffer_contents = file.buffered_writer.mutable();
+        let mutable_buffer_contents = writer.mutable();
         assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
     }
 
@@ -517,12 +556,14 @@ mod tests {
         let gate = utils::sync::gate::Gate::default();
         let cancel = CancellationToken::new();
 
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
+        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
             .await
             .unwrap();
 
         // mutable buffer and maybe_flushed buffer each has `cap` bytes.
-        let cap = file.buffered_writer.mutable().capacity();
+        let writer = file.buffered_writer.read().await;
+        let cap = writer.mutable().capacity();
+        drop(writer);
 
         let content: Vec<u8> = rand::thread_rng()
             .sample_iter(rand::distributions::Standard)
@@ -540,12 +581,13 @@ mod tests {
             2 * cap.into_u64(),
             "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
         );
+        let writer = file.buffered_writer.read().await;
         assert_eq!(
-            &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
+            &writer.inspect_maybe_flushed().unwrap()[0..cap],
             &content[cap..cap * 2]
        );
        assert_eq!(
-            &file.buffered_writer.mutable()[0..cap / 2],
+            &writer.mutable()[0..cap / 2],
             &content[cap * 2..cap * 2 + cap / 2]
        );
     }
@@ -563,13 +605,15 @@ mod tests {
         let gate = utils::sync::gate::Gate::default();
         let cancel = CancellationToken::new();
 
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
+        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
             .await
             .unwrap();
 
-        let mutable = file.buffered_writer.mutable();
+        let writer = file.buffered_writer.read().await;
+        let mutable = writer.mutable();
         let cap = mutable.capacity();
         let align = mutable.align();
+        drop(writer);
         let content: Vec<u8> = rand::thread_rng()
             .sample_iter(rand::distributions::Standard)
             .take(cap * 2 + cap / 2)
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index e65d444f76..9fbb9d2438 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -109,7 +109,7 @@ pub(crate) enum OnDiskValue {
 
 /// Reconstruct data accumulated for a single key during a vectored get
 #[derive(Debug, Default)]
-pub(crate) struct VectoredValueReconstructState {
+pub struct VectoredValueReconstructState {
     pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
 
     pub(crate) situation: ValueReconstructSituation,
@@ -244,13 +244,60 @@ impl VectoredValueReconstructState {
 
         res
     }
+
+    /// Benchmarking utility to await the completion of all pending IOs
+    ///
+    /// # Cancel-Safety
+    ///
+    /// Technically fine to stop polling this future, but the IOs will still
+    /// be executed to completion by the sidecar task and hold on to / consume resources.
+    /// Better not to do it, to make reasoning about the system easier.
+    #[cfg(feature = "benchmarking")]
+    pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
+        let mut res = Ok(());
+
+        // We should try hard not to bail early, so that by the time we return from this
+        // function, all IO for this value is done. It's not required -- we could totally
+        // stop polling the IO futures in the sidecar task, they need to support that,
+        // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
+        // to reason about the system if we just wait for all IO to complete, even if
+        // we're no longer interested in the result.
+        //
+        // Revisit this when IO futures are replaced with a more sophisticated IO system
+        // and an IO scheduler, where we know which IOs were submitted and which ones
+        // just queued. Cf the comment on IoConcurrency::spawn_io.
+        for (_lsn, waiter) in self.on_disk_values {
+            let value_recv_res = waiter
+                .wait_completion()
+                // we rely on the caller to poll us to completion, so this is not a bail point
+                .await;
+
+            match (&mut res, value_recv_res) {
+                (Err(_), _) => {
+                    // We've already failed, no need to process more.
+                }
+                (Ok(_), Err(_wait_err)) => {
+                    // This shouldn't happen - likely the sidecar task panicked.
+                    unreachable!();
+                }
+                (Ok(_), Ok(Err(err))) => {
+                    let err: std::io::Error = err;
+                    res = Err(err);
+                }
+                (Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
+                (Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
+            }
+        }
+
+        res
+    }
 }
 
 /// Bag of data accumulated during a vectored get..
-pub(crate) struct ValuesReconstructState {
+pub struct ValuesReconstructState {
     /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
     /// should not expect to get anything from this hashmap.
-    pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
+    pub keys: HashMap<Key, VectoredValueReconstructState>,
 
     /// The keys which are already retrieved
     keys_done: KeySpaceRandomAccum,
@@ -272,7 +319,7 @@ pub(crate) struct ValuesReconstructState {
 /// The desired end state is that we always do parallel IO.
 /// This struct and the dispatching in the impl will be removed once
 /// we've built enough confidence.
-pub(crate) enum IoConcurrency {
+pub enum IoConcurrency {
     Sequential,
     SidecarTask {
         task_id: usize,
         ios_tx:
@@ -317,10 +364,7 @@ impl IoConcurrency {
         Self::spawn(SelectedIoConcurrency::Sequential)
     }
 
-    pub(crate) fn spawn_from_conf(
-        conf: GetVectoredConcurrentIo,
-        gate_guard: GateGuard,
-    ) -> IoConcurrency {
+    pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
         let selected = match conf {
             GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
             GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
@@ -425,16 +469,6 @@ impl IoConcurrency {
         }
     }
 
-    pub(crate) fn clone(&self) -> Self {
-        match self {
-            IoConcurrency::Sequential => IoConcurrency::Sequential,
-            IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
-                task_id: *task_id,
-                ios_tx: ios_tx.clone(),
-            },
-        }
-    }
-
     /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
     ///
     /// The IO is represented as an opaque future.
@@ -573,6 +607,18 @@ impl IoConcurrency {
     }
 }
 
+impl Clone for IoConcurrency {
+    fn clone(&self) -> Self {
+        match self {
+            IoConcurrency::Sequential => IoConcurrency::Sequential,
+            IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
+                task_id: *task_id,
+                ios_tx: ios_tx.clone(),
+            },
+        }
+    }
+}
+
 /// Make noise in case the [`ValuesReconstructState`] gets dropped while
 /// there are still IOs in flight.
 /// Refer to `collect_pending_ios` for why we prefer not to do that.
@@ -603,7 +649,7 @@ impl Drop for ValuesReconstructState {
 }
 
 impl ValuesReconstructState {
-    pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
+    pub fn new(io_concurrency: IoConcurrency) -> Self {
         Self {
             keys: HashMap::new(),
             keys_done: KeySpaceRandomAccum::new(),
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 200beba115..8e5b0ba648 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -70,23 +70,15 @@ pub struct InMemoryLayer {
     /// We use a separate lock for the index to reduce the critical section
     /// during which reads cannot be planned.
     ///
-    /// If you need access to both the index and the underlying file at the same time,
-    /// respect the following locking order to avoid deadlocks:
-    /// 1. [`InMemoryLayer::inner`]
-    /// 2. [`InMemoryLayer::index`]
-    ///
-    /// Note that the file backing [`InMemoryLayer::inner`] is append-only,
-    /// so it is not necessary to hold simultaneous locks on index.
-    /// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
+    /// Note that the file backing [`InMemoryLayer::file`] is append-only,
+    /// so it is not necessary to hold a lock on the index while reading from or writing to the file.
     /// In particular:
-    /// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
-    /// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
+    /// 1. It is safe to read and release [`InMemoryLayer::index`] before reading from [`InMemoryLayer::file`].
+    /// 2. It is safe to write to [`InMemoryLayer::file`] before locking and updating [`InMemoryLayer::index`].
     index: RwLock>>,
 
-    /// The above fields never change, except for `end_lsn`, which is only set once,
-    /// and `index` (see rationale there).
-    /// All other changing parts are in `inner`, and protected by a mutex.
-    inner: RwLock<InMemoryLayerInner>,
+    /// Wrapper for the actual on-disk file. Uses interior mutability for concurrent reads/writes.
+    file: EphemeralFile,
 
     estimated_in_mem_size: AtomicU64,
 }
@@ -96,20 +88,10 @@ impl std::fmt::Debug for InMemoryLayer {
         f.debug_struct("InMemoryLayer")
             .field("start_lsn", &self.start_lsn)
             .field("end_lsn", &self.end_lsn)
-            .field("inner", &self.inner)
             .finish()
     }
 }
 
-pub struct InMemoryLayerInner {
-    /// The values are stored in a serialized format in this file.
-    /// Each serialized Value is preceded by a 'u32' length field.
-    /// PerSeg::page_versions map stores offsets into this file.
-    file: EphemeralFile,
-
-    resource_units: GlobalResourceUnits,
-}
-
 /// Support the same max blob length as blob_io, because ultimately
 /// all the InMemoryLayer contents end up being written into a delta layer,
 /// using the [`crate::tenant::blob_io`].
@@ -258,12 +240,6 @@ struct IndexEntryUnpacked {
     pos: u64,
 }
 
-impl std::fmt::Debug for InMemoryLayerInner {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("InMemoryLayerInner").finish()
-    }
-}
-
 /// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
 /// to minimize contention.
 ///
@@ -280,7 +256,7 @@ pub(crate) struct GlobalResources {
 }
 
 // Per-timeline RAII struct for its contribution to [`GlobalResources`]
-struct GlobalResourceUnits {
+pub(crate) struct GlobalResourceUnits {
     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     // for decrementing the global counter by this many bytes when dropped.
     dirty_bytes: u64,
@@ -292,7 +268,7 @@ impl GlobalResourceUnits {
     // updated when the Timeline "ticks" in the background.
     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
 
-    fn new() -> Self {
+    pub(crate) fn new() -> Self {
         GLOBAL_RESOURCES
             .dirty_layers
             .fetch_add(1, AtomicOrdering::Relaxed);
@@ -304,7 +280,7 @@ impl GlobalResourceUnits {
     ///
     /// Returns the effective layer size limit that should be applied, if any, to keep
     /// the total number of dirty bytes below the configured maximum.
-    fn publish_size(&mut self, size: u64) -> Option<u64> {
+    pub(crate) fn publish_size(&mut self, size: u64) -> Option<u64> {
         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
             Ordering::Greater => {
@@ -349,7 +325,7 @@ impl GlobalResourceUnits {
     // Call publish_size if the input size differs from last published size by more than
     // the drift limit
-    fn maybe_publish_size(&mut self, size: u64) {
+    pub(crate) fn maybe_publish_size(&mut self, size: u64) {
         let publish = match size.cmp(&self.dirty_bytes) {
             Ordering::Equal => false,
             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
@@ -398,8 +374,8 @@ impl InMemoryLayer {
         }
     }
 
-    pub(crate) fn try_len(&self) -> Option<u64> {
-        self.inner.try_read().map(|i| i.file.len()).ok()
+    pub(crate) fn len(&self) -> u64 {
+        self.file.len()
     }
 
     pub(crate) fn assert_writable(&self) {
@@ -430,7 +406,7 @@ impl InMemoryLayer {
     // Look up the keys in the provided keyspace and update
     // the reconstruct state with whatever is found.
-    pub(crate) async fn get_values_reconstruct_data(
+    pub async fn get_values_reconstruct_data(
         self: &Arc,
         keyspace: KeySpace,
         lsn_range: Range,
         reconstruct_state: &mut ValuesReconstructState,
@@ -479,14 +455,13 @@ impl InMemoryLayer {
                 }
             }
         }
-        drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
+        drop(index); // release the lock before we spawn the IO
         let read_from = Arc::clone(self);
         let read_ctx = ctx.attached_child();
         reconstruct_state
             .spawn_io(async move {
-                let inner = read_from.inner.read().await;
                 let f = vectored_dio_read::execute(
-                    &inner.file,
+                    &read_from.file,
                     reads
                         .iter()
                         .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
@@ -518,7 +493,6 @@ impl InMemoryLayer {
                 // This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
                 // but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
                 // drop for consistency among all three layer types.
-                drop(inner);
                 drop(read_from);
             })
             .await;
@@ -549,12 +523,6 @@ impl std::fmt::Display for InMemoryLayer {
 }
 
 impl InMemoryLayer {
-    /// Get layer size.
-    pub async fn size(&self) -> Result<u64> {
-        let inner = self.inner.read().await;
-        Ok(inner.file.len())
-    }
-
     pub fn estimated_in_mem_size(&self) -> u64 {
         self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
     }
@@ -587,10 +555,7 @@ impl InMemoryLayer {
             end_lsn: OnceLock::new(),
             opened_at: Instant::now(),
             index: RwLock::new(BTreeMap::new()),
-            inner: RwLock::new(InMemoryLayerInner {
-                file,
-                resource_units: GlobalResourceUnits::new(),
-            }),
+            file,
             estimated_in_mem_size: AtomicU64::new(0),
         })
     }
@@ -599,41 +564,37 @@ impl InMemoryLayer {
     ///
     /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
     /// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
+    ///
+    /// This method shall not be called concurrently. We enforce this property via [`crate::tenant::Timeline::write_lock`].
+    ///
     /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
     pub async fn put_batch(
         &self,
         serialized_batch: SerializedValueBatch,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let (base_offset, metadata) = {
-            let mut inner = self.inner.write().await;
-            self.assert_writable();
+        self.assert_writable();
 
-            let base_offset = inner.file.len();
+        let base_offset = self.file.len();
 
-            let SerializedValueBatch {
-                raw,
-                metadata,
-                max_lsn: _,
-                len: _,
-            } = serialized_batch;
+        let SerializedValueBatch {
+            raw,
+            metadata,
+            max_lsn: _,
+            len: _,
+        } = serialized_batch;
 
-            // Write the batch to the file
-            inner.file.write_raw(&raw, ctx).await?;
-            let new_size = inner.file.len();
+        // Write the batch to the file
+        self.file.write_raw(&raw, ctx).await?;
+        let new_size = self.file.len();
 
-            let expected_new_len = base_offset
-                .checked_add(raw.len().into_u64())
-                // write_raw would error if we were to overflow u64.
-                // also IndexEntry and higher levels in
-                //the code don't allow the file to grow that large
-                .unwrap();
-            assert_eq!(new_size, expected_new_len);
-
-            inner.resource_units.maybe_publish_size(new_size);
-
-            (base_offset, metadata)
-        };
+        let expected_new_len = base_offset
+            .checked_add(raw.len().into_u64())
+            // write_raw would error if we were to overflow u64.
+            // also IndexEntry and higher levels in
+            //the code don't allow the file to grow that large
+            .unwrap();
+        assert_eq!(new_size, expected_new_len);
 
         // Update the index with the new entries
         let mut index = self.index.write().await;
@@ -686,10 +647,8 @@ impl InMemoryLayer {
         self.opened_at
     }
 
-    pub(crate) async fn tick(&self) -> Option<u64> {
-        let mut inner = self.inner.write().await;
-        let size = inner.file.len();
-        inner.resource_units.publish_size(size)
+    pub(crate) fn tick(&self) -> Option<u64> {
+        self.file.tick()
     }
 
     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range, Lsn)]) -> Result<()> {
@@ -753,12 +712,6 @@ impl InMemoryLayer {
         gate: &utils::sync::gate::Gate,
         cancel: CancellationToken,
     ) -> Result> {
-        // Grab the lock in read-mode. We hold it over the I/O, but because this
-        // layer is not writeable anymore, no one should be trying to acquire the
-        // write lock on it, so we shouldn't block anyone. See the comment on
-        // [`InMemoryLayer::freeze`] to understand how locking between the append path
-        // and layer flushing works.
-        let inner = self.inner.read().await;
         let index = self.index.read().await;
 
         use l0_flush::Inner;
@@ -793,7 +746,7 @@ impl InMemoryLayer {
         match l0_flush_global_state {
             l0_flush::Inner::Direct { .. } => {
-                let file_contents = inner.file.load_to_io_buf(ctx).await?;
+                let file_contents = self.file.load_to_io_buf(ctx).await?;
                 let file_contents = file_contents.freeze();
 
                 for (key, vec_map) in index.iter() {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index c8a41f7875..1bb17af146 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -816,7 +816,7 @@ impl From for FlushLayerError {
 }
 
 #[derive(thiserror::Error, Debug)]
-pub(crate) enum GetVectoredError {
+pub enum GetVectoredError {
     #[error("timeline shutting down")]
     Cancelled,
 
@@ -849,7 +849,7 @@ impl From for GetVectoredError {
 }
 
 #[derive(thiserror::Error, Debug)]
-pub(crate) enum GetReadyAncestorError {
+pub enum GetReadyAncestorError {
     #[error("ancestor LSN wait error")]
     AncestorLsnTimeout(#[from] WaitLsnError),
 
@@ -939,7 +939,7 @@ impl std::fmt::Debug for Timeline {
 }
 
 #[derive(thiserror::Error, Debug, Clone)]
-pub(crate) enum WaitLsnError {
+pub enum WaitLsnError {
     // Called on a timeline which is shutting down
     #[error("Shutdown")]
     Shutdown,
@@ -1902,16 +1902,11 @@ impl Timeline {
             return;
         };
 
-        let Some(current_size) = open_layer.try_len() else {
-            // Unexpected: since we hold the write guard, nobody else should be writing to this layer, so
-            // read lock to get size should always succeed.
-            tracing::warn!("Lock conflict while reading size of open layer");
-            return;
-        };
+        let current_size = open_layer.len();
 
         let current_lsn = self.get_last_record_lsn();
 
-        let checkpoint_distance_override = open_layer.tick().await;
+        let checkpoint_distance_override = open_layer.tick();
 
         if let Some(size_override) = checkpoint_distance_override {
             if current_size > size_override {
@@ -7372,7 +7367,7 @@ impl TimelineWriter<'_> {
             .tl
             .get_layer_for_write(at, &self.write_guard, ctx)
             .await?;
-        let initial_size = layer.size().await?;
+        let initial_size = layer.len();
 
         let last_freeze_at = self.last_freeze_at.load();
         self.write_guard.replace(TimelineWriterState::new(