From a0c82969a2032d3cb17e5b5c328072b5af1912bd Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 25 Sep 2023 18:30:10 +0200 Subject: [PATCH] page cache: per-task-kind access stats (#5339) This PR adds a `task_kind` label to page cache access metrics. These are to validate our hypothesis that the high hit page cache rate we observe in prod is due to internal tasks, not getpage requests from compute. We believe the latter should near-always be a pageserver-page-cache _miss_ because compute has it's own page cache, and hence there is no locality of reference for its accesses to pageserver page cache. Before this PR, we didn't have `RequestContext` propagation to any code below the on-demand downloader. The vast majority of changes in this PR is concerned with adding that propagation. --- pageserver/ctl/src/layer_map_analyzer.rs | 10 +- pageserver/ctl/src/layers.rs | 12 +- pageserver/src/import_datadir.rs | 8 +- pageserver/src/metrics.rs | 75 ++++++---- pageserver/src/page_cache.rs | 23 ++- pageserver/src/pgdatadir_mapping.rs | 8 +- pageserver/src/task_mgr.rs | 1 + pageserver/src/tenant.rs | 94 ++++++++---- pageserver/src/tenant/blob_io.rs | 21 ++- pageserver/src/tenant/block_io.rs | 35 +++-- pageserver/src/tenant/disk_btree.rs | 134 ++++++++++++------ pageserver/src/tenant/ephemeral_file.rs | 75 +++++++--- .../src/tenant/storage_layer/delta_layer.rs | 73 ++++++---- .../src/tenant/storage_layer/image_layer.rs | 30 ++-- .../tenant/storage_layer/inmemory_layer.rs | 22 +-- pageserver/src/tenant/timeline.rs | 39 +++-- pageserver/src/walingest.rs | 40 +++--- 17 files changed, 468 insertions(+), 232 deletions(-) diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 495dae87e3..de7b4861cb 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -3,6 +3,8 @@ //! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data. use anyhow::Result; +use pageserver::context::{DownloadBehavior, RequestContext}; +use pageserver::task_mgr::TaskKind; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use std::cmp::Ordering; use std::collections::BinaryHeap; @@ -96,9 +98,9 @@ pub(crate) fn parse_filename(name: &str) -> Option { } // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH" -async fn get_holes(path: &Path, max_holes: usize) -> Result> { +async fn get_holes(path: &Path, max_holes: usize, ctx: &RequestContext) -> Result> { let file = FileBlockReader::new(VirtualFile::open(path).await?); - let summary_blk = file.read_blk(0).await?; + let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( actual_summary.index_start_blk, @@ -125,6 +127,7 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result> { prev_key = Some(curr.next()); true }, + ctx, ) .await?; let mut holes = heap.into_vec(); @@ -135,6 +138,7 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result> { pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let storage_path = &cmd.path; let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES); + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. pageserver::virtual_file::init(10); @@ -163,7 +167,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { parse_filename(&layer.file_name().into_string().unwrap()) { if layer_file.is_delta { - layer_file.holes = get_holes(&layer.path(), max_holes).await?; + layer_file.holes = get_holes(&layer.path(), max_holes, &ctx).await?; n_deltas += 1; } layers.push(layer_file); diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 33a6f197cf..e8d16d31f1 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -2,6 +2,8 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use clap::Subcommand; +use pageserver::context::{DownloadBehavior, RequestContext}; +use pageserver::task_mgr::TaskKind; use pageserver::tenant::block_io::BlockCursor; use pageserver::tenant::disk_btree::DiskBtreeReader; use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; @@ -44,12 +46,12 @@ pub(crate) enum LayerCmd { }, } -async fn read_delta_file(path: impl AsRef) -> Result<()> { +async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = path.as_ref(); virtual_file::init(10); page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path).await?); - let summary_blk = file.read_blk(0).await?; + let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( actual_summary.index_start_blk, @@ -67,11 +69,12 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { all.push((curr, BlobRef(value_offset))); true }, + ctx, ) .await?; let cursor = BlockCursor::new_fileblockreader(&file); for (k, v) in all { - let value = cursor.read_blob(v.pos()).await?; + let value = cursor.read_blob(v.pos(), ctx).await?; println!("key:{} value_len:{}", k, value.len()); } // TODO(chi): special handling for last key? @@ -79,6 +82,7 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { } pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); match cmd { LayerCmd::List { path } => { for tenant in fs::read_dir(path.join(TENANTS_SEGMENT_NAME))? { @@ -153,7 +157,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { ); if layer_file.is_delta { - read_delta_file(layer.path()).await?; + read_delta_file(layer.path(), &ctx).await?; } else { anyhow::bail!("not supported yet :("); } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 5bff5337bd..5a1affdb11 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir( { pg_control = Some(control_file); } - modification.flush().await?; + modification.flush(ctx).await?; } } // We're done importing all the data files. - modification.commit().await?; + modification.commit(ctx).await?; // We expect the Postgres server to be shut down cleanly. let pg_control = pg_control.context("pg_control file not found")?; @@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar( // We found the pg_control file. pg_control = Some(res); } - modification.flush().await?; + modification.flush(ctx).await?; } tokio_tar::EntryType::Directory => { debug!("directory {:?}", file_path); @@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar( // sanity check: ensure that pg_control is loaded let _pg_control = pg_control.context("pg_control file not found")?; - modification.commit().await?; + modification.commit(ctx).await?; Ok(()) } diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 6d1c89c3db..1aa168f3be 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,3 +1,4 @@ +use enum_map::EnumMap; use metrics::metric_vec_duration::DurationResultObserver; use metrics::{ register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec, @@ -127,7 +128,7 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -pub struct PageCacheMetrics { +pub struct PageCacheMetricsForTaskKind { pub read_accesses_materialized_page: IntCounter, pub read_accesses_immutable: IntCounter, @@ -136,11 +137,15 @@ pub struct PageCacheMetrics { pub read_hits_materialized_page_older_lsn: IntCounter, } +pub struct PageCacheMetrics { + by_task_kind: EnumMap, +} + static PAGE_CACHE_READ_HITS: Lazy = Lazy::new(|| { register_int_counter_vec!( "pageserver_page_cache_read_hits_total", "Number of read accesses to the page cache that hit", - &["key_kind", "hit_kind"] + &["task_kind", "key_kind", "hit_kind"] ) .expect("failed to define a metric") }); @@ -149,43 +154,55 @@ static PAGE_CACHE_READ_ACCESSES: Lazy = Lazy::new(|| { register_int_counter_vec!( "pageserver_page_cache_read_accesses_total", "Number of read accesses to the page cache", - &["key_kind"] + &["task_kind", "key_kind"] ) .expect("failed to define a metric") }); pub static PAGE_CACHE: Lazy = Lazy::new(|| PageCacheMetrics { - read_accesses_materialized_page: { - PAGE_CACHE_READ_ACCESSES - .get_metric_with_label_values(&["materialized_page"]) - .unwrap() - }, + by_task_kind: EnumMap::from_array(std::array::from_fn(|task_kind| { + let task_kind = ::from_usize(task_kind); + let task_kind: &'static str = task_kind.into(); + PageCacheMetricsForTaskKind { + read_accesses_materialized_page: { + PAGE_CACHE_READ_ACCESSES + .get_metric_with_label_values(&[task_kind, "materialized_page"]) + .unwrap() + }, - read_accesses_immutable: { - PAGE_CACHE_READ_ACCESSES - .get_metric_with_label_values(&["immutable"]) - .unwrap() - }, + read_accesses_immutable: { + PAGE_CACHE_READ_ACCESSES + .get_metric_with_label_values(&[task_kind, "immutable"]) + .unwrap() + }, - read_hits_immutable: { - PAGE_CACHE_READ_HITS - .get_metric_with_label_values(&["immutable", "-"]) - .unwrap() - }, + read_hits_immutable: { + PAGE_CACHE_READ_HITS + .get_metric_with_label_values(&[task_kind, "immutable", "-"]) + .unwrap() + }, - read_hits_materialized_page_exact: { - PAGE_CACHE_READ_HITS - .get_metric_with_label_values(&["materialized_page", "exact"]) - .unwrap() - }, + read_hits_materialized_page_exact: { + PAGE_CACHE_READ_HITS + .get_metric_with_label_values(&[task_kind, "materialized_page", "exact"]) + .unwrap() + }, - read_hits_materialized_page_older_lsn: { - PAGE_CACHE_READ_HITS - .get_metric_with_label_values(&["materialized_page", "older_lsn"]) - .unwrap() - }, + read_hits_materialized_page_older_lsn: { + PAGE_CACHE_READ_HITS + .get_metric_with_label_values(&[task_kind, "materialized_page", "older_lsn"]) + .unwrap() + }, + } + })), }); +impl PageCacheMetrics { + pub(crate) fn for_task_kind(&self, task_kind: TaskKind) -> &PageCacheMetricsForTaskKind { + &self.by_task_kind[task_kind] + } +} + pub struct PageCacheSizeMetrics { pub max_bytes: UIntGauge, @@ -1266,6 +1283,8 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use crate::task_mgr::TaskKind; + pub struct RemoteTimelineClientMetrics { tenant_id: String, timeline_id: String, diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 9cc1bf35dc..3dcdd73d02 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -85,7 +85,7 @@ use utils::{ lsn::Lsn, }; -use crate::{metrics::PageCacheSizeMetrics, repository::Key}; +use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key}; static PAGE_CACHE: OnceCell = OnceCell::new(); const TEST_PAGE_CACHE_SIZE: usize = 50; @@ -346,8 +346,10 @@ impl PageCache { timeline_id: TimelineId, key: &Key, lsn: Lsn, + ctx: &RequestContext, ) -> Option<(Lsn, PageReadGuard)> { crate::metrics::PAGE_CACHE + .for_task_kind(ctx.task_kind()) .read_accesses_materialized_page .inc(); @@ -368,10 +370,12 @@ impl PageCache { { if available_lsn == lsn { crate::metrics::PAGE_CACHE + .for_task_kind(ctx.task_kind()) .read_hits_materialized_page_exact .inc(); } else { crate::metrics::PAGE_CACHE + .for_task_kind(ctx.task_kind()) .read_hits_materialized_page_older_lsn .inc(); } @@ -426,10 +430,11 @@ impl PageCache { &self, file_id: FileId, blkno: u32, + ctx: &RequestContext, ) -> anyhow::Result { let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; - self.lock_for_read(&mut cache_key).await + self.lock_for_read(&mut cache_key, ctx).await } // @@ -497,14 +502,22 @@ impl PageCache { /// } /// ``` /// - async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result { + async fn lock_for_read( + &self, + cache_key: &mut CacheKey, + ctx: &RequestContext, + ) -> anyhow::Result { let (read_access, hit) = match cache_key { CacheKey::MaterializedPage { .. } => { unreachable!("Materialized pages use lookup_materialized_page") } CacheKey::ImmutableFilePage { .. } => ( - &crate::metrics::PAGE_CACHE.read_accesses_immutable, - &crate::metrics::PAGE_CACHE.read_hits_immutable, + &crate::metrics::PAGE_CACHE + .for_task_kind(ctx.task_kind()) + .read_accesses_immutable, + &crate::metrics::PAGE_CACHE + .for_task_kind(ctx.task_kind()) + .read_hits_immutable, ), }; read_access.inc(); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 54b41f3e9d..9a1281a522 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1138,7 +1138,7 @@ impl<'a> DatadirModification<'a> { /// retains all the metadata, but data pages are flushed. That's again OK /// for bulk import, where you are just loading data pages and won't try to /// modify the same pages twice. - pub async fn flush(&mut self) -> anyhow::Result<()> { + pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> { // Unless we have accumulated a decent amount of changes, it's not worth it // to scan through the pending_updates list. let pending_nblocks = self.pending_nblocks; @@ -1154,7 +1154,7 @@ impl<'a> DatadirModification<'a> { if is_rel_block_key(key) || is_slru_block_key(key) { // This bails out on first error without modifying pending_updates. // That's Ok, cf this function's doc comment. - writer.put(key, self.lsn, &value).await?; + writer.put(key, self.lsn, &value, ctx).await?; } else { retained_pending_updates.insert(key, value); } @@ -1174,14 +1174,14 @@ impl<'a> DatadirModification<'a> { /// underlying timeline. /// All the modifications in this atomic update are stamped by the specified LSN. /// - pub async fn commit(&mut self) -> anyhow::Result<()> { + pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> { let writer = self.tline.writer().await; let lsn = self.lsn; let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value).await?; + writer.put(key, lsn, &value, ctx).await?; } for key_range in self.pending_deletions.drain(..) { writer.delete(key_range, lsn).await?; diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 3c7a1115df..650bc119b6 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -187,6 +187,7 @@ task_local! { Debug, // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy enumset::EnumSetType, + enum_map::Enum, serde::Serialize, serde::Deserialize, strum_macros::IntoStaticStr, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f0639844bd..1c92c618fa 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1504,7 +1504,7 @@ impl Tenant { .init_empty_test_timeline() .context("init_empty_test_timeline")?; modification - .commit() + .commit(ctx) .await .context("commit init_empty_test_timeline modification")?; @@ -3538,14 +3538,24 @@ mod tests { let writer = tline.writer().await; writer - .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .put( + *TEST_KEY, + Lsn(0x10), + &Value::Image(TEST_IMG("foo at 0x10")), + &ctx, + ) .await?; writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer().await; writer - .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .put( + *TEST_KEY, + Lsn(0x20), + &Value::Image(TEST_IMG("foo at 0x20")), + &ctx, + ) .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3619,19 +3629,19 @@ mod tests { // Insert a value on the timeline writer - .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20")) + .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx) .await?; writer - .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) + .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx) .await?; writer.finish_write(Lsn(0x20)); writer - .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) + .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx) .await?; writer.finish_write(Lsn(0x30)); writer - .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) + .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx) .await?; writer.finish_write(Lsn(0x40)); @@ -3646,7 +3656,7 @@ mod tests { .expect("Should have a local timeline"); let new_writer = newtline.writer().await; new_writer - .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) + .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx) .await?; new_writer.finish_write(Lsn(0x40)); @@ -3669,7 +3679,11 @@ mod tests { Ok(()) } - async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> { + async fn make_some_layers( + tline: &Timeline, + start_lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result<()> { let mut lsn = start_lsn; #[allow(non_snake_case)] { @@ -3680,6 +3694,7 @@ mod tests { *TEST_KEY, lsn, &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ctx, ) .await?; writer.finish_write(lsn); @@ -3689,6 +3704,7 @@ mod tests { *TEST_KEY, lsn, &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ctx, ) .await?; writer.finish_write(lsn); @@ -3702,6 +3718,7 @@ mod tests { *TEST_KEY, lsn, &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ctx, ) .await?; writer.finish_write(lsn); @@ -3711,6 +3728,7 @@ mod tests { *TEST_KEY, lsn, &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ctx, ) .await?; writer.finish_write(lsn); @@ -3727,7 +3745,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 // FIXME: this doesn't actually remove any layer currently, given how the flushing @@ -3801,7 +3819,7 @@ mod tests { .load(); let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?; let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn(); @@ -3823,7 +3841,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) @@ -3832,7 +3850,7 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; + make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?; tline.set_broken("test".to_owned()); @@ -3873,7 +3891,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) @@ -3898,7 +3916,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) @@ -3907,7 +3925,7 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; + make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?; // run gc on parent tenant @@ -3932,7 +3950,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x8000)).await?; + make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?; // so that all uploads finish & we can call harness.load() below again tenant .shutdown(Default::default(), true) @@ -3961,7 +3979,7 @@ mod tests { .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; let child_tline = tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) @@ -3972,7 +3990,7 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; + make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?; // so that all uploads finish & we can call harness.load() below again tenant @@ -4004,7 +4022,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; - make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?; let layer_map = tline.layers.read().await; let level0_deltas = layer_map.layer_map().get_level0_deltas()?; @@ -4087,7 +4105,12 @@ mod tests { let writer = tline.writer().await; writer - .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .put( + *TEST_KEY, + Lsn(0x10), + &Value::Image(TEST_IMG("foo at 0x10")), + &ctx, + ) .await?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -4097,7 +4120,12 @@ mod tests { let writer = tline.writer().await; writer - .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .put( + *TEST_KEY, + Lsn(0x20), + &Value::Image(TEST_IMG("foo at 0x20")), + &ctx, + ) .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -4107,7 +4135,12 @@ mod tests { let writer = tline.writer().await; writer - .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) + .put( + *TEST_KEY, + Lsn(0x30), + &Value::Image(TEST_IMG("foo at 0x30")), + &ctx, + ) .await?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -4117,7 +4150,12 @@ mod tests { let writer = tline.writer().await; writer - .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) + .put( + *TEST_KEY, + Lsn(0x40), + &Value::Image(TEST_IMG("foo at 0x40")), + &ctx, + ) .await?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -4175,6 +4213,7 @@ mod tests { test_key, lsn, &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + &ctx, ) .await?; writer.finish_write(lsn); @@ -4227,6 +4266,7 @@ mod tests { test_key, lsn, &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + &ctx, ) .await?; writer.finish_write(lsn); @@ -4247,6 +4287,7 @@ mod tests { test_key, lsn, &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + &ctx, ) .await?; writer.finish_write(lsn); @@ -4306,6 +4347,7 @@ mod tests { test_key, lsn, &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + &ctx, ) .await?; writer.finish_write(lsn); @@ -4334,6 +4376,7 @@ mod tests { test_key, lsn, &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + &ctx, ) .await?; println!("updating {} at {}", blknum, lsn); @@ -4402,6 +4445,7 @@ mod tests { test_key, lsn, &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), + &ctx, ) .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); @@ -4474,7 +4518,7 @@ mod tests { .init_empty_test_timeline() .context("init_empty_test_timeline")?; modification - .commit() + .commit(&ctx) .await .context("commit init_empty_test_timeline modification")?; diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 4fad1f3c14..21327deb70 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -11,6 +11,7 @@ //! len < 128: 0XXXXXXX //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! +use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; use crate::virtual_file::VirtualFile; @@ -19,9 +20,13 @@ use std::io::{Error, ErrorKind}; impl<'a> BlockCursor<'a> { /// Read a blob into a new buffer. - pub async fn read_blob(&self, offset: u64) -> Result, std::io::Error> { + pub async fn read_blob( + &self, + offset: u64, + ctx: &RequestContext, + ) -> Result, std::io::Error> { let mut buf = Vec::new(); - self.read_blob_into_buf(offset, &mut buf).await?; + self.read_blob_into_buf(offset, &mut buf, ctx).await?; Ok(buf) } /// Read blob into the given buffer. Any previous contents in the buffer @@ -30,11 +35,12 @@ impl<'a> BlockCursor<'a> { &self, offset: u64, dstbuf: &mut Vec, + ctx: &RequestContext, ) -> Result<(), std::io::Error> { let mut blknum = (offset / PAGE_SZ as u64) as u32; let mut off = (offset % PAGE_SZ as u64) as usize; - let mut buf = self.read_blk(blknum).await?; + let mut buf = self.read_blk(blknum, ctx).await?; // peek at the first byte, to determine if it's a 1- or 4-byte length let first_len_byte = buf[off]; @@ -50,7 +56,7 @@ impl<'a> BlockCursor<'a> { // it is split across two pages len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]); blknum += 1; - buf = self.read_blk(blknum).await?; + buf = self.read_blk(blknum, ctx).await?; len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]); off = 4 - thislen; } else { @@ -71,7 +77,7 @@ impl<'a> BlockCursor<'a> { if page_remain == 0 { // continue on next page blknum += 1; - buf = self.read_blk(blknum).await?; + buf = self.read_blk(blknum, ctx).await?; off = 0; page_remain = PAGE_SZ; } @@ -228,12 +234,13 @@ impl BlobWriter { #[cfg(test)] mod tests { use super::*; - use crate::tenant::block_io::BlockReaderRef; + use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef}; use rand::{Rng, SeedableRng}; async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { let temp_dir = tempfile::tempdir()?; let path = temp_dir.path().join("file"); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); // Write part (in block to drop the file) let mut offsets = Vec::new(); @@ -255,7 +262,7 @@ mod tests { let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new(rdr); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { - let blob_read = rdr.read_blob(*offset).await?; + let blob_read = rdr.read_blob(*offset, &ctx).await?; assert_eq!( blob, &blob_read, "mismatch for idx={idx} at offset={offset}" diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index ab5b5e9fca..d81cf1b8a0 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -4,6 +4,7 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; +use crate::context::RequestContext; use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; @@ -82,12 +83,16 @@ pub(crate) enum BlockReaderRef<'a> { impl<'a> BlockReaderRef<'a> { #[inline(always)] - async fn read_blk(&self, blknum: u32) -> Result { + async fn read_blk( + &self, + blknum: u32, + ctx: &RequestContext, + ) -> Result { use BlockReaderRef::*; match self { - FileBlockReader(r) => r.read_blk(blknum).await, - EphemeralFile(r) => r.read_blk(blknum).await, - Adapter(r) => r.read_blk(blknum).await, + FileBlockReader(r) => r.read_blk(blknum, ctx).await, + EphemeralFile(r) => r.read_blk(blknum, ctx).await, + Adapter(r) => r.read_blk(blknum, ctx).await, #[cfg(test)] TestDisk(r) => r.read_blk(blknum), #[cfg(test)] @@ -105,11 +110,13 @@ impl<'a> BlockReaderRef<'a> { /// /// ```no_run /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader}; +/// # use pageserver::context::RequestContext; /// # let reader: FileBlockReader = unimplemented!("stub"); +/// # let ctx: RequestContext = unimplemented!("stub"); /// let cursor = reader.block_cursor(); -/// let buf = cursor.read_blk(1); +/// let buf = cursor.read_blk(1, &ctx); /// // do stuff with 'buf' -/// let buf = cursor.read_blk(2); +/// let buf = cursor.read_blk(2, &ctx); /// // do stuff with 'buf' /// ``` /// @@ -134,8 +141,12 @@ impl<'a> BlockCursor<'a> { /// access to the contents of the page. (For the page cache, the /// lease object represents a lock on the buffer.) #[inline(always)] - pub async fn read_blk(&self, blknum: u32) -> Result { - self.reader.read_blk(blknum).await + pub async fn read_blk( + &self, + blknum: u32, + ctx: &RequestContext, + ) -> Result { + self.reader.read_blk(blknum, ctx).await } } @@ -169,11 +180,15 @@ impl FileBlockReader { /// Returns a "lease" object that can be used to /// access to the contents of the page. (For the page cache, the /// lease object represents a lock on the buffer.) - pub async fn read_blk(&self, blknum: u32) -> Result { + pub async fn read_blk( + &self, + blknum: u32, + ctx: &RequestContext, + ) -> Result { let cache = page_cache::get(); loop { match cache - .read_immutable_buf(self.file_id, blknum) + .read_immutable_buf(self.file_id, blknum, ctx) .await .map_err(|e| { std::io::Error::new( diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 44d6b4f87e..06a04bf536 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -26,7 +26,11 @@ use std::{cmp::Ordering, io, result}; use thiserror::Error; use tracing::error; -use crate::tenant::block_io::{BlockReader, BlockWriter}; +use crate::{ + context::{DownloadBehavior, RequestContext}, + task_mgr::TaskKind, + tenant::block_io::{BlockReader, BlockWriter}, +}; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. pub const VALUE_SZ: usize = 5; @@ -231,14 +235,19 @@ where /// /// Read the value for given key. Returns the value, or None if it doesn't exist. /// - pub async fn get(&self, search_key: &[u8; L]) -> Result> { + pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result> { let mut result: Option = None; - self.visit(search_key, VisitDirection::Forwards, |key, value| { - if key == search_key { - result = Some(value); - } - false - }) + self.visit( + search_key, + VisitDirection::Forwards, + |key, value| { + if key == search_key { + result = Some(value); + } + false + }, + ctx, + ) .await?; Ok(result) } @@ -253,6 +262,7 @@ where search_key: &[u8; L], dir: VisitDirection, mut visitor: V, + ctx: &RequestContext, ) -> Result where V: FnMut(&[u8], u64) -> bool, @@ -262,7 +272,9 @@ where let block_cursor = self.reader.block_cursor(); while let Some((node_blknum, opt_iter)) = stack.pop() { // Locate the node. - let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?; + let node_buf = block_cursor + .read_blk(self.start_blk + node_blknum, ctx) + .await?; let node = OnDiskNode::deparse(node_buf.as_ref())?; let prefix_len = node.prefix_len as usize; @@ -351,13 +363,14 @@ where #[allow(dead_code)] pub async fn dump(&self) -> Result<()> { let mut stack = Vec::new(); + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); stack.push((self.root_blk, String::new(), 0, 0, 0)); let block_cursor = self.reader.block_cursor(); while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() { - let blk = block_cursor.read_blk(self.start_blk + blknum).await?; + let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?; let buf: &[u8] = blk.as_ref(); let node = OnDiskNode::::deparse(buf)?; @@ -688,6 +701,8 @@ impl BuildNode { #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::context::DownloadBehavior; + use crate::task_mgr::TaskKind; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef}; use rand::Rng; use std::collections::BTreeMap; @@ -725,6 +740,8 @@ pub(crate) mod tests { let mut disk = TestDisk::new(); let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let all_keys: Vec<&[u8; 6]> = vec![ b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb", ]; @@ -745,12 +762,12 @@ pub(crate) mod tests { // Test the `get` function on all the keys. for (key, val) in all_data.iter() { - assert_eq!(reader.get(key).await?, Some(*val)); + assert_eq!(reader.get(key, &ctx).await?, Some(*val)); } // And on some keys that don't exist - assert_eq!(reader.get(b"aaaaaa").await?, None); - assert_eq!(reader.get(b"zzzzzz").await?, None); - assert_eq!(reader.get(b"xaaabx").await?, None); + assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None); + assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None); + assert_eq!(reader.get(b"xaaabx", &ctx).await?, None); // Test search with `visit` function let search_key = b"xabaaa"; @@ -762,10 +779,15 @@ pub(crate) mod tests { let mut data = Vec::new(); reader - .visit(search_key, VisitDirection::Forwards, |key, value| { - data.push((key.to_vec(), value)); - true - }) + .visit( + search_key, + VisitDirection::Forwards, + |key, value| { + data.push((key.to_vec(), value)); + true + }, + &ctx, + ) .await?; assert_eq!(data, expected); @@ -778,18 +800,28 @@ pub(crate) mod tests { expected.reverse(); let mut data = Vec::new(); reader - .visit(search_key, VisitDirection::Backwards, |key, value| { - data.push((key.to_vec(), value)); - true - }) + .visit( + search_key, + VisitDirection::Backwards, + |key, value| { + data.push((key.to_vec(), value)); + true + }, + &ctx, + ) .await?; assert_eq!(data, expected); // Backward scan where nothing matches reader - .visit(b"aaaaaa", VisitDirection::Backwards, |key, value| { - panic!("found unexpected key {}: {}", hex::encode(key), value); - }) + .visit( + b"aaaaaa", + VisitDirection::Backwards, + |key, value| { + panic!("found unexpected key {}: {}", hex::encode(key), value); + }, + &ctx, + ) .await?; // Full scan @@ -799,10 +831,15 @@ pub(crate) mod tests { .collect(); let mut data = Vec::new(); reader - .visit(&[0u8; 6], VisitDirection::Forwards, |key, value| { - data.push((key.to_vec(), value)); - true - }) + .visit( + &[0u8; 6], + VisitDirection::Forwards, + |key, value| { + data.push((key.to_vec(), value)); + true + }, + &ctx, + ) .await?; assert_eq!(data, expected); @@ -813,6 +850,7 @@ pub(crate) mod tests { async fn lots_of_keys() -> Result<()> { let mut disk = TestDisk::new(); let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); const NUM_KEYS: u64 = 1000; @@ -851,14 +889,14 @@ pub(crate) mod tests { for search_key_int in 0..(NUM_KEYS * 2 + 10) { let search_key = u64::to_be_bytes(search_key_int); assert_eq!( - reader.get(&search_key).await?, + reader.get(&search_key, &ctx).await?, all_data.get(&search_key_int).cloned() ); // Test a forward scan starting with this key result.lock().unwrap().clear(); reader - .visit(&search_key, VisitDirection::Forwards, take_ten) + .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx) .await?; let expected = all_data .range(search_key_int..) @@ -870,7 +908,7 @@ pub(crate) mod tests { // And a backwards scan result.lock().unwrap().clear(); reader - .visit(&search_key, VisitDirection::Backwards, take_ten) + .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx) .await?; let expected = all_data .range(..=search_key_int) @@ -886,7 +924,7 @@ pub(crate) mod tests { limit.store(usize::MAX, Ordering::Relaxed); result.lock().unwrap().clear(); reader - .visit(&search_key, VisitDirection::Forwards, take_ten) + .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx) .await?; let expected = all_data .iter() @@ -899,7 +937,7 @@ pub(crate) mod tests { limit.store(usize::MAX, Ordering::Relaxed); result.lock().unwrap().clear(); reader - .visit(&search_key, VisitDirection::Backwards, take_ten) + .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx) .await?; let expected = all_data .iter() @@ -913,6 +951,8 @@ pub(crate) mod tests { #[tokio::test] async fn random_data() -> Result<()> { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + // Generate random keys with exponential distribution, to // exercise the prefix compression const NUM_KEYS: usize = 100000; @@ -939,22 +979,24 @@ pub(crate) mod tests { // Test get() operation on all the keys for (&key, &val) in all_data.iter() { let search_key = u128::to_be_bytes(key); - assert_eq!(reader.get(&search_key).await?, Some(val)); + assert_eq!(reader.get(&search_key, &ctx).await?, Some(val)); } // Test get() operations on random keys, most of which will not exist for _ in 0..100000 { let key_int = rand::thread_rng().gen::(); let search_key = u128::to_be_bytes(key_int); - assert!(reader.get(&search_key).await? == all_data.get(&key_int).cloned()); + assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned()); } // Test boundary cases assert!( - reader.get(&u128::to_be_bytes(u128::MIN)).await? == all_data.get(&u128::MIN).cloned() + reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await? + == all_data.get(&u128::MIN).cloned() ); assert!( - reader.get(&u128::to_be_bytes(u128::MAX)).await? == all_data.get(&u128::MAX).cloned() + reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await? + == all_data.get(&u128::MAX).cloned() ); Ok(()) @@ -985,6 +1027,7 @@ pub(crate) mod tests { // Build a tree from it let mut disk = TestDisk::new(); let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); for (key, val) in disk_btree_test_data::TEST_DATA { writer.append(&key, val)?; @@ -997,16 +1040,21 @@ pub(crate) mod tests { // Test get() operation on all the keys for (key, val) in disk_btree_test_data::TEST_DATA { - assert_eq!(reader.get(&key).await?, Some(val)); + assert_eq!(reader.get(&key, &ctx).await?, Some(val)); } // Test full scan let mut count = 0; reader - .visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| { - count += 1; - true - }) + .visit( + &[0u8; 26], + VisitDirection::Forwards, + |_key, _value| { + count += 1; + true + }, + &ctx, + ) .await?; assert_eq!(count, disk_btree_test_data::TEST_DATA.len()); diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 887834cd9b..8785f51c06 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -2,6 +2,7 @@ //! used to keep in-memory layers spilled on disk. use crate::config::PageServerConf; +use crate::context::RequestContext; use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; @@ -61,13 +62,17 @@ impl EphemeralFile { self.len } - pub(crate) async fn read_blk(&self, blknum: u32) -> Result { + pub(crate) async fn read_blk( + &self, + blknum: u32, + ctx: &RequestContext, + ) -> Result { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { let cache = page_cache::get(); loop { match cache - .read_immutable_buf(self.page_cache_file_id, blknum) + .read_immutable_buf(self.page_cache_file_id, blknum, ctx) .await .map_err(|e| { std::io::Error::new( @@ -103,7 +108,11 @@ impl EphemeralFile { } } - pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + pub(crate) async fn write_blob( + &mut self, + srcbuf: &[u8], + ctx: &RequestContext, + ) -> Result { struct Writer<'a> { ephemeral_file: &'a mut EphemeralFile, /// The block to which the next [`push_bytes`] will write. @@ -120,7 +129,11 @@ impl EphemeralFile { }) } #[inline(always)] - async fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> { + async fn push_bytes( + &mut self, + src: &[u8], + ctx: &RequestContext, + ) -> Result<(), io::Error> { let mut src_remaining = src; while !src_remaining.is_empty() { let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..]; @@ -146,6 +159,7 @@ impl EphemeralFile { .read_immutable_buf( self.ephemeral_file.page_cache_file_id, self.blknum, + ctx, ) .await { @@ -199,15 +213,15 @@ impl EphemeralFile { if srcbuf.len() < 0x80 { // short one-byte length header let len_buf = [srcbuf.len() as u8]; - writer.push_bytes(&len_buf).await?; + writer.push_bytes(&len_buf, ctx).await?; } else { let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32); len_buf[0] |= 0x80; - writer.push_bytes(&len_buf).await?; + writer.push_bytes(&len_buf, ctx).await?; } // Write the payload - writer.push_bytes(srcbuf).await?; + writer.push_bytes(srcbuf, ctx).await?; if srcbuf.len() < 0x80 { self.len += 1; @@ -261,6 +275,8 @@ impl BlockReader for EphemeralFile { #[cfg(test)] mod tests { use super::*; + use crate::context::DownloadBehavior; + use crate::task_mgr::TaskKind; use crate::tenant::block_io::{BlockCursor, BlockReaderRef}; use rand::{thread_rng, RngCore}; use std::fs; @@ -268,7 +284,15 @@ mod tests { fn harness( test_name: &str, - ) -> Result<(&'static PageServerConf, TenantId, TimelineId), io::Error> { + ) -> Result< + ( + &'static PageServerConf, + TenantId, + TimelineId, + RequestContext, + ), + io::Error, + > { let repo_dir = PageServerConf::test_repo_dir(test_name); let _ = fs::remove_dir_all(&repo_dir); let conf = PageServerConf::dummy_conf(repo_dir); @@ -280,46 +304,57 @@ mod tests { let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap(); fs::create_dir_all(conf.timeline_path(&tenant_id, &timeline_id))?; - Ok((conf, tenant_id, timeline_id)) + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + Ok((conf, tenant_id, timeline_id, ctx)) } #[tokio::test] async fn test_ephemeral_blobs() -> Result<(), io::Error> { - let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; + let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?; let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; - let pos_foo = file.write_blob(b"foo").await?; + let pos_foo = file.write_blob(b"foo", &ctx).await?; assert_eq!( b"foo", - file.block_cursor().read_blob(pos_foo).await?.as_slice() + file.block_cursor() + .read_blob(pos_foo, &ctx) + .await? + .as_slice() ); - let pos_bar = file.write_blob(b"bar").await?; + let pos_bar = file.write_blob(b"bar", &ctx).await?; assert_eq!( b"foo", - file.block_cursor().read_blob(pos_foo).await?.as_slice() + file.block_cursor() + .read_blob(pos_foo, &ctx) + .await? + .as_slice() ); assert_eq!( b"bar", - file.block_cursor().read_blob(pos_bar).await?.as_slice() + file.block_cursor() + .read_blob(pos_bar, &ctx) + .await? + .as_slice() ); let mut blobs = Vec::new(); for i in 0..10000 { let data = Vec::from(format!("blob{}", i).as_bytes()); - let pos = file.write_blob(&data).await?; + let pos = file.write_blob(&data, &ctx).await?; blobs.push((pos, data)); } // also test with a large blobs for i in 0..100 { let data = format!("blob{}", i).as_bytes().repeat(100); - let pos = file.write_blob(&data).await?; + let pos = file.write_blob(&data, &ctx).await?; blobs.push((pos, data)); } let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file)); for (pos, expected) in blobs { - let actual = cursor.read_blob(pos).await?; + let actual = cursor.read_blob(pos, &ctx).await?; assert_eq!(actual, expected); } @@ -327,8 +362,8 @@ mod tests { let mut large_data = Vec::new(); large_data.resize(20000, 0); thread_rng().fill_bytes(&mut large_data); - let pos_large = file.write_blob(&large_data).await?; - let result = file.block_cursor().read_blob(pos_large).await?; + let pos_large = file.write_blob(&large_data, &ctx).await?; + let result = file.block_cursor().read_blob(pos_large, &ctx).await?; assert_eq!(result, large_data); Ok(()) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 6925cb59cd..9e5b6bd55f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -317,11 +317,11 @@ impl DeltaLayer { tree_reader.dump().await?; - let keys = DeltaLayerInner::load_keys(&inner).await?; + let keys = DeltaLayerInner::load_keys(&inner, ctx).await?; // A subroutine to dump a single blob - async fn dump_blob(val: ValueRef<'_>) -> Result { - let buf = val.reader.read_blob(val.blob_ref.pos()).await?; + async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> Result { + let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?; let val = Value::des(&buf)?; let desc = match val { Value::Image(img) => { @@ -342,7 +342,7 @@ impl DeltaLayer { for entry in keys { let DeltaEntry { key, lsn, val, .. } = entry; - let desc = match dump_blob(val).await { + let desc = match dump_blob(val, ctx).await { Ok(desc) => desc, Err(err) => { let err: anyhow::Error = err; @@ -370,7 +370,7 @@ impl DeltaLayer { .load(LayerAccessKind::GetValueReconstructData, ctx) .await?; inner - .get_value_reconstruct_data(key, lsn_range, reconstruct_state) + .get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx) .await } @@ -453,12 +453,12 @@ impl DeltaLayer { self.access_stats.record_access(access_kind, ctx); // Quick exit if already loaded self.inner - .get_or_try_init(|| self.load_inner()) + .get_or_try_init(|| self.load_inner(ctx)) .await .with_context(|| format!("Failed to load delta layer {}", self.path().display())) } - async fn load_inner(&self) -> Result> { + async fn load_inner(&self, ctx: &RequestContext) -> Result> { let path = self.path(); let summary = match &self.path_or_conf { @@ -466,7 +466,7 @@ impl DeltaLayer { PathOrConf::Path(_) => None, }; - let loaded = DeltaLayerInner::load(&path, summary).await?; + let loaded = DeltaLayerInner::load(&path, summary, ctx).await?; if let PathOrConf::Path(ref path) = self.path_or_conf { // not production code @@ -554,7 +554,7 @@ impl DeltaLayer { .load(LayerAccessKind::KeyIter, ctx) .await .context("load delta layer keys")?; - DeltaLayerInner::load_keys(inner) + DeltaLayerInner::load_keys(inner, ctx) .await .context("Layer index is corrupted") } @@ -849,13 +849,14 @@ impl DeltaLayerInner { pub(super) async fn load( path: &std::path::Path, summary: Option, + ctx: &RequestContext, ) -> anyhow::Result { let file = VirtualFile::open(path) .await .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); - let summary_blk = file.read_blk(0).await?; + let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; if let Some(mut expected_summary) = summary { @@ -883,6 +884,7 @@ impl DeltaLayerInner { key: Key, lsn_range: Range, reconstruct_state: &mut ValueReconstructState, + ctx: &RequestContext, ) -> anyhow::Result { let mut need_image = true; // Scan the page versions backwards, starting from `lsn`. @@ -897,19 +899,24 @@ impl DeltaLayerInner { let mut offsets: Vec<(Lsn, u64)> = Vec::new(); tree_reader - .visit(&search_key.0, VisitDirection::Backwards, |key, value| { - let blob_ref = BlobRef(value); - if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] { - return false; - } - let entry_lsn = DeltaKey::extract_lsn_from_buf(key); - if entry_lsn < lsn_range.start { - return false; - } - offsets.push((entry_lsn, blob_ref.pos())); + .visit( + &search_key.0, + VisitDirection::Backwards, + |key, value| { + let blob_ref = BlobRef(value); + if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] { + return false; + } + let entry_lsn = DeltaKey::extract_lsn_from_buf(key); + if entry_lsn < lsn_range.start { + return false; + } + offsets.push((entry_lsn, blob_ref.pos())); - !blob_ref.will_init() - }) + !blob_ref.will_init() + }, + ctx, + ) .await?; // Ok, 'offsets' now contains the offsets of all the entries we need to read @@ -917,7 +924,7 @@ impl DeltaLayerInner { let mut buf = Vec::new(); for (entry_lsn, pos) in offsets { cursor - .read_blob_into_buf(pos, &mut buf) + .read_blob_into_buf(pos, &mut buf, ctx) .await .with_context(|| { format!( @@ -958,9 +965,10 @@ impl DeltaLayerInner { } } - pub(super) async fn load_keys + Clone>( - this: &T, - ) -> Result>> { + pub(super) async fn load_keys<'a, 'b, T: AsRef + Clone>( + this: &'a T, + ctx: &'b RequestContext, + ) -> Result>> { let dl = this.as_ref(); let file = &dl.file; @@ -997,6 +1005,7 @@ impl DeltaLayerInner { all_keys.push(entry); true }, + ctx, ) .await?; if let Some(last) = all_keys.last_mut() { @@ -1026,9 +1035,9 @@ pub struct ValueRef<'a> { impl<'a> ValueRef<'a> { /// Loads the value from disk - pub async fn load(&self) -> Result { + pub async fn load(&self, ctx: &RequestContext) -> Result { // theoretically we *could* record an access time for each, but it does not really matter - let buf = self.reader.read_blob(self.blob_ref.pos()).await?; + let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?; let val = Value::des(&buf)?; Ok(val) } @@ -1037,7 +1046,11 @@ impl<'a> ValueRef<'a> { pub(crate) struct Adapter(T); impl> Adapter { - pub(crate) async fn read_blk(&self, blknum: u32) -> Result { - self.0.as_ref().file.read_blk(blknum).await + pub(crate) async fn read_blk( + &self, + blknum: u32, + ctx: &RequestContext, + ) -> Result { + self.0.as_ref().file.read_blk(blknum, ctx).await } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2a6cabcc97..f2c73ccf81 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -237,10 +237,15 @@ impl ImageLayer { tree_reader.dump().await?; tree_reader - .visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| { - println!("key: {} offset {}", hex::encode(key), value); - true - }) + .visit( + &[0u8; KEY_SIZE], + VisitDirection::Forwards, + |key, value| { + println!("key: {} offset {}", hex::encode(key), value); + true + }, + ctx, + ) .await?; Ok(()) @@ -261,7 +266,7 @@ impl ImageLayer { .load(LayerAccessKind::GetValueReconstructData, ctx) .await?; inner - .get_value_reconstruct_data(key, reconstruct_state) + .get_value_reconstruct_data(key, reconstruct_state, ctx) .await // FIXME: makes no sense to dump paths .with_context(|| format!("read {}", self.path().display())) @@ -335,12 +340,12 @@ impl ImageLayer { ) -> Result<&ImageLayerInner> { self.access_stats.record_access(access_kind, ctx); self.inner - .get_or_try_init(|| self.load_inner()) + .get_or_try_init(|| self.load_inner(ctx)) .await .with_context(|| format!("Failed to load image layer {}", self.path().display())) } - async fn load_inner(&self) -> Result { + async fn load_inner(&self, ctx: &RequestContext) -> Result { let path = self.path(); let expected_summary = match &self.path_or_conf { @@ -349,7 +354,8 @@ impl ImageLayer { }; let loaded = - ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?; + ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary, ctx) + .await?; if let PathOrConf::Path(ref path) = self.path_or_conf { // not production code @@ -436,12 +442,13 @@ impl ImageLayerInner { path: &std::path::Path, lsn: Lsn, summary: Option, + ctx: &RequestContext, ) -> anyhow::Result { let file = VirtualFile::open(path) .await .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); - let summary_blk = file.read_blk(0).await?; + let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; if let Some(mut expected_summary) = summary { @@ -470,16 +477,17 @@ impl ImageLayerInner { &self, key: Key, reconstruct_state: &mut ValueReconstructState, + ctx: &RequestContext, ) -> anyhow::Result { let file = &self.file; let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file); let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); - if let Some(offset) = tree_reader.get(&keybuf).await? { + if let Some(offset) = tree_reader.get(&keybuf, ctx).await? { let blob = file .block_cursor() - .read_blob(offset) + .read_blob(offset, ctx) .await .with_context(|| format!("failed to read value from offset {}", offset))?; let value = Bytes::from(blob); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 3ff1c6bb18..62da5a715c 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -106,7 +106,7 @@ impl InMemoryLayer { /// debugging function to print out the contents of the layer /// /// this is likely completly unused - pub async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> { + pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { let inner = self.inner.read().await; let end_str = self.end_lsn_or_max(); @@ -125,7 +125,7 @@ impl InMemoryLayer { for (key, vec_map) in inner.index.iter() { for (lsn, pos) in vec_map.as_slice() { let mut desc = String::new(); - cursor.read_blob_into_buf(*pos, &mut buf).await?; + cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; let val = Value::des(&buf); match val { Ok(Value::Image(img)) => { @@ -158,7 +158,7 @@ impl InMemoryLayer { key: Key, lsn_range: Range, reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> anyhow::Result { ensure!(lsn_range.start >= self.start_lsn); let mut need_image = true; @@ -171,7 +171,7 @@ impl InMemoryLayer { if let Some(vec_map) = inner.index.get(&key) { let slice = vec_map.slice_range(lsn_range); for (entry_lsn, pos) in slice.iter().rev() { - let buf = reader.read_blob(*pos).await?; + let buf = reader.read_blob(*pos, ctx).await?; let value = Value::des(&buf)?; match value { Value::Image(img) => { @@ -263,7 +263,13 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { + pub async fn put_value( + &self, + key: Key, + lsn: Lsn, + val: &Value, + ctx: &RequestContext, + ) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); let inner: &mut _ = &mut *self.inner.write().await; self.assert_writable(); @@ -275,7 +281,7 @@ impl InMemoryLayer { let mut buf = smallvec::SmallVec::<[u8; 256]>::new(); buf.clear(); val.ser_into(&mut buf)?; - inner.file.write_blob(&buf).await? + inner.file.write_blob(&buf, ctx).await? }; let vec_map = inner.index.entry(key).or_default(); @@ -313,7 +319,7 @@ impl InMemoryLayer { /// Write this frozen in-memory layer to disk. /// /// Returns a new delta layer with all the same data as this in-memory layer - pub(crate) async fn write_to_disk(&self) -> Result { + pub(crate) async fn write_to_disk(&self, ctx: &RequestContext) -> Result { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the // write lock on it, so we shouldn't block anyone. There's one exception @@ -347,7 +353,7 @@ impl InMemoryLayer { let key = **key; // Write all page versions for (lsn, pos) in vec_map.as_slice() { - cursor.read_blob_into_buf(*pos, &mut buf).await?; + cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; let will_init = Value::des(&buf)?.will_init(); delta_layer_writer .put_value_bytes(key, *lsn, &buf, will_init) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fa3b487589..78ac1338db 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -471,7 +471,7 @@ impl Timeline { // The cached image can be returned directly if there is no WAL between the cached image // and requested LSN. The cached image can also be used to reduce the amount of WAL needed // for redo. - let cached_page_img = match self.lookup_cached_page(&key, lsn).await { + let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await { Some((cached_lsn, cached_img)) => { match cached_lsn.cmp(&lsn) { Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check @@ -2518,13 +2518,18 @@ impl Timeline { } } - async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> { + async fn lookup_cached_page( + &self, + key: &Key, + lsn: Lsn, + ctx: &RequestContext, + ) -> Option<(Lsn, Bytes)> { let cache = page_cache::get(); // FIXME: It's pointless to check the cache for things that are not 8kB pages. // We should look at the key to determine if it's a cacheable object let (lsn, read_guard) = cache - .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn) + .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn, ctx) .await?; let img = Bytes::from(read_guard.to_vec()); Some((lsn, img)) @@ -2558,10 +2563,16 @@ impl Timeline { Ok(layer) } - async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value( + &self, + key: Key, + lsn: Lsn, + val: &Value, + ctx: &RequestContext, + ) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); let layer = self.get_layer_for_write(lsn).await?; - layer.put_value(key, lsn, val).await?; + layer.put_value(key, lsn, val, ctx).await?; Ok(()) } @@ -2733,7 +2744,7 @@ impl Timeline { // Normal case, write out a L0 delta layer file. // `create_delta_layer` will not modify the layer map. // We will remove frozen layer and add delta layer in one atomic operation later. - let layer = self.create_delta_layer(&frozen_layer).await?; + let layer = self.create_delta_layer(&frozen_layer, ctx).await?; ( HashMap::from([( layer.filename(), @@ -2856,19 +2867,21 @@ impl Timeline { async fn create_delta_layer( self: &Arc, frozen_layer: &Arc, + ctx: &RequestContext, ) -> anyhow::Result { let span = tracing::info_span!("blocking"); let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); + let ctx = ctx.attached_child(); move || { // Write it out // Keep this inside `spawn_blocking` and `Handle::current` // as long as the write path is still sync and the read impl // is still not fully async. Otherwise executor threads would // be blocked. - let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?; + let new_delta = Handle::current().block_on(frozen_layer.write_to_disk(&ctx))?; let new_delta_path = new_delta.path(); // Sync it to disk. @@ -3574,7 +3587,7 @@ impl Timeline { key, lsn, ref val, .. } in all_values_iter { - let value = val.load().await?; + let value = val.load(ctx).await?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn { @@ -4699,8 +4712,14 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value).await + pub async fn put( + &self, + key: Key, + lsn: Lsn, + value: &Value, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value, ctx).await } pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index e293fcc81b..d290715938 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -363,7 +363,7 @@ impl<'a> WalIngest<'a> { // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit().await?; + modification.commit(ctx).await?; Ok(()) } @@ -1561,7 +1561,7 @@ mod tests { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file - m.commit().await?; + m.commit(ctx).await?; let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?; Ok(walingest) @@ -1580,22 +1580,22 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; let mut m = tline.begin_modification(Lsn(0x30)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; let mut m = tline.begin_modification(Lsn(0x40)); walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; let mut m = tline.begin_modification(Lsn(0x50)); walingest .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_current_logical_size(&tline, Lsn(0x50)); @@ -1681,7 +1681,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_current_logical_size(&tline, Lsn(0x60)); // Check reported size and contents after truncation @@ -1723,7 +1723,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx) @@ -1736,7 +1736,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx) @@ -1761,7 +1761,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx) @@ -1800,7 +1800,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; // Check that rel exists and size is correct assert_eq!( @@ -1819,7 +1819,7 @@ mod tests { // Drop rel let mut m = tline.begin_modification(Lsn(0x30)); walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?; - m.commit().await?; + m.commit(&ctx).await?; // Check that rel is not visible anymore assert_eq!( @@ -1837,7 +1837,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; // Check that rel exists and size is correct assert_eq!( @@ -1876,7 +1876,7 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx) .await?; } - m.commit().await?; + m.commit(&ctx).await?; // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( @@ -1921,7 +1921,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; // Check reported size and contents after truncation assert_eq!( @@ -1970,7 +1970,7 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx) .await?; } - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline @@ -2017,7 +2017,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; } assert_current_logical_size(&tline, Lsn(lsn)); @@ -2033,7 +2033,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, RELSEG_SIZE @@ -2046,7 +2046,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, RELSEG_SIZE - 1 @@ -2062,7 +2062,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx) .await?; - m.commit().await?; + m.commit(&ctx).await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, size as BlockNumber