From 2011cc05cdd47dd37a3b7304418fd0a74c872188 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 12 Jun 2023 16:22:52 +0200 Subject: [PATCH] make Delta{Value,Key}Iter Send (#4472) ... by switching the internal RwLock to a OnceCell. This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). See https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883 for more context. fixes https://github.com/neondatabase/neon/issues/4471 --- .../src/tenant/storage_layer/delta_layer.rs | 126 +++++++----------- 1 file changed, 48 insertions(+), 78 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 624fe8dac4..6e14663121 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -37,6 +37,7 @@ use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; +use once_cell::sync::OnceCell; use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; @@ -46,7 +47,6 @@ use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; -use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::*; use utils::{ @@ -184,7 +184,7 @@ pub struct DeltaLayer { access_stats: LayerAccessStats, - inner: RwLock, + inner: OnceCell, } impl std::fmt::Debug for DeltaLayer { @@ -201,21 +201,17 @@ impl std::fmt::Debug for DeltaLayer { } pub struct DeltaLayerInner { - /// If false, the fields below have not been loaded into memory yet. - loaded: bool, - // values copied from summary index_start_blk: u32, index_root_blk: u32, - /// Reader object for reading blocks from the file. 
(None if not loaded yet) - file: Option>, + /// Reader object for reading blocks from the file. + file: FileBlockReader, } impl std::fmt::Debug for DeltaLayerInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DeltaLayerInner") - .field("loaded", &self.loaded) .field("index_start_blk", &self.index_start_blk) .field("index_root_blk", &self.index_root_blk) .finish() @@ -246,7 +242,7 @@ impl Layer for DeltaLayer { inner.index_start_blk, inner.index_root_blk ); - let file = inner.file.as_ref().unwrap(); + let file = &inner.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( inner.index_start_blk, inner.index_root_blk, @@ -315,7 +311,7 @@ impl Layer for DeltaLayer { let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; // Scan the page versions backwards, starting from `lsn`. - let file = inner.file.as_ref().unwrap(); + let file = &inner.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( inner.index_start_blk, inner.index_root_blk, @@ -500,51 +496,22 @@ impl DeltaLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - fn load( - &self, - access_kind: LayerAccessKind, - ctx: &RequestContext, - ) -> Result> { + fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> { self.access_stats .record_access(access_kind, ctx.task_kind()); - loop { - // Quick exit if already loaded - let inner = self.inner.read().unwrap(); - if inner.loaded { - return Ok(inner); - } - - // Need to open the file and load the metadata. Upgrade our lock to - // a write lock. (Or rather, release and re-lock in write mode.) - drop(inner); - let inner = self.inner.write().unwrap(); - if !inner.loaded { - self.load_inner(inner).with_context(|| { - format!("Failed to load delta layer {}", self.path().display()) - })?; - } else { - // Another thread loaded it while we were not holding the lock. 
- } - - // We now have the file open and loaded. There's no function to do - // that in the std library RwLock, so we have to release and re-lock - // in read mode. (To be precise, the lock guard was moved in the - // above call to `load_inner`, so it's already been released). And - // while we do that, another thread could unload again, so we have - // to re-check and retry if that happens. - } + // Quick exit if already loaded + self.inner + .get_or_try_init(|| self.load_inner()) + .with_context(|| format!("Failed to load delta layer {}", self.path().display())) } - fn load_inner(&self, mut inner: RwLockWriteGuard) -> Result<()> { + fn load_inner(&self) -> Result { let path = self.path(); - // Open the file if it's not open already. - if inner.file.is_none() { - let file = VirtualFile::open(&path) - .with_context(|| format!("Failed to open file '{}'", path.display()))?; - inner.file = Some(FileBlockReader::new(file)); - } - let file = inner.file.as_mut().unwrap(); + let file = VirtualFile::open(&path) + .with_context(|| format!("Failed to open file '{}'", path.display()))?; + let file = FileBlockReader::new(file); + let summary_blk = file.read_blk(0)?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; @@ -571,13 +538,13 @@ impl DeltaLayer { } } - inner.index_start_blk = actual_summary.index_start_blk; - inner.index_root_blk = actual_summary.index_root_blk; - debug!("loaded from {}", &path.display()); - inner.loaded = true; - Ok(()) + Ok(DeltaLayerInner { + file, + index_start_blk: actual_summary.index_start_blk, + index_root_blk: actual_summary.index_root_blk, + }) } /// Create a DeltaLayer struct representing an existing file on disk. 
@@ -599,12 +566,7 @@ impl DeltaLayer { file_size, ), access_stats, - inner: RwLock::new(DeltaLayerInner { - loaded: false, - file: None, - index_start_blk: 0, - index_root_blk: 0, - }), + inner: once_cell::sync::OnceCell::new(), } } @@ -631,12 +593,7 @@ impl DeltaLayer { metadata.len(), ), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), - inner: RwLock::new(DeltaLayerInner { - loaded: false, - file: None, - index_start_blk: 0, - index_root_blk: 0, - }), + inner: once_cell::sync::OnceCell::new(), }) } @@ -800,12 +757,7 @@ impl DeltaLayerWriterInner { metadata.len(), ), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), - inner: RwLock::new(DeltaLayerInner { - loaded: false, - file: None, - index_start_blk, - index_root_blk, - }), + inner: once_cell::sync::OnceCell::new(), }; // fsync the file @@ -940,13 +892,13 @@ struct DeltaValueIter<'a> { reader: BlockCursor>, } -struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>); +struct Adapter<'a>(&'a DeltaLayerInner); impl<'a> BlockReader for Adapter<'a> { type BlockLease = PageReadGuard<'static>; fn read_blk(&self, blknum: u32) -> Result { - self.0.file.as_ref().unwrap().read_blk(blknum) + self.0.file.read_blk(blknum) } } @@ -959,8 +911,8 @@ impl<'a> Iterator for DeltaValueIter<'a> { } impl<'a> DeltaValueIter<'a> { - fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result { - let file = inner.file.as_ref().unwrap(); + fn new(inner: &'a DeltaLayerInner) -> Result { + let file = &inner.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( inner.index_start_blk, inner.index_root_blk, @@ -1033,8 +985,8 @@ impl Iterator for DeltaKeyIter { } impl<'a> DeltaKeyIter { - fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result { - let file = inner.file.as_ref().unwrap(); + fn new(inner: &'a DeltaLayerInner) -> Result { + let file = &inner.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( inner.index_start_blk, inner.index_root_blk, @@ 
-1074,3 +1026,21 @@ impl<'a> DeltaKeyIter { Ok(iter) } } + +#[cfg(test)] +mod test { + use super::DeltaKeyIter; + use super::DeltaLayer; + use super::DeltaValueIter; + + // We will soon need the iters to be `Send` in the compaction code. + // Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883 + // Cf https://github.com/neondatabase/neon/issues/4471 + #[test] + fn is_send() { + fn assert_send() {} + assert_send::(); + assert_send::(); + assert_send::(); + } +}