diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs
index b4bb239f44..b3294b9a18 100644
--- a/pageserver/ctl/src/layer_map_analyzer.rs
+++ b/pageserver/ctl/src/layer_map_analyzer.rs
@@ -99,7 +99,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
 }
 
 // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
-async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
+async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &mut RequestContext) -> Result<Vec<Hole>> {
     let file = VirtualFile::open(path, ctx).await?;
     let file_id = page_cache::next_file_id();
     let block_reader = FileBlockReader::new(&file, file_id);
diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs
index 3611b0baab..7be9aeabad 100644
--- a/pageserver/ctl/src/layers.rs
+++ b/pageserver/ctl/src/layers.rs
@@ -57,7 +57,7 @@ pub(crate) enum LayerCmd {
     },
 }
 
-async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
+async fn read_delta_file(path: impl AsRef<Path>, ctx: &mut RequestContext) -> Result<()> {
     let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
     virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
     page_cache::init(100);
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 0f057a4368..2580bf2b10 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -60,7 +60,7 @@ pub async fn send_basebackup_tarball<'a, W>(
     req_lsn: Option<Lsn>,
     prev_lsn: Option<Lsn>,
     full_backup: bool,
-    ctx: &'a RequestContext,
+    ctx: &'a mut RequestContext,
 ) -> Result<(), BasebackupError>
 where
     W: AsyncWrite + Send + Sync + Unpin,
@@ -141,7 +141,7 @@ where
     lsn: Lsn,
     prev_record_lsn: Lsn,
     full_backup: bool,
-    ctx: &'a RequestContext,
+    ctx: &'a mut RequestContext,
 }
 
 /// A sink that accepts SLRU blocks ordered by key and forwards
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index 18c1a6cd9b..5787a41198 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -51,7 +51,7 @@ pub async fn collect_metrics(
     node_id: NodeId,
     local_disk_storage: Utf8PathBuf,
     cancel: CancellationToken,
-    ctx: RequestContext,
+    mut ctx: RequestContext,
 ) -> anyhow::Result<()> {
     if _cached_metric_collection_interval != Duration::ZERO {
         tracing::warn!(
@@ -60,7 +60,7 @@ pub async fn collect_metrics(
     }
 
     // spin up background worker that caclulates tenant sizes
-    let worker_ctx =
+    let mut worker_ctx =
         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
@@ -76,7 +76,7 @@ pub async fn collect_metrics(
                 tenant_manager,
                 synthetic_size_calculation_interval,
                 &cancel,
-                &worker_ctx,
+                &mut worker_ctx,
             )
             .instrument(info_span!("synthetic_size_worker"))
             .await?;
@@ -122,7 +122,7 @@ pub async fn collect_metrics(
         let started_at = Instant::now();
 
         // these are point in time, with variable "now"
-        let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
+        let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &mut ctx).await;
 
         let metrics = Arc::new(metrics);
 
@@ -280,7 +280,7 @@ async fn calculate_synthetic_size_worker(
     tenant_manager: Arc<TenantManager>,
     synthetic_size_calculation_interval: Duration,
     cancel: &CancellationToken,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> anyhow::Result<()> {
     info!("starting calculate_synthetic_size_worker");
     scopeguard::defer! {
@@ -340,7 +340,7 @@ async fn calculate_synthetic_size_worker(
     }
 }
 
-async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
+async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &mut RequestContext) {
     const CAUSE: LogicalSizeCalculationCause =
         LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs
index 7ba2d04c4f..5d3f073b80 100644
--- a/pageserver/src/consumption_metrics/metrics.rs
+++ b/pageserver/src/consumption_metrics/metrics.rs
@@ -184,7 +184,7 @@ impl MetricsKey {
 pub(super) async fn collect_all_metrics(
     tenant_manager: &Arc<TenantManager>,
     cached_metrics: &Cache,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Vec<RawMetric> {
     use pageserver_api::models::TenantState;
 
@@ -220,7 +220,7 @@ pub(super) async fn collect_all_metrics(
     res
 }
 
-async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
+async fn collect<S>(tenants: S, cache: &Cache, ctx: &mut RequestContext) -> Vec<RawMetric>
 where
     S: futures::stream::Stream<Item = (TenantShardId, Arc<Tenant>)>,
 {
@@ -342,7 +342,7 @@ impl TimelineSnapshot {
     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     fn collect(
         t: &Arc<Timeline>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Option<Self>> {
         if !t.is_active() {
             // no collection for broken or stopping needed, we will still keep the cached values
diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs
index 86d0390c30..4e53ba7888 100644
--- a/pageserver/src/context.rs
+++ b/pageserver/src/context.rs
@@ -158,7 +158,7 @@ impl RequestContextBuilder {
         }
     }
 
-    pub fn extend(original: &RequestContext) -> Self {
+    pub fn extend(original: &mut RequestContext) -> Self {
         Self {
             // This is like a Copy, but avoid implementing Copy because ordinary users of
             // RequestContext should always move or ref it.
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 482879630a..4f327eeb68 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -352,7 +352,7 @@ async fn build_timeline_info(
     timeline: &Arc<Timeline>,
     include_non_incremental_logical_size: bool,
     force_await_initial_logical_size: bool,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> anyhow::Result<TimelineInfo> {
     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
 
@@ -381,7 +381,7 @@ async fn build_timeline_info(
 
 async fn build_timeline_info_common(
     timeline: &Arc<Timeline>,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
     logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
 ) -> anyhow::Result<TimelineInfo> {
     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index ed409d3130..8fc293b2be 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -53,7 +53,7 @@ pub async fn import_timeline_from_postgres_datadir(
     tline: &Timeline,
     pgdata_path: &Utf8Path,
     pgdata_lsn: Lsn,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Result<()> {
     let mut pg_control: Option<ControlFileData> = None;
 
@@ -121,7 +121,7 @@ async fn import_rel(
     dboid: Oid,
     reader: &mut (impl AsyncRead + Unpin),
     len: usize,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> anyhow::Result<()> {
     // Does it look like a relation file?
trace!("importing rel file {}", path.display()); @@ -210,7 +210,7 @@ async fn import_slru( path: &Path, reader: &mut (impl AsyncRead + Unpin), len: usize, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { info!("importing slru file {path:?}"); @@ -268,7 +268,7 @@ async fn import_wal( tline: &Timeline, startpoint: Lsn, endpoint: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version); @@ -346,7 +346,7 @@ pub async fn import_basebackup_from_tar( tline: &Timeline, reader: &mut (impl AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { info!("importing base at {base_lsn}"); let mut modification = tline.begin_modification(base_lsn); @@ -397,7 +397,7 @@ pub async fn import_wal_from_tar( reader: &mut (impl AsyncRead + Send + Sync + Unpin), start_lsn: Lsn, end_lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { // Set up walingest mutable state let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version); @@ -489,7 +489,7 @@ async fn import_file( file_path: &Path, reader: &mut (impl AsyncRead + Send + Sync + Unpin), len: usize, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result> { let file_name = match file_path.file_name() { Some(name) => name.to_string_lossy(), diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 5c8f350f7b..816d231127 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -333,7 +333,7 @@ pub(crate) static PAGE_CACHE: Lazy = Lazy::new(|| PageCacheMet }); impl PageCacheMetrics { - pub(crate) fn for_ctx(&self, ctx: &RequestContext) -> &PageCacheMetricsForTaskKind { + pub(crate) fn for_ctx(&self, ctx: &mut RequestContext) -> &PageCacheMetricsForTaskKind { &self.map[ctx.task_kind()][ctx.page_content_kind()] } } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index f386c825b8..04883252f2 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -331,7 +331,7 @@ impl PageCache { &self, file_id: FileId, blkno: u32, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx) .await @@ -422,7 +422,7 @@ impl PageCache { async fn lock_for_read( &self, cache_key: &CacheKey, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { let mut permit = Some(self.try_get_pinned_slot_permit().await?); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ebc23e8945..eaaefbc00c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -867,7 +867,7 @@ impl PageServerHandler { request_lsn: Lsn, not_modified_since: Lsn, latest_gc_cutoff_lsn: &RcuReadGuard, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { let last_record_lsn = timeline.get_last_record_lsn(); @@ -926,7 +926,7 @@ impl PageServerHandler { tenant_shard_id: TenantShardId, timeline_id: TimelineId, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, @@ -958,7 +958,7 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, req: &PagestreamExistsRequest, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline @@ -990,7 +990,7 @@ 
@@ -990,7 +990,7 @@ impl PageServerHandler {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         req: &PagestreamNblocksRequest,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
 
@@ -1023,7 +1023,7 @@ impl PageServerHandler {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         req: &PagestreamDbSizeRequest,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
 
@@ -1173,7 +1173,7 @@ impl PageServerHandler {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         req: &PagestreamGetPageRequest,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = match self.get_cached_timeline_for_page(req) {
             Ok(tl) => {
@@ -1233,7 +1233,7 @@ impl PageServerHandler {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         req: &PagestreamGetSlruSegmentRequest,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
 
@@ -1275,7 +1275,7 @@ impl PageServerHandler {
         prev_lsn: Option<Lsn>,
         full_backup: bool,
         gzip: bool,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), QueryError>
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 25d00d6dfd..d78486153d 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -188,7 +188,7 @@ impl Timeline {
         tag: RelTag,
         blknum: BlockNumber,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         if tag.relnode == 0 {
             return Err(PageReconstructError::Other(
@@ -218,7 +218,7 @@ impl Timeline {
         spcnode: Oid,
         dbnode: Oid,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<usize, PageReconstructError> {
         let mut total_blocks = 0;
 
@@ -236,7 +236,7 @@ impl Timeline {
         &self,
         tag: RelTag,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockNumber, PageReconstructError> {
         if tag.relnode == 0 {
             return Err(PageReconstructError::Other(
@@ -272,7 +272,7 @@ impl Timeline {
         &self,
         tag: RelTag,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<bool, PageReconstructError> {
         if tag.relnode == 0 {
             return Err(PageReconstructError::Other(
@@ -307,7 +307,7 @@ impl Timeline {
         spcnode: Oid,
         dbnode: Oid,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashSet<RelTag>, PageReconstructError> {
         // fetch directory listing
         let key = rel_dir_to_key(spcnode, dbnode);
@@ -335,7 +335,7 @@ impl Timeline {
         kind: SlruKind,
         segno: u32,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         let n_blocks = self
             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
@@ -357,7 +357,7 @@ impl Timeline {
         segno: u32,
         blknum: BlockNumber,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         let key = slru_block_to_key(kind, segno, blknum);
         self.get(key, lsn, ctx).await
@@ -369,7 +369,7 @@ impl Timeline {
         kind: SlruKind,
         segno: u32,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockNumber, PageReconstructError> {
         let key = slru_segment_size_to_key(kind, segno);
         let mut buf = version.get(self, key, ctx).await?;
@@ -382,7 +382,7 @@ impl Timeline {
         kind: SlruKind,
         segno: u32,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<bool, PageReconstructError> {
         // fetch directory listing
         let key = slru_dir_to_key(kind);
@@ -408,7 +408,7 @@ impl Timeline {
         &self,
         search_timestamp: TimestampTz,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<LsnForTimestamp, PageReconstructError> {
         pausable_failpoint!("find-lsn-for-timestamp-pausable");
 
@@ -499,7 +499,7 @@ impl Timeline {
         probe_lsn: Lsn,
         found_smaller: &mut bool,
         found_larger: &mut bool,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<bool, PageReconstructError> {
         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
             if timestamp >= search_timestamp {
@@ -519,7 +519,7 @@ impl Timeline {
     pub(crate) async fn get_timestamp_for_lsn(
         &self,
         probe_lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<TimestampTz>, PageReconstructError> {
         let mut max: Option<TimestampTz> = None;
         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
@@ -542,7 +542,7 @@ impl Timeline {
     async fn map_all_timestamps<T>(
         &self,
         probe_lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
         mut f: impl FnMut(TimestampTz) -> ControlFlow<T, ()>,
     ) -> Result<T, PageReconstructError> {
         for segno in self
@@ -575,7 +575,7 @@ impl Timeline {
     pub(crate) async fn get_slru_keyspace(
         &self,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<KeySpace, PageReconstructError> {
         let mut accum = KeySpaceAccum::new();
 
@@ -604,7 +604,7 @@ impl Timeline {
         &self,
         kind: SlruKind,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashSet<u32>, PageReconstructError> {
         // fetch directory entry
         let key = slru_dir_to_key(kind);
@@ -621,7 +621,7 @@ impl Timeline {
         spcnode: Oid,
         dbnode: Oid,
         version: Version<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         let key = relmap_file_key(spcnode, dbnode);
 
@@ -632,7 +632,7 @@ impl Timeline {
     pub(crate) async fn list_dbdirs(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
         // fetch directory entry
         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
@@ -647,7 +647,7 @@ impl Timeline {
         &self,
         xid: TransactionId,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         let key = twophase_file_key(xid);
         let buf = self.get(key, lsn, ctx).await?;
@@ -657,7 +657,7 @@ impl Timeline {
     pub(crate) async fn list_twophase_files(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
         // fetch directory entry
         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
@@ -671,7 +671,7 @@ impl Timeline {
     pub(crate) async fn get_control_file(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         self.get(CONTROLFILE_KEY, lsn, ctx).await
     }
@@ -679,7 +679,7 @@ impl Timeline {
     pub(crate) async fn get_checkpoint(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         self.get(CHECKPOINT_KEY, lsn, ctx).await
     }
@@ -687,7 +687,7 @@ impl Timeline {
     async fn list_aux_files_v1(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
         match self.get(AUX_FILES_KEY, lsn, ctx).await {
             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
@@ -705,7 +705,7 @@ impl Timeline {
     async fn list_aux_files_v2(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
         let kv = self
             .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
@@ -729,7 +729,7 @@ impl Timeline {
     pub(crate) async fn trigger_aux_file_size_computation(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), PageReconstructError> {
         let current_policy = self.last_aux_file_policy.load();
         if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
@@ -741,7 +741,7 @@ impl Timeline {
     pub(crate) async fn list_aux_files(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
         let current_policy = self.last_aux_file_policy.load();
         match current_policy {
@@ -779,7 +779,7 @@ impl Timeline {
     pub(crate) async fn get_replorigins(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
         let kv = self
             .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
@@ -809,7 +809,7 @@ impl Timeline {
     pub(crate) async fn get_current_logical_size_non_incremental(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<u64, CalculateLogicalSizeError> {
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
 
@@ -845,7 +845,7 @@ impl Timeline {
     pub(crate) async fn collect_keyspace(
         &self,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
         // Iterate through key ranges, greedily packing them into partitions
         let mut result = KeySpaceAccum::new();
@@ -1145,7 +1145,7 @@ impl<'a> DatadirModification<'a> {
         spcnode: Oid,
         dbnode: Oid,
         img: Bytes,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         // Add it to the directory (if it doesn't exist already)
         let buf = self.get(DBDIR_KEY, ctx).await?;
@@ -1182,7 +1182,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         xid: TransactionId,
         img: Bytes,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         // Add it to the directory entry
         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
@@ -1229,7 +1229,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         spcnode: Oid,
         dbnode: Oid,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         let total_blocks = self
             .tline
@@ -1266,7 +1266,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         rel: RelTag,
         nblocks: BlockNumber,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), RelationError> {
         if rel.relnode == 0 {
             return Err(RelationError::InvalidRelnode);
@@ -1328,7 +1328,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         rel: RelTag,
         nblocks: BlockNumber,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
         if self
@@ -1362,7 +1362,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         rel: RelTag,
         nblocks: BlockNumber,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
 
@@ -1384,7 +1384,7 @@ impl<'a> DatadirModification<'a> {
     }
 
     /// Drop a relation.
-    pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
+    pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &mut RequestContext) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
 
         // Remove it from the directory entry
@@ -1420,7 +1420,7 @@ impl<'a> DatadirModification<'a> {
         kind: SlruKind,
         segno: u32,
         nblocks: BlockNumber,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         // Add it to the directory entry
         let dir_key = slru_dir_to_key(kind);
@@ -1466,7 +1466,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         kind: SlruKind,
         segno: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         // Remove it from the directory entry
         let dir_key = slru_dir_to_key(kind);
@@ -1499,7 +1499,7 @@ impl<'a> DatadirModification<'a> {
     pub async fn drop_twophase_file(
         &mut self,
         xid: TransactionId,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         // Remove it from the directory entry
         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
@@ -1538,7 +1538,7 @@ impl<'a> DatadirModification<'a> {
         &mut self,
         path: &str,
         content: &[u8],
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         let switch_policy = self.tline.get_switch_aux_file_policy();
 
@@ -1731,7 +1731,7 @@ impl<'a> DatadirModification<'a> {
     /// retains all the metadata, but data pages are flushed. That's again OK
     /// for bulk import, where you are just loading data pages and won't try to
     /// modify the same pages twice.
-    pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
+    pub async fn flush(&mut self, ctx: &mut RequestContext) -> anyhow::Result<()> {
         // Unless we have accumulated a decent amount of changes, it's not worth it
         // to scan through the pending_updates list.
         let pending_nblocks = self.pending_nblocks;
@@ -1777,7 +1777,7 @@ impl<'a> DatadirModification<'a> {
     /// underlying timeline.
     /// All the modifications in this atomic update are stamped by the specified LSN.
     ///
-    pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
+    pub async fn commit(&mut self, ctx: &mut RequestContext) -> anyhow::Result<()> {
         let mut writer = self.tline.writer().await;
 
         let pending_nblocks = self.pending_nblocks;
@@ -1828,7 +1828,7 @@ impl<'a> DatadirModification<'a> {
 
     // Internal helper functions to batch the modifications
 
-    async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
+    async fn get(&self, key: Key, ctx: &mut RequestContext) -> Result<Bytes, PageReconstructError> {
         // Have we already updated the same key? Read the latest pending updated
         // version in that case.
         //
@@ -1895,7 +1895,7 @@ impl<'a> Version<'a> {
         &self,
         timeline: &Timeline,
         key: Key,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
         match self {
             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index ca5765c99b..44e9f2c0ec 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -547,7 +547,7 @@ impl Tenant {
         metadata: TimelineMetadata,
         ancestor: Option<Arc<Timeline>>,
         last_aux_file_policy: Option<AuxFilePolicy>,
-        _ctx: &RequestContext,
+        _ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         let tenant_id = self.tenant_shard_id;
 
@@ -656,7 +656,7 @@ impl Tenant {
         init_order: Option<InitializationOrder>,
         tenants: &'static std::sync::RwLock<TenantsMap>,
         mode: SpawnMode,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
             conf,
@@ -965,7 +965,7 @@ impl Tenant {
         self: &Arc<Self>,
         preload: Option<TenantPreload>,
         mode: SpawnMode,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         span::debug_assert_current_span_has_tenant_id();
 
@@ -1175,7 +1175,7 @@ impl Tenant {
         index_part: IndexPart,
         remote_metadata: TimelineMetadata,
         resources: TimelineResources,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         span::debug_assert_current_span_has_tenant_id();
 
@@ -1358,7 +1358,7 @@ impl Tenant {
         new_timeline_id: TimelineId,
         initdb_lsn: Lsn,
         pg_version: u32,
-        _ctx: &RequestContext,
+        _ctx: &mut RequestContext,
     ) -> anyhow::Result<UninitializedTimeline> {
         anyhow::ensure!(
             self.is_active(),
@@ -1401,7 +1401,7 @@ impl Tenant {
         new_timeline_id: TimelineId,
         initdb_lsn: Lsn,
         pg_version: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         let uninit_tl = self
             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
@@ -1440,7 +1440,7 @@ impl Tenant {
         new_timeline_id: TimelineId,
         initdb_lsn: Lsn,
         pg_version: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
         delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
         image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
         end_lsn: Lsn,
@@ -1477,7 +1477,7 @@ impl Tenant {
         pg_version: u32,
         load_existing_initdb: Option<TimelineId>,
         broker_client: storage_broker::BrokerClientChannel,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Arc<Timeline>, CreateTimelineError> {
         if !self.is_active() {
             if matches!(self.current_state(), TenantState::Stopping { .. }) {
@@ -1650,7 +1650,7 @@ impl Tenant {
         horizon: u64,
         pitr: Duration,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<GcResult, GcError> {
         // Don't start doing work during shutdown
         if let TenantState::Stopping { .. } = self.current_state() {
@@ -1682,7 +1682,7 @@ impl Tenant {
     async fn compaction_iteration(
         &self,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<(), timeline::CompactionError> {
         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
         if !self.is_active() {
@@ -1779,7 +1779,7 @@ impl Tenant {
         self: &Arc<Self>,
         broker_client: BrokerClientChannel,
         background_jobs_can_start: Option<&completion::Barrier>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) {
         span::debug_assert_current_span_has_tenant_id();
 
@@ -2834,7 +2834,7 @@ impl Tenant {
         horizon: u64,
         pitr: Duration,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<GcResult, GcError> {
         let mut totals: GcResult = Default::default();
         let now = Instant::now();
@@ -2894,7 +2894,7 @@ impl Tenant {
     pub(crate) async fn refresh_gc_info(
         &self,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Vec<Arc<Timeline>>, GcError> {
         // since this method can now be called at different rates than the configured gc loop, it
         // might be that these configuration values get applied faster than what it was previously,
@@ -2915,7 +2915,7 @@ impl Tenant {
         horizon: u64,
         pitr: Duration,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Vec<Arc<Timeline>>, GcError> {
         // before taking the gc_cs lock, do the heavier weight finding of gc_cutoff points for
        // currently visible timelines.
@@ -3053,7 +3053,7 @@ impl Tenant {
         src_timeline: &Arc<Timeline>,
         dst_id: TimelineId,
         ancestor_lsn: Option<Lsn>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Arc<Timeline>, CreateTimelineError> {
         let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
         let tl = self
@@ -3071,7 +3071,7 @@ impl Tenant {
         src_timeline: &Arc<Timeline>,
         dst_id: TimelineId,
         ancestor_lsn: Option<Lsn>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
         delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
         image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
         end_lsn: Lsn,
@@ -3108,7 +3108,7 @@ impl Tenant {
         dst_id: TimelineId,
         start_lsn: Option<Lsn>,
         timeline_create_guard: TimelineCreateGuard<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Arc<Timeline>, CreateTimelineError> {
         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
             .await
@@ -3120,7 +3120,7 @@ impl Tenant {
         dst_id: TimelineId,
         start_lsn: Option<Lsn>,
         timeline_create_guard: TimelineCreateGuard<'_>,
-        _ctx: &RequestContext,
+        _ctx: &mut RequestContext,
     ) -> Result<Arc<Timeline>, CreateTimelineError> {
         let src_id = src_timeline.timeline_id;
 
@@ -3233,7 +3233,7 @@ impl Tenant {
         timeline_id: TimelineId,
         pg_version: u32,
         load_existing_initdb: Option<TimelineId>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         let create_guard = self.create_timeline_create_guard(timeline_id).unwrap();
         self.bootstrap_timeline(
@@ -3305,7 +3305,7 @@ impl Tenant {
         pg_version: u32,
         load_existing_initdb: Option<TimelineId>,
         timeline_create_guard: TimelineCreateGuard<'_>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
         // temporary directory for basebackup files for the given timeline.
@@ -3563,7 +3563,7 @@ impl Tenant {
         max_retention_period: Option<u64>,
         cause: LogicalSizeCalculationCause,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
         let logical_sizes_at_once = self
             .conf
@@ -3603,7 +3603,7 @@ impl Tenant {
         &self,
         cause: LogicalSizeCalculationCause,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<u64, size::CalculateSyntheticSizeError> {
         let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
 
@@ -3756,7 +3756,7 @@ async fn run_initdb(
 pub async fn dump_layerfile_from_path(
     path: &Utf8Path,
     verbose: bool,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> anyhow::Result<()> {
     use std::os::unix::fs::FileExt;
 
@@ -3960,7 +3960,7 @@ pub(crate) mod harness {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) async fn do_try_load(
         &self,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
         let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
 
@@ -4219,7 +4219,7 @@ mod tests {
     async fn make_some_layers(
         tline: &Timeline,
         start_lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         let mut lsn = start_lsn;
         {
@@ -4707,7 +4707,7 @@ mod tests {
     async fn bulk_insert_compact_gc(
         tenant: &Tenant,
         timeline: &Arc<Timeline>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
         lsn: Lsn,
         repeat: usize,
         key_count: usize,
@@ -4719,7 +4719,7 @@ mod tests {
     async fn bulk_insert_maybe_compact_gc(
         tenant: &Tenant,
         timeline: &Arc<Timeline>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
         mut lsn: Lsn,
         repeat: usize,
         key_count: usize,
@@ -6249,7 +6249,7 @@ mod tests {
         tline: &Timeline,
         keyspace: &KeySpace,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
         let mut reconstruct_state = ValuesReconstructState::default();
         let res = tline
@@ -6365,7 +6365,7 @@ mod tests {
         tline: &Arc<Timeline>,
         key: Key,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<Bytes>, GetVectoredError> {
         let mut reconstruct_state = ValuesReconstructState::new();
         let mut res = tline
@@ -6461,7 +6461,7 @@ mod tests {
         tline: &Arc<Timeline>,
         key: Key,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<Bytes>, GetVectoredError> {
         let mut reconstruct_state = ValuesReconstructState::new();
         let mut res = tline
@@ -6515,7 +6515,7 @@ mod tests {
         tline: &Arc<Timeline>,
         key: Key,
         lsn: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<Bytes>, GetVectoredError> {
         let mut reconstruct_state = ValuesReconstructState::new();
         let mut res = tline
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 2be8816cef..50eb371a78 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -26,7 +26,7 @@ impl<'a> BlockCursor<'a> {
     pub async fn read_blob(
         &self,
         offset: u64,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Vec<u8>, std::io::Error> {
         let mut buf = Vec::new();
         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
@@ -38,7 +38,7 @@ impl<'a> BlockCursor<'a> {
         &self,
         offset: u64,
         dstbuf: &mut Vec<u8>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), std::io::Error> {
         let mut blknum = (offset / PAGE_SZ as u64) as u32;
         let mut off = (offset % PAGE_SZ as u64) as usize;
@@ -130,7 +130,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
         &mut self,
         src_buf: B,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> (B::Buf, Result<(), Error>) {
         let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
         let nbytes = match res {
@@ -143,7 +143,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 
     #[inline(always)]
     /// Flushes the internal buffer to the underlying `VirtualFile`.
-    pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
+    pub async fn flush_buffer(&mut self, ctx: &mut RequestContext) -> Result<(), Error> {
         let buf = std::mem::take(&mut self.buf);
         let (mut buf, res) = self.inner.write_all(buf, ctx).await;
         res?;
@@ -166,7 +166,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
         &mut self,
         src_buf: B,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> (B::Buf, Result<(), Error>) {
         if !BUFFERED {
             assert!(self.buf.is_empty());
@@ -218,7 +218,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
         &mut self,
         srcbuf: B,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> (B::Buf, Result<u64, Error>) {
         let offset = self.offset;
 
@@ -267,7 +267,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     ///
     /// This function flushes the internal buffer before giving access
     /// to the underlying `VirtualFile`.
-    pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
+    pub async fn into_inner(mut self, ctx: &mut RequestContext) -> Result<VirtualFile, Error> {
         self.flush_buffer(ctx).await?;
         Ok(self.inner)
     }
diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs
index 92928116c1..a98505dabb 100644
--- a/pageserver/src/tenant/block_io.rs
+++ b/pageserver/src/tenant/block_io.rs
@@ -92,7 +92,7 @@ impl<'a> BlockReaderRef<'a> {
     async fn read_blk(
         &self,
         blknum: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockLease, std::io::Error> {
         use BlockReaderRef::*;
         match self {
@@ -150,7 +150,7 @@ impl<'a> BlockCursor<'a> {
     pub async fn read_blk(
         &self,
         blknum: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockLease, std::io::Error> {
         self.reader.read_blk(blknum, ctx).await
     }
@@ -177,7 +177,7 @@ impl<'a> FileBlockReader<'a> {
         &self,
         buf: PageWriteGuard<'static>,
         blkno: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
         assert!(buf.len() == PAGE_SZ);
         self.file
@@ -192,7 +192,7 @@ impl<'a> FileBlockReader<'a> {
     pub async fn read_blk<'b>(
         &self,
         blknum: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockLease<'b>, std::io::Error> {
         let cache = page_cache::get();
         match cache
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 8b36aa15e5..355fbc81e3 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -404,7 +404,7 @@ impl DeleteTenantFlow {
         tenant: &Arc<Tenant>,
         preload: Option<TenantPreload>,
         tenants: &'static std::sync::RwLock<TenantsMap>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), DeleteTenantError> {
         let (_, progress) = completion::channel();
 
diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs
index 119df3e6c4..b8bc7edf9c 100644
--- a/pageserver/src/tenant/disk_btree.rs
+++ b/pageserver/src/tenant/disk_btree.rs
@@ -242,7 +242,7 @@ where
     ///
     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     ///
-    pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
+    pub async fn get(&self, search_key: &[u8; L], ctx: &mut RequestContext) -> Result<Option<u64>> {
         let mut result: Option<u64> = None;
         self.visit(
             search_key,
@@ -278,7 +278,7 @@ where
     pub fn get_stream_from<'a>(
         &'a self,
         start_key: &'a [u8; L],
-        ctx: &'a RequestContext,
+        ctx: &'a mut RequestContext,
     ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
         try_stream! {
             let mut stack = Vec::new();
@@ -363,7 +363,7 @@ where
         search_key: &[u8; L],
         dir: VisitDirection,
         mut visitor: V,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<bool>
     where
         V: FnMut(&[u8], u64) -> bool,
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 79cc7bf153..015aeccab1 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -28,7 +28,7 @@ impl EphemeralFile {
         conf: &PageServerConf,
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<EphemeralFile, io::Error> {
         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
         let filename_disambiguator =
@@ -68,7 +68,7 @@ impl EphemeralFile {
     pub(crate) async fn read_blk(
         &self,
         blknum: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockLease, io::Error> {
         self.rw.read_blk(blknum, ctx).await
     }
@@ -76,7 +76,7 @@ impl EphemeralFile {
     pub(crate) async fn write_blob(
         &mut self,
         srcbuf: &[u8],
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<u64, io::Error> {
         let pos = self.rw.bytes_written();
 
diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs
index 276ac87064..f0f18be026 100644
--- a/pageserver/src/tenant/ephemeral_file/page_caching.rs
+++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs
@@ -38,7 +38,7 @@ impl RW {
     pub(crate) async fn write_all_borrowed(
         &mut self,
         srcbuf: &[u8],
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<usize, io::Error> {
         // It doesn't make sense to proactively fill the page cache on the Pageserver write path
         // because Compute is unlikely to access recently written data.
@@ -52,7 +52,7 @@ impl RW {
     pub(crate) async fn read_blk(
         &self,
         blknum: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockLease, io::Error> {
         match self.rw.read_blk(blknum).await? {
             zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
@@ -138,7 +138,7 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
     >(
         &mut self,
         buf: B,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> std::io::Result<(usize, B::Buf)> {
         let buf = buf.slice(..);
         let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
index b37eafb52c..fc21aa06df 100644
--- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
+++ b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
@@ -64,7 +64,7 @@ where
     pub async fn write_all_borrowed(
         &mut self,
         buf: &[u8],
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> std::io::Result<usize> {
         self.buffered_writer.write_buffered_borrowed(buf, ctx).await
     }
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 2724a5cc07..daf3e1d731 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -850,7 +850,7 @@ impl LayerMap {
     /// debugging function to print out the contents of the layer map
     #[allow(unused)]
-    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
+    pub async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
         println!("Begin dump LayerMap");
 
         println!("open_layer:");
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 4520bb9295..c60c20350a 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -696,7 +696,7 @@ fn tenant_spawn(
     init_order: Option<InitializationOrder>,
     tenants: &'static std::sync::RwLock<TenantsMap>,
     mode: SpawnMode,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> anyhow::Result<Arc<Tenant>> {
     anyhow::ensure!(
         tenant_path.is_dir(),
@@ -956,7 +956,7 @@ impl TenantManager {
         new_location_config: LocationConf,
         flush: Option<Duration>,
         mut spawn_mode: SpawnMode,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
         debug_assert_current_span_has_tenant_id();
         info!("configuring tenant location to state {new_location_config:?}");
@@ -1247,7 +1247,7 @@ impl TenantManager {
         &self,
         tenant_shard_id: TenantShardId,
         drop_cache: bool,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
         let Some(old_slot) = slot_guard.get_old_value() else {
@@ -1509,7 +1509,7 @@ impl TenantManager {
         tenant: Arc<Tenant>,
         new_shard_count: ShardCount,
         new_stripe_size: Option<ShardStripeSize>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Vec<TenantShardId>> {
         let tenant_shard_id = *tenant.get_tenant_shard_id();
         let r = self
@@ -1539,7 +1539,7 @@ impl TenantManager {
         tenant: Arc<Tenant>,
         new_shard_count: ShardCount,
         new_stripe_size: Option<ShardStripeSize>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Vec<TenantShardId>> {
         let tenant_shard_id = *tenant.get_tenant_shard_id();
 
@@ -1994,7 +1994,7 @@ impl TenantManager {
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         prepared: PreparedTimelineDetach,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Vec<TimelineId>, anyhow::Error> {
         struct RevertOnDropSlot(Option<SlotGuard>);
 
@@ -2229,7 +2229,7 @@ pub(crate) async fn load_tenant(
     broker_client: storage_broker::BrokerClientChannel,
     remote_storage: GenericRemoteStorage,
     deletion_queue_client: DeletionQueueClient,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Result<(), TenantMapInsertError> {
     // This is a legacy API (replaced by `/location_conf`). It does not support sharding
     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
@@ -2837,7 +2837,7 @@ pub(crate) async fn immediate_gc(
     timeline_id: TimelineId,
     gc_req: TimelineGcRequest,
     cancel: CancellationToken,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Result<GcResult, ApiError> {
     let tenant = {
         let guard = TENANTS.read().unwrap();
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index e33e4b84aa..212a6c5bbe 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -518,7 +518,7 @@ impl RemoteTimelineClient {
         layer_metadata: &LayerFileMetadata,
         local_path: &Utf8Path,
         cancel: &CancellationToken,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<u64> {
         let downloaded_size = {
             let _unfinished_gauge_guard = self.metrics.call_begin(
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index d0385e4aee..cf7f0b72da 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -52,7 +52,7 @@ pub async fn download_layer_file<'a>(
     layer_metadata: &'a LayerFileMetadata,
     local_path: &Utf8Path,
     cancel: &CancellationToken,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Result<u64, DownloadError> {
     debug_assert_current_span_has_tenant_and_timeline_id();
 
@@ -107,9 +107,9 @@ pub async fn download_layer_file<'a>(
     // the in-memory state of the filesystem already has the layer file in its final place,
     // and subsequent pageserver code could think it's durable while it really isn't.
     let work = {
-        let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
+        let mut ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
         async move {
-            let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
+            let timeline_dir = VirtualFile::open(&timeline_path, &mut ctx)
                 .await
                 .fatal_err("VirtualFile::open for timeline dir fsync");
             timeline_dir
@@ -140,7 +140,7 @@ async fn download_object<'a>(
     src_path: &RemotePath,
     dst_path: &Utf8PathBuf,
     cancel: &CancellationToken,
-    #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
+    #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &mut RequestContext,
 ) -> Result<u64, DownloadError> {
     let res = match crate::virtual_file::io_engine::get() {
         crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 24176ecf19..e1ae522f0a 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -506,7 +506,7 @@ impl<'a> TenantDownloader<'a> {
         }
     }
 
-    async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
+    async fn download(&self, ctx: &mut RequestContext) -> Result<(), UpdateError> {
         debug_assert_current_span_has_tenant_id();
 
         // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
@@ -831,7 +831,7 @@ impl<'a> TenantDownloader<'a> {
         &self,
         timeline: HeatMapTimeline,
         timeline_state: SecondaryDetailTimeline,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), UpdateError> {
         debug_assert_current_span_has_tenant_and_timeline_id();
         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -978,7 +978,7 @@ impl<'a> TenantDownloader<'a> {
         tenant_shard_id: &TenantShardId,
         timeline_id: &TimelineId,
         layer: HeatMapLayer,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<HeatMapLayer>, UpdateError> {
         // Failpoint for simulating slow remote storage
         failpoint_support::sleep_millis_async!(
diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs
index b2338b620e..5f5e7df214 100644
--- a/pageserver/src/tenant/size.rs
+++ b/pageserver/src/tenant/size.rs
@@ -148,7 +148,7 @@ pub(super) async fn gather_inputs(
     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
     cause: LogicalSizeCalculationCause,
     cancel: &CancellationToken,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Result<ModelInputs, CalculateSyntheticSizeError> {
     // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
     tenant.refresh_gc_info(cancel, ctx).await?;
 
@@ -379,7 +379,7 @@ async fn fill_logical_sizes(
     limit: &Arc<Semaphore>,
     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
     cause: LogicalSizeCalculationCause,
-    ctx: &RequestContext,
+    ctx: &mut RequestContext,
 ) -> Result<(), CalculateSyntheticSizeError> {
     let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
         timelines
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 9607546ce0..133f375daa 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -425,7 +425,7 @@ impl ReadableLayer {
         keyspace: KeySpace,
         lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValuesReconstructState,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), GetVectoredError> {
         match self {
             ReadableLayer::PersistentLayer(layer) => {
@@ -574,7 +574,7 @@ impl LayerAccessStats {
         });
     }
 
-    fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
+    fn record_access(&self, access_kind: LayerAccessKind, ctx: &mut RequestContext) {
         if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
             return;
         }
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 5e01ecd71d..17fcc78da6 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -249,7 +249,7 @@ impl AsLayerDesc for DeltaLayer {
 }
 
 impl DeltaLayer {
-    pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
+    pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
         self.desc.dump();
 
         if !verbose {
@@ -292,7 +292,7 @@ impl DeltaLayer {
     async fn load(
         &self,
         access_kind: LayerAccessKind,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<&Arc<DeltaLayerInner>> {
         self.access_stats.record_access(access_kind, ctx);
         // Quick exit if already loaded
@@ -302,7 +302,7 @@ impl DeltaLayer {
             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     }
 
-    async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
+    async fn load_inner(&self, ctx: &mut RequestContext) -> Result<Arc<DeltaLayerInner>> {
         let path = self.path();
 
         let loaded = DeltaLayerInner::load(&path, None, None, ctx)
@@ -393,7 +393,7 @@ impl DeltaLayerWriterInner {
         tenant_shard_id: TenantShardId,
         key_start: Key,
         lsn_range: Range<Lsn>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Self> {
         // Create the file initially with a temporary filename. We don't know
         // the end key yet, so we cannot form the final filename yet. We will
@@ -435,7 +435,7 @@ impl DeltaLayerWriterInner {
         key: Key,
         lsn: Lsn,
         val: Value,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         let (_, res) = self
             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
@@ -449,7 +449,7 @@ impl DeltaLayerWriterInner {
         lsn: Lsn,
         val: Vec<u8>,
         will_init: bool,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> (Vec<u8>, anyhow::Result<()>) {
         assert!(self.lsn_range.start <= lsn);
         let (val, res) = self.blob_writer.write_blob(val, ctx).await;
@@ -476,7 +476,7 @@ impl DeltaLayerWriterInner {
         self,
         key_end: Key,
         timeline: &Arc<Timeline>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<ResidentLayer> {
         let temp_path = self.path.clone();
         let result = self.finish0(key_end, timeline, ctx).await;
@@ -493,7 +493,7 @@ impl DeltaLayerWriterInner {
         self,
         key_end: Key,
         timeline: &Arc<Timeline>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<ResidentLayer> {
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -603,7 +603,7 @@ impl DeltaLayerWriter {
         tenant_shard_id: TenantShardId,
         key_start: Key,
         lsn_range: Range<Lsn>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Self> {
         Ok(Self {
             inner: Some(
@@ -630,7 +630,7 @@ impl DeltaLayerWriter {
         key: Key,
         lsn: Lsn,
         val: Value,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<()> {
         self.inner
             .as_mut()
@@ -645,7 +645,7 @@ impl DeltaLayerWriter {
         lsn: Lsn,
         val: Vec<u8>,
         will_init: bool,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> (Vec<u8>, anyhow::Result<()>) {
         self.inner
             .as_mut()
@@ -665,7 +665,7 @@ impl DeltaLayerWriter {
         mut self,
         key_end: Key,
         timeline: &Arc<Timeline>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<ResidentLayer> {
         self.inner
             .take()
@@ -704,7 +704,7 @@ impl DeltaLayer {
     pub async fn rewrite_summary<F>(
         path: &Utf8Path,
         rewrite: F,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), RewriteSummaryError>
     where
        F: Fn(Summary) -> Summary,
@@ -744,7 +744,7 @@ impl DeltaLayerInner {
         path: &Utf8Path,
         summary: Option<Summary>,
         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<Self>, anyhow::Error> {
         let file = match VirtualFile::open(path, ctx).await {
             Ok(file) => file,
@@ -793,7 +793,7 @@ impl DeltaLayerInner {
         key: Key,
         lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValueReconstructState,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<ValueReconstructResult> {
         let mut need_image = true;
         // Scan the page versions backwards, starting from `lsn`.
@@ -824,13 +824,13 @@ impl DeltaLayerInner {
                     !blob_ref.will_init()
                 },
-                &RequestContextBuilder::extend(ctx)
+                &mut RequestContextBuilder::extend(ctx)
                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
                     .build(),
             )
             .await?;
 
-        let ctx = &RequestContextBuilder::extend(ctx)
+        let ctx = &mut RequestContextBuilder::extend(ctx)
             .page_content_kind(PageContentKind::DeltaLayerValue)
             .build();
 
@@ -889,7 +889,7 @@ impl DeltaLayerInner {
         keyspace: KeySpace,
         lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValuesReconstructState,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), GetVectoredError> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -931,7 +931,7 @@ impl DeltaLayerInner {
     #[cfg(test)]
     pub(super) async fn load_key_values(
         &self,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -973,12 +973,12 @@ impl DeltaLayerInner {
         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
         mut planner: VectoredReadPlanner,
         reconstruct_state: &mut ValuesReconstructState,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<Vec<VectoredRead>>
     where
         Reader: BlockReader,
     {
-        let ctx = RequestContextBuilder::extend(ctx)
+        let mut ctx = RequestContextBuilder::extend(ctx)
             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
             .build();
 
@@ -986,7 +986,7 @@ impl DeltaLayerInner {
         let mut range_end_handled = false;
 
         let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
-        let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
+        let index_stream = index_reader.get_stream_from(&start_key.0, &mut ctx);
         let mut index_stream = std::pin::pin!(index_stream);
 
         while let Some(index_entry) = index_stream.next().await {
@@ -1062,7 +1062,7 @@ impl DeltaLayerInner {
         &self,
         reads: Vec<VectoredRead>,
         reconstruct_state: &mut ValuesReconstructState,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) {
         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
         let mut ignore_key_with_err = None;
@@ -1140,7 +1140,7 @@ impl DeltaLayerInner {
 
     pub(super) async fn load_keys<'a>(
         &'a self,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Vec<DeltaEntry<'a>>> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -1179,7 +1179,7 @@ impl DeltaLayerInner {
                 all_keys.push(entry);
                 true
             },
-            &RequestContextBuilder::extend(ctx)
+            &mut RequestContextBuilder::extend(ctx)
                 .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
                 .build(),
         )
        .await?;
@@ -1199,7 +1199,7 @@ impl DeltaLayerInner {
         &self,
         writer: &mut DeltaLayerWriter,
         until: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<usize> {
         use crate::tenant::vectored_blob_io::{
             BlobMeta, VectoredReadBuilder, VectoredReadExtended,
@@ -1387,7 +1387,7 @@ impl DeltaLayerInner {
         Ok(records)
     }
 
-    pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+    pub(super) async fn dump(&self, ctx: &mut RequestContext) -> anyhow::Result<()> {
         println!(
             "index_start_blk: {}, root {}",
             self.index_start_blk, self.index_root_blk
@@ -1404,7 +1404,7 @@ impl DeltaLayerInner {
 
         let keys = self.load_keys(ctx).await?;
 
-        async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
+        async fn dump_blob(val: &ValueRef<'_>, ctx: &mut RequestContext) -> anyhow::Result<String> {
             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
             let val = Value::des(&buf)?;
             let desc = match val {
@@ -1513,7 +1513,7 @@ pub struct ValueRef<'a> {
 
 impl<'a> ValueRef<'a> {
     /// Loads the value from disk
-    pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
+    pub async fn load(&self, ctx: &mut RequestContext) -> Result<Value> {
         // theoretically we *could* record an access time for each, but it does not really matter
         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
         let val = Value::des(&buf)?;
@@ -1527,7 +1527,7 @@ impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
     pub(crate) async fn read_blk(
         &self,
         blknum: u32,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<BlockLease, std::io::Error> {
         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
         block_reader.read_blk(blknum, ctx).await
@@ -2060,7 +2060,7 @@ mod test {
         source: &DeltaLayerInner,
         truncated: &DeltaLayerInner,
         truncated_at: Lsn,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) {
         use futures::future::ready;
         use futures::stream::TryStreamExt;
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 06e2f09384..defd643760 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -177,7 +177,7 @@ impl std::fmt::Debug for ImageLayerInner {
 }
 
 impl ImageLayerInner {
-    pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+    pub(super) async fn dump(&self, ctx: &mut RequestContext) -> anyhow::Result<()> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
             self.index_start_blk,
@@ -217,7 +217,7 @@ impl AsLayerDesc for ImageLayer {
 }
 
 impl ImageLayer {
-    pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
+    pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
         self.desc.dump();
 
         if !verbose {
@@ -254,7 +254,7 @@ impl ImageLayer {
     async fn load(
         &self,
         access_kind: LayerAccessKind,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<&ImageLayerInner> {
         self.access_stats.record_access(access_kind, ctx);
         self.inner
@@ -263,7 +263,7 @@ impl ImageLayer {
             .with_context(|| format!("Failed to load image layer {}", self.path()))
     }
 
-    async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
+    async fn load_inner(&self, ctx: &mut RequestContext) -> Result<ImageLayerInner> {
         let path = self.path();
 
         let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
@@ -336,7 +336,7 @@ impl ImageLayer {
     pub async fn rewrite_summary<F>(
         path: &Utf8Path,
         rewrite: F,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<(), RewriteSummaryError>
     where
         F: Fn(Summary) -> Summary,
@@ -377,7 +377,7 @@ impl ImageLayerInner {
         lsn: Lsn,
         summary: Option<Summary>,
         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> Result<Option<Self>, anyhow::Error> {
         let file = match VirtualFile::open(path, ctx).await {
             Ok(file) => file,
@@ -428,7 +428,7 @@ impl ImageLayerInner {
         &self,
         key: Key,
         reconstruct_state: &mut ValueReconstructState,
-        ctx: &RequestContext,
+        ctx: &mut RequestContext,
     ) -> anyhow::Result<ValueReconstructResult> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let tree_reader =
@@ -439,7 +439,7 @@ impl ImageLayerInner {
         if let Some(offset) = tree_reader
             .get(
                 &keybuf,
-                &RequestContextBuilder::extend(ctx)
+                &mut RequestContextBuilder::extend(ctx)
                     .page_content_kind(PageContentKind::ImageLayerBtreeNode)
                     .build(),
             )
@@ -449,7 +449,7 @@ impl ImageLayerInner {
                 .block_cursor()
.read_blob( offset, - &RequestContextBuilder::extend(ctx) + &mut RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::ImageLayerValue) .build(), ) @@ -470,7 +470,7 @@ impl ImageLayerInner { &self, keyspace: KeySpace, reconstruct_state: &mut ValuesReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), GetVectoredError> { let reads = self .plan_reads(keyspace, None, ctx) @@ -489,7 +489,7 @@ impl ImageLayerInner { #[cfg(test)] pub(super) async fn load_key_values( &self, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let block_reader = FileBlockReader::new(&self.file, self.file_id); let tree_reader = @@ -522,7 +522,7 @@ impl ImageLayerInner { &self, keyspace: KeySpace, shard_identity: Option<&ShardIdentity>, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let mut planner = VectoredReadPlanner::new( self.max_vectored_read_bytes @@ -535,7 +535,7 @@ impl ImageLayerInner { let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); - let ctx = RequestContextBuilder::extend(ctx) + let mut ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::ImageLayerBtreeNode) .build(); @@ -544,7 +544,7 @@ impl ImageLayerInner { let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; range.start.write_to_byte_slice(&mut search_key); - let index_stream = tree_reader.get_stream_from(&search_key, &ctx); + let index_stream = tree_reader.get_stream_from(&search_key, &mut ctx); let mut index_stream = std::pin::pin!(index_stream); while let Some(index_entry) = index_stream.next().await { @@ -587,7 +587,7 @@ impl ImageLayerInner { &self, shard_identity: &ShardIdentity, writer: &mut ImageLayerWriter, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { // Fragment the range into the regions owned by this ShardIdentity let plan = self @@ -629,7 +629,7 @@ impl ImageLayerInner { &self, reads: Vec, reconstruct_state: &mut ValuesReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) { let max_vectored_read_bytes = self .max_vectored_read_bytes @@ -724,7 +724,7 @@ impl ImageLayerWriterInner { tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. // We'll atomically rename it to the final name when we're done. 
@@ -779,7 +779,7 @@ impl ImageLayerWriterInner { &mut self, key: Key, img: Bytes, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); let (_img, res) = self.blob_writer.write_blob(img, ctx).await; @@ -799,7 +799,7 @@ impl ImageLayerWriterInner { async fn finish( self, timeline: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -899,7 +899,7 @@ impl ImageLayerWriter { tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { Ok(Self { inner: Some( @@ -918,7 +918,7 @@ impl ImageLayerWriter { &mut self, key: Key, img: Bytes, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } @@ -929,7 +929,7 @@ impl ImageLayerWriter { pub(crate) async fn finish( mut self, timeline: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { self.inner.take().unwrap().finish(timeline, ctx).await } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 1ecc56ce99..1bb48b5a94 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -256,7 +256,7 @@ impl InMemoryLayer { /// debugging function to print out the contents of the layer /// /// this is likely completly unused - pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { + pub async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> { let inner = self.inner.read().await; let end_str = self.end_lsn_or_max(); @@ -308,12 +308,12 @@ impl InMemoryLayer { key: Key, lsn_range: Range, reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { ensure!(lsn_range.start >= self.start_lsn); let mut need_image = true; - let ctx = RequestContextBuilder::extend(ctx) + let mut ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) .build(); @@ -325,7 +325,7 @@ impl InMemoryLayer { if let Some(vec_map) = inner.index.get(&key) { let slice = vec_map.slice_range(lsn_range); for (entry_lsn, pos) in slice.iter().rev() { - let buf = reader.read_blob(*pos, &ctx).await?; + let buf = reader.read_blob(*pos, &mut ctx).await?; let value = Value::des(&buf)?; match value { Value::Image(img) => { @@ -365,9 +365,9 @@ impl InMemoryLayer { keyspace: KeySpace, end_lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), GetVectoredError> { - let ctx = RequestContextBuilder::extend(ctx) + let mut ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) .build(); @@ -410,7 +410,7 @@ impl InMemoryLayer { continue; } - let buf = reader.read_blob(block_read.block_offset, &ctx).await; + let buf = reader.read_blob(block_read.block_offset, &mut ctx).await; if let Err(e) = buf { reconstruct_state .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e))); @@ -473,7 +473,7 @@ impl InMemoryLayer { timeline_id: TimelineId, tenant_shard_id: TenantShardId, start_lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); @@ 
-512,7 +512,7 @@ impl InMemoryLayer { key: Key, lsn: Lsn, buf: &[u8], - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); @@ -525,7 +525,7 @@ impl InMemoryLayer { key: Key, lsn: Lsn, buf: &[u8], - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); @@ -534,7 +534,7 @@ impl InMemoryLayer { .file .write_blob( buf, - &RequestContextBuilder::extend(ctx) + &mut RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) .build(), ) @@ -606,7 +606,7 @@ impl InMemoryLayer { pub(crate) async fn write_to_disk( &self, timeline: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, key_range: Option>, ) -> Result> { // Grab the lock in read-mode. We hold it over the I/O, but because this diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 32acb3f0cd..8f270a8ad1 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -331,7 +331,7 @@ impl Layer { key: Key, lsn_range: Range, reconstruct_data: &mut ValueReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { use anyhow::ensure; @@ -361,7 +361,7 @@ impl Layer { keyspace: KeySpace, lsn_range: Range, reconstruct_data: &mut ValuesReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), GetVectoredError> { let layer = self .0 @@ -392,7 +392,7 @@ impl Layer { #[cfg(test)] pub(crate) async fn load_key_values( &self, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let layer = self .0 @@ -479,7 +479,7 @@ impl Layer { /// Traditional debug dumping facility #[allow(unused)] - pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> { + pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> anyhow::Result<()> { self.0.desc.dump(); if verbose { @@ -898,7 +898,7 @@ impl LayerInner { async fn get_or_maybe_download( self: &Arc, allow_download: bool, - ctx: Option<&RequestContext>, + ctx: Option<&mut RequestContext>, ) -> Result, DownloadError> { let (weak, permit) = { // get_or_init_detached can: @@ -988,7 +988,7 @@ impl LayerInner { return Err(DownloadError::NotFile(ft)); } - if let Some(ctx) = ctx { + if let Some(ref ctx) = ctx { self.check_expected_download(ctx)?; } @@ -1049,7 +1049,7 @@ impl LayerInner { self: &Arc, timeline: Arc, permit: heavier_once_cell::InitPermit, - ctx: RequestContext, + mut ctx: RequestContext, ) -> Result, DownloadError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -1079,7 +1079,7 @@ impl LayerInner { .await .unwrap(); - let res = this.download_and_init(timeline, permit, &ctx).await; + let res = this.download_and_init(timeline, permit, &mut ctx).await; if let Err(res) = tx.send(res) { match res { @@ -1122,7 +1122,7 @@ impl LayerInner { self: &Arc, timeline: Arc, permit: heavier_once_cell::InitPermit, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let result = timeline .remote_client @@ -1662,7 +1662,7 @@ impl DownloadedLayer { async fn get<'a>( &'a self, owner: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<&'a LayerKind> { let init = || async { assert_eq!( @@ -1736,7 +1736,7 @@ impl DownloadedLayer { lsn_range: Range, reconstruct_data: &mut ValueReconstructState, owner: &Arc, - ctx: &RequestContext, + ctx: &mut 
RequestContext, ) -> anyhow::Result { use LayerKind::*; @@ -1758,7 +1758,7 @@ impl DownloadedLayer { lsn_range: Range, reconstruct_data: &mut ValuesReconstructState, owner: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), GetVectoredError> { use LayerKind::*; @@ -1778,7 +1778,7 @@ impl DownloadedLayer { async fn load_key_values( &self, owner: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { use LayerKind::*; @@ -1788,7 +1788,7 @@ impl DownloadedLayer { } } - async fn dump(&self, owner: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { + async fn dump(&self, owner: &Arc, ctx: &mut RequestContext) -> anyhow::Result<()> { use LayerKind::*; match self.get(owner, ctx).await? { Delta(d) => d.dump(ctx).await?, @@ -1837,7 +1837,7 @@ impl ResidentLayer { #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))] pub(crate) async fn load_keys<'a>( &'a self, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result>> { use LayerKind::*; @@ -1866,7 +1866,7 @@ impl ResidentLayer { &'a self, shard_identity: &ShardIdentity, writer: &mut ImageLayerWriter, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { use LayerKind::*; @@ -1881,7 +1881,7 @@ impl ResidentLayer { &self, writer: &mut super::delta_layer::DeltaLayerWriter, until: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { use LayerKind::*; @@ -1907,7 +1907,7 @@ impl ResidentLayer { #[cfg(test)] pub(crate) async fn as_delta( &self, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<&delta_layer::DeltaLayerInner> { use LayerKind::*; match self.downloaded.get(&self.owner.0, ctx).await? { diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index d679b78f32..5034e919c6 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -73,7 +73,7 @@ static PERMIT_GAUGES: once_cell::sync::Lazy< /// Cancellation safe. pub(crate) async fn concurrent_background_tasks_rate_limit_permit( loop_kind: BackgroundLoopKind, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> tokio::sync::SemaphorePermit<'static> { let _guard = PERMIT_GAUGES[loop_kind].guard(); diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index f3f3d5e3ae..6ee79dbdf0 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -130,7 +130,7 @@ where self.inner.load().config.steady_rps() } - pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option { + pub async fn throttle(&self, ctx: &mut RequestContext, key_count: usize) -> Option { let inner = self.inner.load_full(); // clones the `Inner` Arc if !inner.task_kinds.contains(ctx.task_kind()) { return None; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5398ad399c..3a128aaabd 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -871,7 +871,7 @@ impl Timeline { &self, key: Key, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { if !lsn.is_valid() { return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN"))); @@ -946,7 +946,7 @@ impl Timeline { key: Key, lsn: Lsn, mut reconstruct_state: ValueReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { // XXX: structured stats collection for layer eviction here. 
trace!( @@ -1004,7 +1004,7 @@ impl Timeline { &self, keyspace: KeySpace, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result>, GetVectoredError> { if !lsn.is_valid() { return Err(GetVectoredError::InvalidLsn(lsn)); @@ -1101,7 +1101,7 @@ impl Timeline { &self, keyspace: KeySpace, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result>, GetVectoredError> { if !lsn.is_valid() { return Err(GetVectoredError::InvalidLsn(lsn)); @@ -1158,7 +1158,7 @@ impl Timeline { &self, keyspace: KeySpace, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result>, GetVectoredError> { let mut values = BTreeMap::new(); @@ -1217,7 +1217,7 @@ impl Timeline { keyspace: KeySpace, lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result>, GetVectoredError> { let get_kind = if keyspace.total_raw_size() == 1 { GetKind::Singular @@ -1274,7 +1274,7 @@ impl Timeline { vectored_res: &Result>, GetVectoredError>, keyspace: KeySpace, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) { if keyspace.overlaps(&Key::metadata_key_range()) { // skip validation for metadata key range @@ -1444,7 +1444,7 @@ impl Timeline { &self, lsn: Lsn, who_is_waiting: WaitLsnWaiter<'_>, - ctx: &RequestContext, /* Prepare for use by cancellation */ + ctx: &mut RequestContext, /* Prepare for use by cancellation */ ) -> Result<(), WaitLsnError> { let state = self.current_state(); if self.cancel.is_cancelled() || matches!(state, TimelineState::Stopping) { @@ -1540,7 +1540,7 @@ impl Timeline { &self, lsn: Lsn, length: Duration, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> anyhow::Result { let lease = { let mut gc_info = self.gc_info.write().unwrap(); @@ -1713,7 +1713,7 @@ impl Timeline { self: &Arc, cancel: &CancellationToken, flags: EnumSet, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), CompactionError> { // most likely the cancellation token is from background task, but in tests it could be the // request task as well. @@ -1765,7 +1765,7 @@ impl Timeline { parent: Arc, broker_client: BrokerClientChannel, background_jobs_can_start: Option<&completion::Barrier>, - ctx: &RequestContext, + ctx: &mut RequestContext, ) { if self.tenant_shard_id.is_shard_zero() { // Logical size is only maintained accurately on shard zero. @@ -1946,7 +1946,7 @@ impl Timeline { pub(crate) async fn wait_to_become_active( &self, - _ctx: &RequestContext, // Prepare for use by cancellation + _ctx: &mut RequestContext, // Prepare for use by cancellation ) -> Result<(), TimelineState> { let mut receiver = self.state.subscribe(); loop { @@ -2448,7 +2448,7 @@ impl Timeline { /// when the timeline is activated. 
fn launch_wal_receiver( self: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, broker_client: BrokerClientChannel, ) { info!( @@ -2678,7 +2678,7 @@ impl Timeline { pub(crate) fn get_current_logical_size( self: &Arc, priority: GetLogicalSizePriority, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> logical_size::CurrentLogicalSize { if !self.tenant_shard_id.is_shard_zero() { // Logical size is only accurately maintained on shard zero: when called elsewhere, for example @@ -2745,7 +2745,7 @@ impl Timeline { current_size } - fn spawn_initial_logical_size_computation_task(self: &Arc, ctx: &RequestContext) { + fn spawn_initial_logical_size_computation_task(self: &Arc, ctx: &mut RequestContext) { let Some(initial_part_end) = self.current_logical_size.initial_part_end else { // nothing to do for freshly created timelines; assert_eq!( @@ -2963,7 +2963,7 @@ impl Timeline { self: &Arc, lsn: Lsn, cause: LogicalSizeCalculationCause, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { crate::span::debug_assert_current_span_has_tenant_and_timeline_id(); // We should never be calculating logical sizes on shard !=0, because these shards do not have @@ -3006,7 +3006,7 @@ impl Timeline { up_to_lsn: Lsn, cause: LogicalSizeCalculationCause, _guard: &GateGuard, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { info!( "Calculating logical size for timeline {} at {}", @@ -3169,7 +3169,7 @@ impl Timeline { key: Key, request_lsn: Lsn, reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, PageReconstructError> { // Start from the current timeline. let mut timeline_owned; @@ -3370,7 +3370,7 @@ impl Timeline { mut keyspace: KeySpace, request_lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), GetVectoredError> { let mut timeline_owned: Arc; let mut timeline = self; @@ -3477,7 +3477,7 @@ impl Timeline { mut cont_lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { let mut unmapped_keyspace = keyspace.clone(); let mut fringe = LayerFringe::new(); @@ -3585,7 +3585,7 @@ impl Timeline { async fn get_ready_ancestor_timeline( &self, ancestor: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, GetReadyAncestorError> { // It's possible that the ancestor timeline isn't active yet, or // is active but hasn't yet caught up to the branch point. 
Wait @@ -3653,7 +3653,7 @@ impl Timeline { async fn get_layer_for_write( &self, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let mut guard = self.layers.write().await; let layer = guard @@ -3697,7 +3697,7 @@ impl Timeline { async fn flush_loop( self: &Arc, mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>, - ctx: &RequestContext, + ctx: &mut RequestContext, ) { info!("started flush loop"); loop { @@ -3856,7 +3856,7 @@ impl Timeline { async fn flush_frozen_layer( self: &Arc, frozen_layer: Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -4085,7 +4085,7 @@ impl Timeline { self: &Arc, frozen_layer: &Arc, key_range: Option>, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); @@ -4142,7 +4142,7 @@ impl Timeline { lsn: Lsn, partition_size: u64, flags: EnumSet, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> { let Ok(mut partitioning_guard) = self.partitioning.try_lock() else { // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline. @@ -4239,7 +4239,7 @@ impl Timeline { partition: &KeySpace, mut image_layer_writer: ImageLayerWriter, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, img_range: Range, start: Key, ) -> Result { @@ -4339,7 +4339,7 @@ impl Timeline { partition: &KeySpace, mut image_layer_writer: ImageLayerWriter, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, img_range: Range, mode: ImageLayerCreationMode, start: Key, @@ -4423,7 +4423,7 @@ impl Timeline { partitioning: &KeyPartitioning, lsn: Lsn, mode: ImageLayerCreationMode, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, CreateImageLayersError> { let timer = self.metrics.create_images_time_histo.start_timer(); let mut image_layers = Vec::new(); @@ -4624,7 +4624,7 @@ impl Timeline { self: &Arc, tenant: &crate::tenant::Tenant, options: detach_ancestor::Options, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result< ( completion::Completion, @@ -4644,7 +4644,7 @@ impl Timeline { self: &Arc, tenant: &crate::tenant::Tenant, prepared: detach_ancestor::PreparedTimelineDetach, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, anyhow::Error> { detach_ancestor::complete(self, tenant, prepared, ctx).await } @@ -4822,7 +4822,7 @@ impl Timeline { cutoff_horizon: Lsn, pitr: Duration, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { let _timer = self .metrics @@ -5469,7 +5469,7 @@ impl Timeline { lsn: Lsn, mut images: Vec<(Key, Bytes)>, check_start_lsn: Option, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let last_record_lsn = self.get_last_record_lsn(); assert!( @@ -5513,7 +5513,7 @@ impl Timeline { self: &Arc, mut deltas: Vec<(Key, Lsn, Value)>, check_start_lsn: Option, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let last_record_lsn = self.get_last_record_lsn(); deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb))); @@ -5556,7 +5556,7 @@ impl Timeline { pub(crate) async fn inspect_image_layers( self: &Arc, lsn: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let mut all_data = Vec::new(); let guard = self.layers.read().await; @@ -5664,7 
+5664,7 @@ impl<'a> TimelineWriter<'a> { key: Key, lsn: Lsn, value: &Value, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { // Avoid doing allocations for "small" values. // In the regression test suite, the limit of 256 avoided allocations in 95% of cases: @@ -5697,7 +5697,7 @@ impl<'a> TimelineWriter<'a> { &mut self, at: Lsn, action: OpenLayerAction, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<&Arc> { match action { OpenLayerAction::Roll => { @@ -5714,7 +5714,7 @@ impl<'a> TimelineWriter<'a> { Ok(&self.write_guard.as_ref().unwrap().open_layer) } - async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> { + async fn open_layer(&mut self, at: Lsn, ctx: &mut RequestContext) -> anyhow::Result<()> { let layer = self.tl.get_layer_for_write(at, ctx).await?; let initial_size = layer.size().await?; @@ -5800,7 +5800,7 @@ impl<'a> TimelineWriter<'a> { pub(crate) async fn put_batch( &mut self, batch: VecMap, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { for (lsn, (key, val)) in batch { self.put(key, lsn, &val, ctx).await? @@ -5812,7 +5812,7 @@ impl<'a> TimelineWriter<'a> { pub(crate) async fn delete_batch( &mut self, batch: &[(Range, Lsn)], - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { if let Some((_, lsn)) = batch.first() { let action = self.get_open_layer_action(*lsn, 0); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8a95029f33..ed367ccef4 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -49,7 +49,7 @@ impl Timeline { self: &Arc, _cancel: &CancellationToken, flags: EnumSet, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), CompactionError> { // High level strategy for compaction / image creation: // @@ -175,7 +175,7 @@ impl Timeline { async fn compact_shard_ancestors( self: &Arc, rewrite_max: usize, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let mut drop_layers = Vec::new(); let mut layers_to_rewrite: Vec = Vec::new(); @@ -359,7 +359,7 @@ impl Timeline { async fn compact_level0( self: &Arc, target_file_size: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), CompactionError> { let CompactLevel0Phase1Result { new_layers, @@ -400,7 +400,7 @@ impl Timeline { guard: tokio::sync::OwnedRwLockReadGuard, mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_acquisition_micros.till_now(); // set by caller @@ -907,7 +907,7 @@ impl Timeline { pub(crate) async fn compact_tiered( self: &Arc, _cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), CompactionError> { let fanout = self.get_compaction_threshold() as u64; let target_file_size = self.get_checkpoint_distance(); @@ -963,7 +963,7 @@ impl Timeline { pub(crate) async fn compact_with_gc( self: &Arc, _cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), CompactionError> { use crate::tenant::storage_layer::ValueReconstructState; // Step 0: pick all delta layers + image layers below/intersect with the GC horizon. 
@@ -1190,7 +1190,7 @@ impl CompactionJobExecutor for TimelineAdaptor { &mut self, key_range: &Range, lsn_range: &Range, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> anyhow::Result>> { self.flush_updates().await?; @@ -1211,7 +1211,7 @@ impl CompactionJobExecutor for TimelineAdaptor { &mut self, key_range: &Range, lsn: Lsn, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> anyhow::Result>> { if lsn == self.keyspace.0 { Ok(pageserver_compaction::helpers::intersect_keyspace( @@ -1247,7 +1247,7 @@ impl CompactionJobExecutor for TimelineAdaptor { &mut self, lsn: Lsn, key_range: &Range, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { Ok(self.create_image_impl(lsn, key_range, ctx).await?) } @@ -1257,7 +1257,7 @@ impl CompactionJobExecutor for TimelineAdaptor { lsn_range: &Range, key_range: &Range, input_layers: &[ResidentDeltaLayer], - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); @@ -1329,7 +1329,7 @@ impl CompactionJobExecutor for TimelineAdaptor { async fn delete_layer( &mut self, layer: &OwnArc, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> anyhow::Result<()> { self.layers_to_delete.push(layer.clone().0); Ok(()) @@ -1341,7 +1341,7 @@ impl TimelineAdaptor { &mut self, lsn: Lsn, key_range: &Range, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), CreateImageLayersError> { let timer = self.timeline.metrics.create_images_time_histo.start_timer(); @@ -1468,7 +1468,7 @@ impl CompactionLayer for ResidentDeltaLayer { impl CompactionDeltaLayer for ResidentDeltaLayer { type DeltaEntry<'a> = DeltaEntry<'a>; - async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result>> { + async fn load_keys<'a>(&self, ctx: &mut RequestContext) -> anyhow::Result>> { self.0.load_keys(ctx).await } } diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 4fc89330ba..64e55623bc 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -87,7 +87,7 @@ pub(super) async fn prepare( detached: &Arc, tenant: &Tenant, options: Options, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> { use Error::*; @@ -325,7 +325,7 @@ async fn upload_rewritten_layer( layer: &Layer, target: &Arc, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, Error> { use Error::UploadRewritten; let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?; @@ -348,7 +348,7 @@ async fn copy_lsn_prefix( end_lsn: Lsn, layer: &Layer, target_timeline: &Arc, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, Error> { use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed}; @@ -437,7 +437,7 @@ pub(super) async fn complete( detached: &Arc, tenant: &Tenant, prepared: PreparedTimelineDetach, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> Result, anyhow::Error> { let PreparedTimelineDetach { layers } = prepared; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 8a8c38d0ce..7e16911929 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -127,7 +127,7 @@ impl Timeline { policy: &EvictionPolicy, cancel: &CancellationToken, gate: &GateGuard, - ctx: &RequestContext, + ctx: &mut 
RequestContext, ) -> ControlFlow<(), Instant> { debug!("eviction iteration: {policy:?}"); let start = Instant::now(); @@ -184,7 +184,7 @@ impl Timeline { p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, gate: &GateGuard, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> ControlFlow<()> { let now = SystemTime::now(); @@ -309,7 +309,7 @@ impl Timeline { p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, gate: &GateGuard, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> ControlFlow<()> { let permit = self.acquire_imitation_permit(cancel, ctx).await?; @@ -320,7 +320,7 @@ impl Timeline { async fn acquire_imitation_permit( &self, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> { let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit( BackgroundLoopKind::Eviction, @@ -366,7 +366,7 @@ impl Timeline { cancel: &CancellationToken, gate: &GateGuard, permit: tokio::sync::SemaphorePermit<'static>, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> ControlFlow<()> { if !self.tenant_shard_id.is_shard_zero() { // Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size @@ -442,7 +442,7 @@ impl Timeline { async fn imitate_timeline_cached_layer_accesses( &self, guard: &GateGuard, - ctx: &RequestContext, + ctx: &mut RequestContext, ) { let lsn = self.get_last_record_lsn(); @@ -499,7 +499,7 @@ impl Timeline { &self, tenant: &Tenant, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &mut RequestContext, ) { if self.conf.metric_collection_endpoint.is_none() { // We don't start the consumption metrics task if this is not set in the config. diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 550a9a567a..8a908ed7f9 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -73,7 +73,7 @@ impl LayerManager { conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result> { ensure!(lsn.is_aligned()); diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index 2b60e670ea..909cb84d82 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -90,7 +90,7 @@ impl<'t> UninitializedTimeline<'t> { copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, broker_client: storage_broker::BrokerClientChannel, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result> { let raw_timeline = self.raw_timeline()?; diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index a085154a5a..a70a1540df 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -68,7 +68,7 @@ impl WalReceiver { timeline: Arc, conf: WalReceiverConf, mut broker_client: BrokerClientChannel, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Self { let tenant_shard_id = timeline.tenant_shard_id; let timeline_id = timeline.timeline_id; diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 1d2ffec08f..1722acc1c9 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ 
b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -59,7 +59,7 @@ pub(crate) struct Cancelled; pub(super) async fn connection_manager_loop_step( broker_client: &mut BrokerClientChannel, connection_manager_state: &mut ConnectionManagerState, - ctx: &RequestContext, + ctx: &mut RequestContext, cancel: &CancellationToken, manager_status: &std::sync::RwLock>, ) -> Result<(), Cancelled> { @@ -523,7 +523,7 @@ impl ConnectionManagerState { } /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. - async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) { + async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &mut RequestContext) { WALRECEIVER_SWITCHES .with_label_values(&[new_sk.reason.name()]) .inc(); diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 6e825760e3..3e0d4749d4 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -286,7 +286,7 @@ impl<'a> VectoredBlobReader<'a> { &self, read: &VectoredRead, buf: BytesMut, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { assert!(read.size() > 0); assert!( diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 04d9386fab..f8419b630d 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -346,7 +346,7 @@ impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open>( path: P, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await } @@ -355,7 +355,7 @@ impl VirtualFile { /// Like File::create. 
pub async fn create>( path: P, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { Self::open_with_options( path.as_ref(), @@ -373,7 +373,7 @@ impl VirtualFile { pub async fn open_with_options>( path: P, open_options: &OpenOptions, - _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ + _ctx: &mut RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ ) -> Result { let path_ref = path.as_ref(); let path_str = path_ref.to_string(); @@ -589,7 +589,7 @@ impl VirtualFile { &self, buf: B, offset: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result where B: IoBufMut + Send, @@ -606,7 +606,7 @@ impl VirtualFile { buf: B, offset: u64, count: usize, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result where B: IoBufMut + Send, @@ -623,7 +623,7 @@ impl VirtualFile { &self, page: PageWriteGuard<'static>, offset: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, Error> { let buf = PageWriteGuardBuf { page, @@ -639,7 +639,7 @@ impl VirtualFile { &self, buf: B, mut offset: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> (B::Buf, Result<(), Error>) { let buf_len = buf.bytes_init(); if buf_len == 0 { @@ -677,7 +677,7 @@ impl VirtualFile { pub async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> (B::Buf, Result) { let nbytes = buf.bytes_init(); if nbytes == 0 { @@ -710,7 +710,7 @@ impl VirtualFile { async fn write( &mut self, buf: Slice, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> (Slice, Result) { let pos = self.pos; let (buf, res) = self.write_at(buf, pos, ctx).await; @@ -726,7 +726,7 @@ impl VirtualFile { &self, buf: B, offset: u64, - _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ + _ctx: &mut RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ ) -> (B, Result) where B: tokio_epoll_uring::BoundedBufMut + Send, @@ -756,7 +756,7 @@ impl VirtualFile { &self, buf: Slice, offset: u64, - _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ + _ctx: &mut RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ ) -> (Slice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, @@ -1048,7 +1048,7 @@ impl VirtualFile { pub(crate) async fn read_blk( &self, blknum: u32, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; let buf = vec![0; PAGE_SZ]; @@ -1058,7 +1058,7 @@ impl VirtualFile { Ok(crate::tenant::block_io::BlockLease::Vec(buf)) } - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { + async fn read_to_end(&mut self, buf: &mut Vec, ctx: &mut RequestContext) -> Result<(), Error> { let mut tmp = vec![0; 128]; loop { let res; @@ -1122,7 +1122,7 @@ impl OwnedAsyncWriter for VirtualFile { async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> std::io::Result<(usize, B::Buf)> { let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; res.map(move |v| (v, buf)) @@ -1208,7 +1208,7 @@ mod tests { &self, mut buf: Vec, offset: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> 
Result, Error> { match self { MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await, @@ -1219,7 +1219,7 @@ mod tests { &self, buf: B, offset: u64, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), Error> { match self { MaybeVirtualFile::VirtualFile(file) => { @@ -1244,7 +1244,7 @@ mod tests { async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), Error> { match self { MaybeVirtualFile::VirtualFile(file) => { @@ -1263,7 +1263,7 @@ mod tests { // Helper function to slurp contents of a file, starting at the current position, // into a string - async fn read_string(&mut self, ctx: &RequestContext) -> Result { + async fn read_string(&mut self, ctx: &mut RequestContext) -> Result { use std::io::Read; let mut buf = String::new(); match self { @@ -1284,7 +1284,7 @@ mod tests { &mut self, pos: u64, len: usize, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { let buf = vec![0; len]; let buf = self.read_exact_at(buf, pos, ctx).await?; @@ -1307,7 +1307,7 @@ mod tests { async fn open( path: Utf8PathBuf, opts: OpenOptions, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result { let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) @@ -1324,7 +1324,7 @@ mod tests { async fn open( path: Utf8PathBuf, opts: OpenOptions, - _ctx: &RequestContext, + _ctx: &mut RequestContext, ) -> Result { Ok(MaybeVirtualFile::File({ let owned_fd = opts.open(path.as_std_path()).await?; @@ -1343,7 +1343,7 @@ mod tests { async fn open( path: Utf8PathBuf, opts: OpenOptions, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result; } diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs index 55b1d0b46b..ab7cee0609 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -38,7 +38,7 @@ where async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> std::io::Result<(usize, B::Buf)> { let (nwritten, buf) = self.dst.write_all(buf, ctx).await?; self.bytes_amount += u64::try_from(nwritten).unwrap(); diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 885a9221c5..918e786959 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -9,7 +9,7 @@ pub trait OwnedAsyncWriter { async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> std::io::Result<(usize, B::Buf)>; } @@ -60,7 +60,7 @@ where } #[cfg_attr(target_os = "macos", allow(dead_code))] - pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result { + pub async fn flush_and_into_inner(mut self, ctx: &mut RequestContext) -> std::io::Result { self.flush(ctx).await?; let Self { buf, writer } = self; @@ -79,7 +79,7 @@ where pub async fn write_buffered( &mut self, chunk: Slice, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> std::io::Result<(usize, S)> { let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk @@ -124,7 +124,7 @@ where pub async fn write_buffered_borrowed( &mut self, mut chunk: &[u8], - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> 
std::io::Result { let chunk_len = chunk.len(); while !chunk.is_empty() { @@ -142,7 +142,7 @@ where Ok(chunk_len) } - async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> { + async fn flush(&mut self, ctx: &mut RequestContext) -> std::io::Result<()> { let buf = self.buf.take().expect("must not use after an error"); let buf_len = buf.pending(); if buf_len == 0 { @@ -215,7 +215,7 @@ impl OwnedAsyncWriter for Vec { async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - _: &RequestContext, + _: &mut RequestContext, ) -> std::io::Result<(usize, B::Buf)> { let nbytes = buf.bytes_init(); if nbytes == 0 { @@ -243,7 +243,7 @@ mod tests { async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, - _: &RequestContext, + _: &mut RequestContext, ) -> std::io::Result<(usize, B::Buf)> { let nbytes = buf.bytes_init(); if nbytes == 0 { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4f26f2f6d1..b91890a318 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -59,7 +59,7 @@ impl WalIngest { pub async fn new( timeline: &Timeline, startpoint: Lsn, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. @@ -90,7 +90,7 @@ impl WalIngest { lsn: Lsn, modification: &mut DatadirModification<'_>, decoded: &mut DecodedWALRecord, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { WAL_INGEST.records_received.inc(); let pg_version = modification.tline.pg_version; @@ -449,7 +449,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, blk: &DecodedBkpBlock, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), PageReconstructError> { let rel = RelTag { spcnode: blk.rnode_spcnode, @@ -467,7 +467,7 @@ impl WalIngest { lsn: Lsn, decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), PageReconstructError> { let rel = RelTag { spcnode: blk.rnode_spcnode, @@ -530,7 +530,7 @@ impl WalIngest { buf: &mut Bytes, modification: &mut DatadirModification<'_>, decoded: &DecodedWALRecord, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { // Handle VM bit updates that are implicitly part of heap records. @@ -836,7 +836,7 @@ impl WalIngest { buf: &mut Bytes, modification: &mut DatadirModification<'_>, decoded: &DecodedWALRecord, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { // Handle VM bit updates that are implicitly part of heap records. 
@@ -1007,7 +1007,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, rec: &XlCreateDatabase, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let db_id = rec.db_id; let tablespace_id = rec.tablespace_id; @@ -1102,7 +1102,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, rec: &XlSmgrCreate, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let rel = RelTag { spcnode: rec.rnode.spcnode, @@ -1121,7 +1121,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, rec: &XlSmgrTruncate, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { let spcnode = rec.rnode.spcnode; let dbnode = rec.rnode.dbnode; @@ -1193,7 +1193,7 @@ impl WalIngest { parsed: &XlXactParsedRecord, is_commit: bool, origin_id: u16, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { // Record update of CLOG pages let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; @@ -1270,7 +1270,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, xlrec: &XlClogTruncate, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { info!( "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}", @@ -1416,7 +1416,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, xlrec: &XlMultiXactTruncate, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { self.checkpoint.oldestMulti = xlrec.end_trunc_off; self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db; @@ -1454,7 +1454,7 @@ impl WalIngest { modification: &mut DatadirModification<'_>, xlrec: &XlRelmapUpdate, decoded: &DecodedWALRecord, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -1475,7 +1475,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, rel: RelTag, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { modification.put_rel_creation(rel, 0, ctx).await?; Ok(()) @@ -1487,7 +1487,7 @@ impl WalIngest { rel: RelTag, blknum: BlockNumber, img: Bytes, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), PageReconstructError> { self.handle_rel_extend(modification, rel, blknum, ctx) .await?; @@ -1501,7 +1501,7 @@ impl WalIngest { rel: RelTag, blknum: BlockNumber, rec: NeonWalRecord, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { self.handle_rel_extend(modification, rel, blknum, ctx) .await?; @@ -1514,7 +1514,7 @@ impl WalIngest { modification: &mut DatadirModification<'_>, rel: RelTag, nblocks: BlockNumber, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { modification.put_rel_truncation(rel, nblocks, ctx).await?; Ok(()) @@ -1524,7 +1524,7 @@ impl WalIngest { &mut self, modification: &mut DatadirModification<'_>, rel: RelTag, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { modification.put_rel_drop(rel, ctx).await?; Ok(()) @@ -1535,7 +1535,7 @@ impl WalIngest { modification: &mut DatadirModification<'_>, rel: RelTag, blknum: BlockNumber, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<(), PageReconstructError> { let new_nblocks = blknum + 1; // Check if the relation exists. 
We implicitly create relations on first @@ -1597,7 +1597,7 @@ impl WalIngest { segno: u32, blknum: BlockNumber, img: Bytes, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> Result<()> { self.handle_slru_extend(modification, kind, segno, blknum, ctx) .await?; @@ -1611,7 +1611,7 @@ impl WalIngest { kind: SlruKind, segno: u32, blknum: BlockNumber, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result<()> { // we don't use a cache for this like we do for relations. SLRUS are explcitly // extended with ZEROPAGE records, not with commit records, so it happens @@ -1660,7 +1660,7 @@ impl WalIngest { async fn get_relsize( modification: &DatadirModification<'_>, rel: RelTag, - ctx: &RequestContext, + ctx: &mut RequestContext, ) -> anyhow::Result { let nblocks = if !modification .tline @@ -1701,7 +1701,7 @@ mod tests { static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); - async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result { + async fn init_walingest_test(tline: &Timeline, ctx: &mut RequestContext) -> Result { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file