mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
make Delta{Value,Key}Iter Send (#4472)
... by switching the internal RwLock to a OnceCell. This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). See https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883 for more context. fixes https://github.com/neondatabase/neon/issues/4471
This commit is contained in:
@@ -37,6 +37,7 @@ use crate::virtual_file::VirtualFile;
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -46,7 +47,6 @@ use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -184,7 +184,7 @@ pub struct DeltaLayer {
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
inner: RwLock<DeltaLayerInner>,
|
||||
inner: OnceCell<DeltaLayerInner>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayer {
|
||||
@@ -201,21 +201,17 @@ impl std::fmt::Debug for DeltaLayer {
|
||||
}
|
||||
|
||||
pub struct DeltaLayerInner {
|
||||
/// If false, the fields below have not been loaded into memory yet.
|
||||
loaded: bool,
|
||||
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
|
||||
/// Reader object for reading blocks from the file. (None if not loaded yet)
|
||||
file: Option<FileBlockReader<VirtualFile>>,
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader<VirtualFile>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DeltaLayerInner")
|
||||
.field("loaded", &self.loaded)
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.finish()
|
||||
@@ -246,7 +242,7 @@ impl Layer for DeltaLayer {
|
||||
inner.index_start_blk, inner.index_root_blk
|
||||
);
|
||||
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -315,7 +311,7 @@ impl Layer for DeltaLayer {
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -500,51 +496,22 @@ impl DeltaLayer {
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
///
|
||||
fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
|
||||
fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> {
|
||||
self.access_stats
|
||||
.record_access(access_kind, ctx.task_kind());
|
||||
loop {
|
||||
// Quick exit if already loaded
|
||||
let inner = self.inner.read().unwrap();
|
||||
if inner.loaded {
|
||||
return Ok(inner);
|
||||
}
|
||||
|
||||
// Need to open the file and load the metadata. Upgrade our lock to
|
||||
// a write lock. (Or rather, release and re-lock in write mode.)
|
||||
drop(inner);
|
||||
let inner = self.inner.write().unwrap();
|
||||
if !inner.loaded {
|
||||
self.load_inner(inner).with_context(|| {
|
||||
format!("Failed to load delta layer {}", self.path().display())
|
||||
})?;
|
||||
} else {
|
||||
// Another thread loaded it while we were not holding the lock.
|
||||
}
|
||||
|
||||
// We now have the file open and loaded. There's no function to do
|
||||
// that in the std library RwLock, so we have to release and re-lock
|
||||
// in read mode. (To be precise, the lock guard was moved in the
|
||||
// above call to `load_inner`, so it's already been released). And
|
||||
// while we do that, another thread could unload again, so we have
|
||||
// to re-check and retry if that happens.
|
||||
}
|
||||
// Quick exit if already loaded
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner())
|
||||
.with_context(|| format!("Failed to load delta layer {}", self.path().display()))
|
||||
}
|
||||
|
||||
fn load_inner(&self, mut inner: RwLockWriteGuard<DeltaLayerInner>) -> Result<()> {
|
||||
fn load_inner(&self) -> Result<DeltaLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
// Open the file if it's not open already.
|
||||
if inner.file.is_none() {
|
||||
let file = VirtualFile::open(&path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
inner.file = Some(FileBlockReader::new(file));
|
||||
}
|
||||
let file = inner.file.as_mut().unwrap();
|
||||
let file = VirtualFile::open(&path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
@@ -571,13 +538,13 @@ impl DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
inner.index_start_blk = actual_summary.index_start_blk;
|
||||
inner.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
debug!("loaded from {}", &path.display());
|
||||
|
||||
inner.loaded = true;
|
||||
Ok(())
|
||||
Ok(DeltaLayerInner {
|
||||
file,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a DeltaLayer struct representing an existing file on disk.
|
||||
@@ -599,12 +566,7 @@ impl DeltaLayer {
|
||||
file_size,
|
||||
),
|
||||
access_stats,
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
}),
|
||||
inner: once_cell::sync::OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -631,12 +593,7 @@ impl DeltaLayer {
|
||||
metadata.len(),
|
||||
),
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
}),
|
||||
inner: once_cell::sync::OnceCell::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -800,12 +757,7 @@ impl DeltaLayerWriterInner {
|
||||
metadata.len(),
|
||||
),
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
}),
|
||||
inner: once_cell::sync::OnceCell::new(),
|
||||
};
|
||||
|
||||
// fsync the file
|
||||
@@ -940,13 +892,13 @@ struct DeltaValueIter<'a> {
|
||||
reader: BlockCursor<Adapter<'a>>,
|
||||
}
|
||||
|
||||
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
|
||||
struct Adapter<'a>(&'a DeltaLayerInner);
|
||||
|
||||
impl<'a> BlockReader for Adapter<'a> {
|
||||
type BlockLease = PageReadGuard<'static>;
|
||||
|
||||
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
|
||||
self.0.file.as_ref().unwrap().read_blk(blknum)
|
||||
self.0.file.read_blk(blknum)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -959,8 +911,8 @@ impl<'a> Iterator for DeltaValueIter<'a> {
|
||||
}
|
||||
|
||||
impl<'a> DeltaValueIter<'a> {
|
||||
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -1033,8 +985,8 @@ impl Iterator for DeltaKeyIter {
|
||||
}
|
||||
|
||||
impl<'a> DeltaKeyIter {
|
||||
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -1074,3 +1026,21 @@ impl<'a> DeltaKeyIter {
|
||||
Ok(iter)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::DeltaKeyIter;
|
||||
use super::DeltaLayer;
|
||||
use super::DeltaValueIter;
|
||||
|
||||
// We will soon need the iters to be send in the compaction code.
|
||||
// Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883
|
||||
// Cf https://github.com/neondatabase/neon/issues/4471
|
||||
#[test]
|
||||
fn is_send() {
|
||||
fn assert_send<T: Send>() {}
|
||||
assert_send::<DeltaLayer>();
|
||||
assert_send::<DeltaValueIter>();
|
||||
assert_send::<DeltaKeyIter>();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user