Compare commits

...

16 Commits

Author SHA1 Message Date
Christian Schwarz
7ef703c0fc Merge remote-tracking branch 'origin/main' into problame/asyncify-get-reconstruct-data--tokio-sync
Maybe this fixes the flaky tests
2023-05-19 17:17:45 +02:00
Christian Schwarz
cbe19e1cbb Merge remote-tracking branch 'origin/main' into problame/asyncify-get-reconstruct-data--tokio-sync
Conflicts:
    pageserver/src/tenant.rs
    pageserver/src/tenant/timeline.rs

Negligible.
2023-05-17 14:10:12 +02:00
Christian Schwarz
860dacf18c clippy-allow await while get_value_reconstruct_data calls + explainer 2023-05-12 16:53:52 +02:00
Christian Schwarz
7a8229f3af layer impls: run get_value_reconstruct_data in spawn_blocking
Effectively, this means we use the tokio runtime's spawn_blocking-thread-pool
to execute the layer reads, instead of doing the reads on the
tokio runtime's main executor threads.

The use of the thread pool adds some overhead, but not blocking
the main executor threads is more important, because they can now
execute other async tasks while we do the IO.

With a sufficiently large spawn_blocking-thread-pool, we also get more
IO parallelism between timelines than with blocking the main executor
threads. So, we might push the pageserver's NVMe closer to its limits.
But right now, there's lots of headroom.
2023-05-12 16:53:52 +02:00
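
A minimal sketch of the pattern this commit applies, assuming a hypothetical SomeLayer type with a synchronous read path (standing in for the real DeltaLayer/ImageLayer/InMemoryLayer impls):

use anyhow::Context;
use std::sync::Arc;

// Hypothetical layer with a synchronous read path; stands in for the real
// layer impls this commit changes.
struct SomeLayer;

impl SomeLayer {
    fn read_value_sync(&self) -> anyhow::Result<Vec<u8>> {
        Ok(Vec::new()) // imagine the disk IO happening here
    }
}

async fn read_layer(layer: Arc<SomeLayer>) -> anyhow::Result<Vec<u8>> {
    // Run the blocking IO on tokio's spawn_blocking pool so the main
    // executor threads stay free; the closure must be 'static, hence the
    // owned Arc moved into it.
    tokio::task::spawn_blocking(move || layer.read_value_sync())
        .await
        .context("spawn_blocking")? // JoinError (panic/abort) -> anyhow
}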
Christian Schwarz
7ef666c723 asyncify get_value_reconstruct_data (impls still use sync IO) 2023-05-12 16:53:52 +02:00
Christian Schwarz
0bda9d3c94 address all the tests 2023-05-12 16:53:52 +02:00
Christian Schwarz
9fe7b7a079 turn Timeline::layers into tokio::sync::RwLock 2023-05-12 16:53:50 +02:00
Christian Schwarz
74a4cf0b2a follow-up: address tests 2023-05-12 16:51:29 +02:00
Christian Schwarz
a79835dbc0 (does not compile): make TimelineWriter Send by using tokio::sync Mutex internally
fails with

cs@devvm:[~/src/neon]: cargo check -p pageserver  --features testing
    Checking pageserver v0.1.0 (/home/cs/src/neon/pageserver)
error: future cannot be sent between threads safely
   --> pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:426:33
    |
426 |         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
    |                                 ^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `Instrumented<[async block@pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:427:13: 439:14]>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>`
note: future is not `Send` as this value is used across an await
   --> pageserver/src/tenant/timeline.rs:872:46
    |
850 |         let layers = self.layers.read().unwrap();
    |             ------ has type `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>` which is not `Send`
...
872 |                 self.freeze_inmem_layer(true).await;
    |                                              ^^^^^^ await occurs here, with `layers` maybe used later
...
881 |     }
    |     - `layers` is later dropped here
note: required by a bound in `TaskHandle::<E>::spawn`
   --> pageserver/src/tenant/timeline/walreceiver.rs:196:52
    |
192 |     fn spawn<Fut>(
    |        ----- required by a bound in this
...
196 |         Fut: Future<Output = anyhow::Result<()>> + Send,
    |                                                    ^^^^ required by this bound in `TaskHandle::<E>::spawn`

error: could not compile `pageserver` due to previous error
2023-05-12 16:51:29 +02:00
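
The root cause above is the std::sync::RwLockReadGuard held across an .await. A minimal sketch of the usual fix, with illustrative types rather than pageserver's: scope the guard so it drops before the first await point.

use std::sync::RwLock;

// Illustrative types, not pageserver's. The non-Send version keeps the
// guard alive across the await; the Send version scopes it.
async fn freeze_then_flush(layers: &RwLock<Vec<String>>) {
    // Not Send (this is what the compiler rejected above):
    // let guard = layers.read().unwrap();
    // flush_frozen(guard.len()).await; // guard still alive across .await

    // Send: copy out what's needed, drop the guard, then await.
    let frozen_count = {
        let guard = layers.read().unwrap();
        guard.len()
    }; // guard dropped here
    flush_frozen(frozen_count).await;
}

async fn flush_frozen(_count: usize) {}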
Christian Schwarz
1b2663350c basebackup import: pre-lock the layer map for the flush() calls
The checkpointer loop isn't running anyway, so there's no risk of
blocking it through the pre-lock.
2023-05-12 16:51:29 +02:00
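
A minimal sketch of the pre-locking idea, with illustrative types (in this branch the real entry point is TimelineWriter::put_locked): take the layer map write lock once, then thread the guard through a *_locked variant so the per-key puts don't re-acquire it.

use tokio::sync::{RwLock, RwLockWriteGuard};

struct LayerMap; // illustrative stand-ins, not the real pageserver types
struct Timeline {
    layers: RwLock<LayerMap>,
}

impl Timeline {
    // Caller already holds the write guard, so a loop of puts doesn't
    // re-acquire the lock for every key.
    fn put_locked(&self, _key: u64, _layers: &mut RwLockWriteGuard<'_, LayerMap>) {}

    async fn flush_all(&self, keys: &[u64]) {
        let mut guard = self.layers.write().await; // pre-lock once
        for key in keys {
            self.put_locked(*key, &mut guard);
        }
    } // guard released here
}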
Christian Schwarz
e8ae409bdc controversial but necessary: keep holding layer map lock inside compact_level0_phase1
Without this, the second read().unwrap() becomes an await point,
which makes the future non-Send; but we require it to be Send
because it runs inside task_mgr::spawn, which requires the Futs to be Send
2023-05-12 16:51:29 +02:00
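
Sketched with illustrative types, the trade-off: hold one std guard across the whole synchronous phase, because with a tokio lock a second acquisition would be `.read().await`, an extra await point inside a future that must stay Send.

use std::sync::RwLock;

// Illustrative types; the real function reads LayerMap<dyn PersistentLayer>.
fn compact_level0_phase1_sketch(layers: &RwLock<Vec<String>>) -> usize {
    // One guard held across the whole synchronous phase. With a tokio
    // RwLock, dropping and re-taking it would mean a `.read().await` in
    // the middle, i.e. an await point inside a future that must be Send.
    let guard = layers.read().unwrap();
    let level0_count = guard.len();
    // ... plan the compaction using `guard` ...
    level0_count
} // guard dropped here, before the caller's next await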
Christian Schwarz
46acdaed8d follow-up: fix all the test cases 2023-05-12 16:51:29 +02:00
Christian Schwarz
d00ad45ce7 make Tenant::timelines a tokio::sync::Mutex 2023-05-12 16:51:29 +02:00
Christian Schwarz
388122fe27 hack: allow using async inside Tenant::activate 2023-05-12 16:51:28 +02:00
Christian Schwarz
3919dd7ef6 refactor: prepare to allow async code inside Tenant::state.send_modify() 2023-05-12 16:51:28 +02:00
Christian Schwarz
bd516a491b THE PLAN
- Timeline::get calls reconstruct data ⇒ turn that into a Pin<Box<dyn Future…>>.
- Problem there: we call layer.get_reconstruct_data while holding layer map lock ⇒ it’s a std rwlock ⇒ need to turn it into tokio mutex
- Problem there: we sometimes hold the Tenant::timelines std mutex while holding layer map lock (branching?) ⇒ need to turn it into tokio mutex
- Problem there: tenant.rs `self.state.send_modify(|| { ... timelines.lock().await })`;
  Can't await inside the closure, and we don't have async closures.
  And we don't control the tokio::sync::watch API.

So, tackle things in reverse here.
2023-05-12 16:51:28 +02:00
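
A minimal sketch of the send_modify constraint from the last bullet, with illustrative state types: the closure is synchronous, so the await has to happen before the call rather than inside it.

use tokio::sync::{watch, Mutex};

async fn activate(state: &watch::Sender<String>, timelines: &Mutex<Vec<u32>>) {
    // Won't compile: closures passed to send_modify are sync, no .await inside.
    // state.send_modify(|s| { let n = timelines.lock().await.len(); ... });

    // Workaround: do the async part first, then mutate synchronously.
    let n = timelines.lock().await.len();
    state.send_modify(|s| *s = format!("Active ({n} timelines)"));
}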
20 changed files with 797 additions and 530 deletions


@@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration(
let mut tenant_resident_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines().iter() {
for timeline in tenant.list_timelines().await.iter() {
// collect per-timeline metrics only for active timelines
if timeline.is_active() {
let timeline_written_size = u64::from(timeline.get_last_record_lsn());


@@ -508,11 +508,11 @@ async fn collect_eviction_candidates(
// a little unfair to tenants during shutdown in such a situation is tolerable.
let mut tenant_candidates = Vec::new();
let mut max_layer_size = 0;
for tl in tenant.list_timelines() {
for tl in tenant.list_timelines().await {
if !tl.is_active() {
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction();
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers


@@ -175,7 +175,7 @@ async fn build_timeline_info(
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
let mut info = build_timeline_info_common(timeline, ctx).await?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -193,7 +193,7 @@ async fn build_timeline_info(
Ok(info)
}
fn build_timeline_info_common(
async fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
@@ -224,7 +224,7 @@ fn build_timeline_info_common(
None
}
};
let current_physical_size = Some(timeline.layer_size_sum());
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -283,6 +283,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -304,7 +305,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let timelines = tenant.list_timelines();
let timelines = tenant.list_timelines().await;
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
@@ -343,6 +344,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let timeline = tenant
.get_timeline(timeline_id, false)
.await
.map_err(ApiError::NotFound)?;
let timeline_info = build_timeline_info(
@@ -502,8 +504,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
current_physical_size += timeline.layer_size_sum();
for timeline in tenant.list_timelines().await.iter() {
current_physical_size += timeline.layer_size_sum().await;
}
let state = tenant.current_state();
@@ -608,7 +610,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info(reset);
let layer_map_info = timeline.layer_map_info(reset).await;
json_response(StatusCode::OK, layer_map_info)
}
@@ -956,6 +958,7 @@ async fn active_timeline_of_active_tenant(
let tenant = mgr::get_tenant(tenant_id, true).await?;
tenant
.get_timeline(timeline_id, true)
.await
.map_err(ApiError::NotFound)
}


@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush()?;
modification.flush().await?;
}
}
// We're done importing all the data files.
modification.commit()?;
modification.commit().await?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
modification.flush().await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -594,7 +594,7 @@ async fn import_file(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer();
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);


@@ -376,7 +376,7 @@ impl PageServerHandler {
};
// Check that the timeline exists
let timeline = tenant.get_timeline(timeline_id, true)?;
let timeline = tenant.get_timeline(timeline_id, true).await?;
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
@@ -475,7 +475,9 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -1184,6 +1186,6 @@ async fn get_active_tenant_timeline(
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTenantError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
let timeline = tenant.get_timeline(timeline_id, true)?;
let timeline = tenant.get_timeline(timeline_id, true).await?;
Ok(timeline)
}


@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self) -> anyhow::Result<()> {
pub async fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1116,13 +1116,15 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let writer = self.tline.writer();
let writer = self.tline.writer().await;
let mut layer_map = self.tline.layers.write().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut result: anyhow::Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, self.lsn, value);
result = writer.put_locked(key, self.lsn, value, &mut layer_map);
false
} else {
true
@@ -1143,17 +1145,17 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer();
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
writer.delete(key_range, lsn).await?;
}
writer.finish_write(lsn);
@@ -1594,18 +1596,20 @@ fn is_slru_block_key(key: Key) -> bool {
}
#[cfg(test)]
pub fn create_test_timeline(
pub async fn create_test_timeline(
tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId,
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)?
.initialize(ctx)?;
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)
.await?
.initialize(ctx)
.await?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;
m.commit().await?;
Ok(tline)
}
@@ -1632,7 +1636,7 @@ mod tests {
#[test]
fn test_list_rels_drop() -> Result<()> {
let repo = RepoHarness::create("test_list_rels_drop")?.load();
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
let tline = create_empty_timeline(repo, TIMELINE_ID).await?;
const TESTDB: u32 = 111;
// Import initial dummy checkpoint record, otherwise the get_timeline() call

File diff suppressed because it is too large


@@ -316,7 +316,7 @@ pub async fn set_new_tenant_config(
new_tenant_conf,
false,
)?;
tenant.set_new_tenant_config(new_tenant_conf);
tenant.set_new_tenant_config(new_tenant_conf).await;
Ok(())
}
@@ -677,6 +677,7 @@ pub async fn immediate_compact(
let timeline = tenant
.get_timeline(timeline_id, true)
.await
.map_err(ApiError::NotFound)?;
// Run in task_mgr to avoid race with tenant_detach operation


@@ -1264,9 +1264,13 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx).unwrap();
runtime.block_on(async {
let timeline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let _ = timeline.initialize(&ctx).await?;
anyhow::Ok(())
})?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;


@@ -136,7 +136,7 @@ pub(super) async fn gather_inputs(
.context("Failed to refresh gc_info before gathering inputs")?;
// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
let mut timelines = tenant.list_timelines().await;
if timelines.is_empty() {
// perhaps the tenant has just been created, and as such doesn't have any data yet


@@ -22,6 +22,7 @@ use pageserver_api::models::{
};
use std::ops::Range;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
@@ -335,6 +336,13 @@ impl LayerAccessStats {
}
}
pub(crate) type GetValueReconstructFuture = Pin<
Box<
dyn Send
+ std::future::Future<Output = Result<(ValueReconstructState, ValueReconstructResult)>>,
>,
>;
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`].
///
@@ -372,12 +380,12 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
@@ -486,12 +494,12 @@ impl Layer for LayerDescriptor {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
todo!("This method shouldn't be part of the Layer trait")
}


@@ -46,7 +46,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;
use utils::{
@@ -56,8 +56,8 @@ use utils::{
};
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf,
DeltaFileName, GetValueReconstructFuture, Layer, LayerAccessStats, LayerAccessStatsReset,
LayerFileName, LayerIter, LayerKeyIter, PathOrConf,
};
///
@@ -318,89 +318,94 @@ impl Layer for DeltaLayer {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
tokio::task::spawn_blocking(move || {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
ensure!(self.key_range.contains(&key));
ensure!(self.key_range.contains(&key));
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// release metadata lock and close the file
}
}
// release metadata lock and close the file
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
.await
.context("spawn_blocking")?
})
}
}


@@ -43,7 +43,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use tracing::*;
use utils::{
@@ -53,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
use super::{GetValueReconstructFuture, Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -197,38 +197,45 @@ impl Layer for ImageLayer {
/// Look up given page in the file
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
tokio::task::spawn_blocking(move || {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
})
.await
.context("spawn_blocking")?
})
}
}


@@ -12,7 +12,7 @@ use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -27,9 +27,9 @@ use utils::{
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use super::{DeltaLayer, DeltaLayerWriter, Layer};
use super::{DeltaLayer, DeltaLayerWriter, GetValueReconstructFuture, Layer};
thread_local! {
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
@@ -191,52 +191,60 @@ impl Layer for InMemoryLayer {
/// Look up given value in the layer.
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
// The in-memory layer isn't actually in-memory. It uses EphemeralFile.
// So, this does do IO.
tokio::task::spawn_blocking(move || {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let inner = self.inner.read().unwrap();
let inner = self.inner.read().unwrap();
let mut reader = inner.file.block_cursor();
let mut reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
}
}
// release lock on 'inner'
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
})
.await
.context("spawn_blocking")?
})
}
}
@@ -304,7 +312,7 @@ impl InMemoryLayer {
Ok(())
}
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())


@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -21,8 +21,8 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
use super::image_layer::ImageLayer;
use super::{
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer,
DeltaLayer, GetValueReconstructFuture, LayerAccessStats, LayerAccessStatsReset, LayerIter,
LayerKeyIter, LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -83,16 +83,18 @@ impl Layer for RemoteLayer {
}
fn get_value_reconstruct_data(
&self,
self: Arc<Self>,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> GetValueReconstructFuture {
Box::pin(async move {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
})
}
fn is_incremental(&self) -> bool {


@@ -29,7 +29,7 @@ use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
@@ -121,7 +121,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -170,7 +170,7 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
write_lock: tokio::sync::Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_state: Mutex<FlushLoopState>,
@@ -229,6 +229,8 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
@@ -521,12 +523,13 @@ impl Timeline {
None => None,
};
let mut reconstruct_state = ValueReconstructState {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
.await?;
self.metrics
@@ -563,8 +566,8 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().unwrap();
pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await;
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -645,7 +648,7 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false);
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
@@ -825,10 +828,10 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
pub async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
_write_guard: self.write_lock.lock().await,
}
}
@@ -862,9 +865,9 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
@@ -886,7 +889,7 @@ impl Timeline {
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.freeze_inmem_layer(true).await;
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
@@ -968,8 +971,8 @@ impl Timeline {
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -991,7 +994,7 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
@@ -1004,7 +1007,7 @@ impl Timeline {
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1089,7 +1092,7 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1353,7 +1356,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
layers: tokio::sync::RwLock::new(LayerMap::default()),
walredo_mgr,
walreceiver,
@@ -1387,7 +1390,7 @@ impl Timeline {
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
@@ -1493,8 +1496,8 @@ impl Timeline {
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1623,7 +1626,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1776,7 +1779,7 @@ impl Timeline {
let local_layers = self
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
@@ -2128,8 +2131,8 @@ impl Timeline {
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
@@ -2212,13 +2215,31 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
///
// TODO: find a way to not hold the Timeline::layers lock during get_value_reconstruct_data calls.
//
// Since these calls do local disk IO, they'll be reasonably fast, until we become disk IOPS bound.
// We have lots of headroom on current pageservers, so, it's going to be fine for now.
//
// We can't use tokio::sync::RwLock that easily because its guard is not Send, but
// many tasks that access Timeline::layers run inside task_mgr tasks, which are required
// to be Send. It has been tried in origin/problame/asyncify-get-reconstruct-data--tokio-sync.
//
// The solution will probably be to have an immutable + multi-versioned layer map, allowing
// us to grab a snapshot of the layer map once and execute this function on the snapshot.
//
// Or, we could invest time to figure out whether we can drop the layer map lock after
// we grabbed the layer, do the IO, re-acquire, and continue the traversal.
//
// (Why is this allow() not inside the function? Because clippy doesn't respect it then).
#[allow(clippy::await_holding_lock)]
async fn get_reconstruct_data(
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<ValueReconstructState, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2245,12 +2266,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
return Ok(reconstruct_state);
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
@@ -2336,7 +2357,7 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
let layers = timeline.layers.read().await;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2347,13 +2368,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(open_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2373,13 +2400,19 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(frozen_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2407,13 +2440,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(&layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2515,9 +2554,16 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().unwrap();
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await;
self.get_layer_for_write_locked(lsn, &mut layers)
}
fn get_layer_for_write_locked(
&self,
lsn: Lsn,
layers: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
let last_record_lsn = self.get_last_record_lsn();
@@ -2560,16 +2606,29 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
fn put_value_locked(
&self,
key: Key,
lsn: Lsn,
val: &Value,
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write_locked(lsn, pre_locked_layer_map)?;
layer.put_value(key, lsn, val)?;
Ok(())
}
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn).await?;
layer.put_tombstone(key_range, lsn).await?;
Ok(())
}
@@ -2581,15 +2640,15 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().unwrap())
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2627,7 +2686,7 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2719,7 +2778,7 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
};
@@ -2728,7 +2787,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
@@ -2822,7 +2881,7 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
async fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
@@ -2846,7 +2905,7 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -2898,10 +2957,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut max_deltas = 0;
@@ -2972,7 +3035,7 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
start = img_range.end;
if force || self.time_for_new_image_layer(partition, lsn)? {
if force || self.time_for_new_image_layer(partition, lsn).await? {
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -3044,7 +3107,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
@@ -3111,9 +3174,8 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
@@ -3234,7 +3296,6 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Isn't it better to hold original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3471,7 +3532,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3730,7 +3791,7 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too aggressive and avoidable
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4013,7 +4074,7 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut layers = self_clone.layers.write().await;
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
@@ -4171,7 +4232,7 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
@@ -4273,8 +4334,8 @@ impl LocalLayerInfoForDiskUsageEviction {
}
impl Timeline {
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().unwrap();
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4346,7 +4407,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// but will cause large code changes.
pub struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: MutexGuard<'a, ()>,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
impl Deref for TimelineWriter<'_> {
@@ -4362,12 +4423,23 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value).await
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
pub fn put_locked(
&self,
key: Key,
lsn: Lsn,
value: &Value,
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<()> {
self.tl
.put_value_locked(key, lsn, value, pre_locked_layer_map)
}
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}
/// Track the end of the latest digested WAL record.


@@ -185,7 +185,7 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {


@@ -1310,8 +1310,9 @@ mod tests {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.await
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&ctx).unwrap();
let timeline = timeline.initialize(&ctx).await.unwrap();
ConnectionManagerState {
id: TenantTimelineId {


@@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline.check_checkpoint_distance().with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn =


@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -1200,7 +1200,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit()?;
m.commit().await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1209,7 +1209,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1217,22 +1217,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1318,7 +1318,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1360,7 +1360,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1373,7 +1373,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1398,7 +1398,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1428,14 +1428,14 @@ mod tests {
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1454,7 +1454,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit()?;
m.commit().await?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1472,7 +1472,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1497,7 +1497,7 @@ mod tests {
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1509,7 +1509,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1554,7 +1554,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1603,7 +1603,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
assert_eq!(
tline
@@ -1637,7 +1637,7 @@ mod tests {
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut lsn = 0x10;
@@ -1648,7 +1648,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit()?;
m.commit().await?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1664,7 +1664,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -1677,7 +1677,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -1693,7 +1693,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber