Compare commits

...

8 Commits

Author SHA1 Message Date
Christian Schwarz
efb10b8905 run cargo hakari generate 2023-05-11 18:23:46 +02:00
Christian Schwarz
d629e136fa clippy-allow await while get_value_reconstruct_data calls + explainer 2023-05-11 18:09:19 +02:00
Christian Schwarz
5443c73d20 layer impls: run get_value_reconstruct_data in spawn_blocking
Effectively, this means we use the tokio runtime's spawn_blocking-thread-pool
to execute the layer reads, instead of doing the reads on the
tokio runtime's main executor threads.

The use of the thread pool adds some overhead, but not blocking
the main executor threads is more important, because they can now
execute other async tasks while we do the IO.

With a sufficiently large spawn_blocking-thread-pool, we also get more
IO parallelism between timelines than with blocking the main executor
threads. So, we might push the pageserver's NVMe closer to its limits.
But right now, there's lots of headroom.
2023-05-11 17:10:15 +02:00
Christian Schwarz
6718cf0ae6 asyncify get_value_reconstruct_data (impls still use sync IO) 2023-05-11 17:00:43 +02:00
Christian Schwarz
84a4f48ec5 make parking_lot lock guards Send to be able to hold them across .await 2023-05-10 20:08:49 +02:00
Christian Schwarz
908ffc5fef switch Timeline::layers to parking_lot::RwLock 2023-05-10 19:53:02 +02:00
Christian Schwarz
866f13f24b switch Tenant::timelines to parking_lot::Mutex 2023-05-10 19:51:20 +02:00
Christian Schwarz
5f191d3e2f THE PLAN
- Timeline::get calls reconstruct data ⇒ turn that into a Pin<Box<dyn Future…>>.
- Problem there: we call layer.get_reconstruct_data while holding the layer map lock ⇒ it’s a std RwLock ⇒ need to turn it into a parking_lot RwLock whose guards are Send
- Problem there: we sometimes hold the Tenant::timelines std mutex while holding the layer map lock (branching?) ⇒ need to turn it into a parking_lot mutex whose guards are Send

So, tackle things in reverse here.
2023-05-10 19:48:15 +02:00
11 changed files with 307 additions and 244 deletions
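
The pattern these commits introduce can be reduced to a small self-contained sketch (hypothetical stand-in names — ReconstructState, ReconstructResult, ReconstructFuture, SomeLayer — rather than the real pageserver types, key/LSN parameters elided, and tokio + anyhow assumed as dependencies): the trait method returns a boxed Send future, and the implementation moves the reconstruct state into tokio::task::spawn_blocking so the synchronous read runs on the blocking thread pool instead of an executor thread, then hands the state back to the caller.

use std::{future::Future, pin::Pin, sync::Arc};

use anyhow::Context as _;

#[derive(Default)]
struct ReconstructState {
    records: Vec<u64>,
}

#[allow(dead_code)]
enum ReconstructResult {
    Complete,
    Continue,
}

// Boxed so the trait stays object-safe; Send so the future can run inside spawned tasks.
type ReconstructFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<(ReconstructState, ReconstructResult)>> + Send>>;

trait Layer: Send + Sync {
    // `self: Arc<Self>` lets the returned future own the layer, so it can be 'static.
    fn get_value_reconstruct_data(self: Arc<Self>, state: ReconstructState) -> ReconstructFuture;
}

struct SomeLayer {
    lsn: u64,
}

impl Layer for SomeLayer {
    fn get_value_reconstruct_data(self: Arc<Self>, mut state: ReconstructState) -> ReconstructFuture {
        Box::pin(async move {
            // The blocking read runs on tokio's spawn_blocking pool. The state is moved
            // into the closure and handed back out, because the closure must be 'static
            // and cannot borrow from the caller.
            tokio::task::spawn_blocking(move || -> anyhow::Result<(ReconstructState, ReconstructResult)> {
                state.records.push(self.lsn); // stand-in for the actual on-disk lookup
                Ok((state, ReconstructResult::Complete))
            })
            .await
            .context("spawn_blocking")?
        })
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let layer = Arc::new(SomeLayer { lsn: 7 });
    let (state, _result) = layer
        .get_value_reconstruct_data(ReconstructState::default())
        .await?;
    assert_eq!(state.records, vec![7]);
    Ok(())
}

Passing the state by value instead of `&mut` is what lets the spawn_blocking closure be 'static; that is why the signatures in the diffs below change from `reconstruct_data: &mut ValueReconstructState` to taking the state and returning it alongside the result.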

Cargo.lock generated
View File

@@ -2727,6 +2727,7 @@ dependencies = [
"num-traits",
"once_cell",
"pageserver_api",
"parking_lot",
"pin-project-lite",
"postgres",
"postgres-protocol",
@@ -5424,6 +5425,7 @@ dependencies = [
"num-bigint",
"num-integer",
"num-traits",
"parking_lot",
"prost",
"rand",
"regex",

View File

@@ -75,6 +75,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
# feature "send_guard" markes it so that lock guards are Send
parking_lot = { workspace = true, default-features = false, features = [ "send_guard" ] }
[dev-dependencies]
criterion.workspace = true
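
As a side note on the send_guard feature referenced above (a minimal hypothetical example, not code from this change set; assumes parking_lot with "send_guard" and a multi-threaded tokio runtime): parking_lot guards are not Send by default, so a future that keeps a guard alive across an .await cannot be handed to tokio::spawn; with the feature enabled, the guard — and therefore the future — is Send.

use std::sync::Arc;

use parking_lot::Mutex;

// Holding the guard across the await point means the guard type must be Send for the
// whole future to be Send. Without the "send_guard" feature this future is not Send
// and tokio::spawn rejects it at compile time.
async fn append_then_yield(shared: Arc<Mutex<Vec<u8>>>) {
    let mut guard = shared.lock();
    guard.push(1);
    tokio::task::yield_now().await; // guard is still held here
    guard.push(2);
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Mutex::new(Vec::new()));
    // tokio::spawn requires the future to be Send + 'static.
    tokio::spawn(append_then_yield(Arc::clone(&shared))).await.unwrap();
    assert_eq!(*shared.lock(), vec![1, 2]);
}

Clippy's await_holding_lock lint still fires on this shape, which is what the separate clippy-allow commit in this compare addresses.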

View File

@@ -39,8 +39,7 @@ use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use self::config::TenantConf;
@@ -133,7 +132,7 @@ pub struct Tenant {
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_id: TenantId,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
timelines: parking_lot::Mutex<HashMap<TimelineId, Arc<Timeline>>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration
@@ -184,7 +183,7 @@ impl UninitializedTimeline<'_> {
/// The new timeline is initialized in Active state, and its background jobs are
/// started
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
let mut timelines = self.owning_tenant.timelines.lock();
self.initialize_with_lock(ctx, &mut timelines, true, true)
}
@@ -275,7 +274,7 @@ impl UninitializedTimeline<'_> {
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
let mut timelines = self.owning_tenant.timelines.lock();
self.initialize_with_lock(ctx, &mut timelines, false, true)
}
@@ -494,7 +493,7 @@ impl Tenant {
let timeline = {
// avoiding holding it across awaits
let mut timelines_accessor = self.timelines.lock().unwrap();
let mut timelines_accessor = self.timelines.lock();
if timelines_accessor.contains_key(&timeline_id) {
anyhow::bail!(
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
@@ -561,7 +560,6 @@ impl Tenant {
|| timeline
.layers
.read()
.unwrap()
.iter_historic_layers()
.next()
.is_some(),
@@ -806,7 +804,7 @@ impl Tenant {
.context("Failed to create new timeline directory")?;
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|| {
anyhow::anyhow!(
@@ -1141,7 +1139,7 @@ impl Tenant {
timeline_id: TimelineId,
active_only: bool,
) -> anyhow::Result<Arc<Timeline>> {
let timelines_accessor = self.timelines.lock().unwrap();
let timelines_accessor = self.timelines.lock();
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
})?;
@@ -1161,12 +1159,7 @@ impl Tenant {
/// Lists timelines the tenant contains.
/// Up to the tenant's implementation to omit certain timelines that are not considered ready for use.
pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
self.timelines
.lock()
.unwrap()
.values()
.map(Arc::clone)
.collect()
self.timelines.lock().values().map(Arc::clone).collect()
}
/// This is used to create the initial 'main' timeline during bootstrapping,
@@ -1184,7 +1177,7 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant"
);
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
drop(timelines);
@@ -1316,7 +1309,7 @@ impl Tenant {
// compactions. We don't want to block everything else while the
// compaction runs.
let timelines_to_compact = {
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
let timelines_to_compact = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
@@ -1345,7 +1338,7 @@ impl Tenant {
// flushing. We don't want to block everything else while the
// flushing is performed.
let timelines_to_flush = {
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
timelines
.iter()
.map(|(_id, timeline)| Arc::clone(timeline))
@@ -1370,7 +1363,7 @@ impl Tenant {
// Transition the timeline into TimelineState::Stopping.
// This should prevent new operations from starting.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
// Ensure that there are no child timelines **attached to that pageserver**,
// because detach removes files, which will break child branches
@@ -1517,7 +1510,7 @@ impl Tenant {
});
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
@@ -1585,7 +1578,7 @@ impl Tenant {
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let timelines_accessor = self.timelines.lock();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
@@ -1652,7 +1645,7 @@ impl Tenant {
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let timelines_accessor = self.timelines.lock();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
@@ -1945,7 +1938,7 @@ impl Tenant {
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
timelines: parking_lot::Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
@@ -2173,7 +2166,7 @@ impl Tenant {
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
let mut all_branchpoints = BTreeSet::new();
let timeline_ids = {
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
@@ -2273,7 +2266,7 @@ impl Tenant {
// Create a placeholder for the new branch. This will error
// out if the new timeline ID is already in use.
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
self.create_timeline_uninit_mark(dst_id, &timelines)?
};
@@ -2338,7 +2331,7 @@ impl Tenant {
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
let new_timeline = self
.prepare_timeline(
dst_id,
@@ -2375,7 +2368,7 @@ impl Tenant {
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
let timelines = self.timelines.lock();
self.create_timeline_uninit_mark(timeline_id, &timelines)?
};
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
@@ -2461,7 +2454,7 @@ impl Tenant {
// Initialize the timeline without loading the layer map, because we already updated the layer
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
let mut timelines = self.timelines.lock();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
};
@@ -2508,8 +2501,7 @@ impl Tenant {
) {
Ok(new_timeline) => {
if init_layers {
new_timeline.layers.write().unwrap().next_open_layer_at =
Some(new_timeline.initdb_lsn);
new_timeline.layers.write().next_open_layer_at = Some(new_timeline.initdb_lsn);
}
debug!(
"Successfully created initial files for timeline {tenant_id}/{new_timeline_id}"
@@ -2564,7 +2556,7 @@ impl Tenant {
fn create_timeline_uninit_mark(
&self,
timeline_id: TimelineId,
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
timelines: &parking_lot::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
) -> anyhow::Result<TimelineUninitMark> {
let tenant_id = self.tenant_id;

View File

@@ -22,6 +22,7 @@ use pageserver_api::models::{
};
use std::ops::Range;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
@@ -335,6 +336,13 @@ impl LayerAccessStats {
}
}
pub(crate) type GetValueReconstructFuture = Pin<
Box<
dyn Send
+ std::future::Future<Output = Result<(ValueReconstructState, ValueReconstructResult)>>,
>,
>;
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`].
///
@@ -372,12 +380,12 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
@@ -486,12 +494,12 @@ impl Layer for LayerDescriptor {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
todo!("This method shouldn't be part of the Layer trait")
}

View File

@@ -46,7 +46,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;
use utils::{
@@ -56,8 +56,8 @@ use utils::{
};
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf,
DeltaFileName, GetValueReconstructFuture, Layer, LayerAccessStats, LayerAccessStatsReset,
LayerFileName, LayerIter, LayerKeyIter, PathOrConf,
};
///
@@ -318,89 +318,94 @@ impl Layer for DeltaLayer {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
tokio::task::spawn_blocking(move || {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
ensure!(self.key_range.contains(&key));
ensure!(self.key_range.contains(&key));
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// release metadata lock and close the file
}
}
// release metadata lock and close the file
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
.await
.context("spawn_blocking")?
})
}
}

View File

@@ -43,7 +43,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use tracing::*;
use utils::{
@@ -53,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
use super::{GetValueReconstructFuture, Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -197,38 +197,45 @@ impl Layer for ImageLayer {
/// Look up given page in the file
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
tokio::task::spawn_blocking(move || {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
})
.await
.context("spawn_blocking")?
})
}
}

View File

@@ -12,7 +12,7 @@ use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -27,9 +27,9 @@ use utils::{
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use super::{DeltaLayer, DeltaLayerWriter, Layer};
use super::{DeltaLayer, DeltaLayerWriter, GetValueReconstructFuture, Layer};
thread_local! {
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
@@ -191,52 +191,60 @@ impl Layer for InMemoryLayer {
/// Look up given value in the layer.
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
// The in-memory layer isn't actually in-memory. It uses EphemeralFile.
// So, this does do IO.
tokio::task::spawn_blocking(move || {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let inner = self.inner.read().unwrap();
let inner = self.inner.read().unwrap();
let mut reader = inner.file.block_cursor();
let mut reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
}
}
// release lock on 'inner'
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
.await
.context("spawn_blocking")?
})
}
}

View File

@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -21,8 +21,8 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
use super::image_layer::ImageLayer;
use super::{
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer,
DeltaLayer, GetValueReconstructFuture, LayerAccessStats, LayerAccessStatsReset, LayerIter,
LayerKeyIter, LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -83,16 +83,18 @@ impl Layer for RemoteLayer {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
})
}
fn is_incremental(&self) -> bool {

View File

@@ -121,7 +121,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(super) layers: parking_lot::RwLock<LayerMap<dyn PersistentLayer>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -504,12 +504,13 @@ impl Timeline {
None => None,
};
let mut reconstruct_state = ValueReconstructState {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
.await?;
self.metrics
@@ -547,7 +548,7 @@ impl Timeline {
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().unwrap();
let layer_map = self.layers.read();
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -847,7 +848,7 @@ impl Timeline {
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
@@ -927,7 +928,7 @@ impl Timeline {
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
let layer_map = self.layers.read();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1047,7 +1048,7 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write();
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1311,7 +1312,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
layers: parking_lot::RwLock::new(LayerMap::default()),
walredo_mgr,
walreceiver,
@@ -1452,7 +1453,7 @@ impl Timeline {
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1581,7 +1582,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write();
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1734,7 +1735,6 @@ impl Timeline {
let local_layers = self
.layers
.read()
.unwrap()
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
@@ -2060,7 +2060,7 @@ impl Timeline {
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
for historic_layer in self.layers.read().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
@@ -2143,13 +2143,31 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
///
// TODO: find a way to not hold the Timeline::layers lock during get_value_reconstruct_data calls.
//
// Since these calls do local disk IO, they'll be reasonably fast until we become disk-IOPS-bound.
// We have lots of headroom on current pageservers, so, it's going to be fine for now.
//
// We can't use tokio::sync::RwLock that easily because its guard is not Send, but
// many tasks that access Timeline::layers run inside task_mgr tasks, which are required
// to be Send. It has been tried in origin/problame/asyncify-get-reconstruct-data--tokio-sync.
//
// The solution will probably be to have an immutable + multi-versioned layer map, allowing
// us to grab a snapshot of the layer map once and execute this function on the snapshot.
//
// Or, we could invest time to figure out whether we can drop the layer map lock after
// we grab the layer, do the IO, re-acquire the lock, and continue the traversal.
//
// (Why is this allow() not inside the function? Because clippy doesn't respect it then).
#[allow(clippy::await_holding_lock)]
async fn get_reconstruct_data(
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<ValueReconstructState, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2176,12 +2194,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
return Ok(reconstruct_state);
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
@@ -2227,7 +2245,7 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
let layers = timeline.layers.read();
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2238,13 +2256,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(open_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2264,13 +2288,19 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(frozen_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2298,13 +2328,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(&layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2407,7 +2443,7 @@ impl Timeline {
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
ensure!(lsn.is_aligned());
@@ -2480,7 +2516,7 @@ impl Timeline {
} else {
Some(self.write_lock.lock().unwrap())
};
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2518,7 +2554,7 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2619,7 +2655,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
@@ -2737,7 +2773,7 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -2792,7 +2828,7 @@ impl Timeline {
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
let mut max_deltas = 0;
@@ -2935,7 +2971,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
@@ -3002,7 +3038,7 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
@@ -3125,7 +3161,7 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Isn't it better to hold the original layers lock till here?
let layers = self.layers.read(); // Isn't it better to hold the original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3362,7 +3398,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3621,7 +3657,7 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too aggressive and avoidable
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write();
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -3904,7 +3940,7 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut layers = self_clone.layers.write();
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
@@ -4062,7 +4098,7 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
@@ -4165,7 +4201,7 @@ impl LocalLayerInfoForDiskUsageEviction {
impl Timeline {
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
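
For reference, the shape of the allow discussed in the TODO comment above get_reconstruct_data, as a standalone hypothetical sketch (made-up names; assumes parking_lot and tokio): the attribute has to sit on the async fn itself, because clippy does not honor the allow when it is placed inside the function body.

use std::sync::Arc;

use parking_lot::RwLock;

// The guard is held across an .await, which clippy::await_holding_lock would normally flag.
#[allow(clippy::await_holding_lock)]
async fn read_newest(layers: Arc<RwLock<Vec<u64>>>) -> u64 {
    let guard = layers.read(); // held across the await below, hence the allow on the fn
    let newest = guard.last().copied().unwrap_or(0);
    simulate_io(newest).await
}

async fn simulate_io(v: u64) -> u64 {
    tokio::task::yield_now().await;
    v
}

#[tokio::main]
async fn main() {
    let layers = Arc::new(RwLock::new(vec![10, 20, 30]));
    assert_eq!(read_newest(layers).await, 30);
}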

View File

@@ -178,7 +178,7 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {

View File

@@ -36,6 +36,7 @@ nom = { version = "7" }
num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128"] }
parking_lot = { version = "0.12", features = ["send_guard"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }