auto-replace all &RequestContext to &mut RequestContext (plus a tiny bit of manual changes)

Christian Schwarz
2024-06-20 20:04:31 +00:00
parent f8ac3b0e0e
commit f12f31ae77
46 changed files with 359 additions and 359 deletions
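The diff below is almost entirely the mechanical signature change: every `ctx: &RequestContext` parameter becomes `ctx: &mut RequestContext`, and the few manual edits appear to be the matching call-site adjustments (`let ctx` to `let mut ctx`, `&ctx` to `&mut ctx`) visible in a handful of hunks. As a minimal sketch of the pattern, using a simplified stand-in `RequestContext` and a hypothetical `get_holes`-style function rather than the real pageserver definitions:

// Stand-in type; the real RequestContext carries task kind, download
// behavior, etc. The pages_read field is purely illustrative, to show
// why a callee would want mutable access to per-request state.
struct RequestContext {
    pages_read: u64,
}

// Before: the context is passed by shared reference everywhere.
fn get_holes_before(_path: &str, _max_holes: usize, _ctx: &RequestContext) {
    // read-only access to the context
}

// After: the same signature with `&RequestContext` replaced by `&mut RequestContext`.
fn get_holes_after(_path: &str, _max_holes: usize, ctx: &mut RequestContext) {
    // the callee can now update per-request state in place
    ctx.pages_read += 1;
}

fn main() {
    // Owned contexts become `mut` bindings, and call sites pass `&mut ctx`.
    let mut ctx = RequestContext { pages_read: 0 };
    get_holes_before("layer-file", 10, &ctx);
    get_holes_after("layer-file", 10, &mut ctx);
    assert_eq!(ctx.pages_read, 1);
}

The only non-mechanical part of the pattern is making the owning bindings mutable, which is why lines like `mut ctx: RequestContext` and `let mut worker_ctx =` show up alongside the pure signature rewrites.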

View File

@@ -99,7 +99,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
}
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &mut RequestContext) -> Result<Vec<Hole>> {
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);

View File

@@ -57,7 +57,7 @@ pub(crate) enum LayerCmd {
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &mut RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);

View File

@@ -60,7 +60,7 @@ pub async fn send_basebackup_tarball<'a, W>(
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: &'a RequestContext,
ctx: &'a mut RequestContext,
) -> Result<(), BasebackupError>
where
W: AsyncWrite + Send + Sync + Unpin,
@@ -141,7 +141,7 @@ where
lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
ctx: &'a RequestContext,
ctx: &'a mut RequestContext,
}
/// A sink that accepts SLRU blocks ordered by key and forwards

View File

@@ -51,7 +51,7 @@ pub async fn collect_metrics(
node_id: NodeId,
local_disk_storage: Utf8PathBuf,
cancel: CancellationToken,
ctx: RequestContext,
mut ctx: RequestContext,
) -> anyhow::Result<()> {
if _cached_metric_collection_interval != Duration::ZERO {
tracing::warn!(
@@ -60,7 +60,7 @@ pub async fn collect_metrics(
}
// spin up background worker that calculates tenant sizes
let worker_ctx =
let mut worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -76,7 +76,7 @@ pub async fn collect_metrics(
tenant_manager,
synthetic_size_calculation_interval,
&cancel,
&worker_ctx,
&mut worker_ctx,
)
.instrument(info_span!("synthetic_size_worker"))
.await?;
@@ -122,7 +122,7 @@ pub async fn collect_metrics(
let started_at = Instant::now();
// these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &mut ctx).await;
let metrics = Arc::new(metrics);
@@ -280,7 +280,7 @@ async fn calculate_synthetic_size_worker(
tenant_manager: Arc<TenantManager>,
synthetic_size_calculation_interval: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
scopeguard::defer! {
@@ -340,7 +340,7 @@ async fn calculate_synthetic_size_worker(
}
}
async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &mut RequestContext) {
const CAUSE: LogicalSizeCalculationCause =
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;

View File

@@ -184,7 +184,7 @@ impl MetricsKey {
pub(super) async fn collect_all_metrics(
tenant_manager: &Arc<TenantManager>,
cached_metrics: &Cache,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Vec<RawMetric> {
use pageserver_api::models::TenantState;
@@ -220,7 +220,7 @@ pub(super) async fn collect_all_metrics(
res
}
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
async fn collect<S>(tenants: S, cache: &Cache, ctx: &mut RequestContext) -> Vec<RawMetric>
where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
@@ -342,7 +342,7 @@ impl TimelineSnapshot {
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
fn collect(
t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Option<Self>> {
if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values

View File

@@ -158,7 +158,7 @@ impl RequestContextBuilder {
}
}
pub fn extend(original: &RequestContext) -> Self {
pub fn extend(original: &mut RequestContext) -> Self {
Self {
// This is like a Copy, but avoid implementing Copy because ordinary users of
// RequestContext should always move or ref it.

View File

@@ -352,7 +352,7 @@ async fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
force_await_initial_logical_size: bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -381,7 +381,7 @@ async fn build_timeline_info(
async fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();

View File

@@ -53,7 +53,7 @@ pub async fn import_timeline_from_postgres_datadir(
tline: &Timeline,
pgdata_path: &Utf8Path,
pgdata_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
@@ -121,7 +121,7 @@ async fn import_rel(
dboid: Oid,
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Does it look like a relation file?
trace!("importing rel file {}", path.display());
@@ -210,7 +210,7 @@ async fn import_slru(
path: &Path,
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
info!("importing slru file {path:?}");
@@ -268,7 +268,7 @@ async fn import_wal(
tline: &Timeline,
startpoint: Lsn,
endpoint: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
@@ -346,7 +346,7 @@ pub async fn import_basebackup_from_tar(
tline: &Timeline,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
info!("importing base at {base_lsn}");
let mut modification = tline.begin_modification(base_lsn);
@@ -397,7 +397,7 @@ pub async fn import_wal_from_tar(
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
start_lsn: Lsn,
end_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
@@ -489,7 +489,7 @@ async fn import_file(
file_path: &Path,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<ControlFileData>> {
let file_name = match file_path.file_name() {
Some(name) => name.to_string_lossy(),

View File

@@ -333,7 +333,7 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
});
impl PageCacheMetrics {
pub(crate) fn for_ctx(&self, ctx: &RequestContext) -> &PageCacheMetricsForTaskKind {
pub(crate) fn for_ctx(&self, ctx: &mut RequestContext) -> &PageCacheMetricsForTaskKind {
&self.map[ctx.task_kind()][ctx.page_content_kind()]
}
}

View File

@@ -331,7 +331,7 @@ impl PageCache {
&self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ReadBufResult> {
self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
.await
@@ -422,7 +422,7 @@ impl PageCache {
async fn lock_for_read(
&self,
cache_key: &CacheKey,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);

View File

@@ -867,7 +867,7 @@ impl PageServerHandler {
request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Lsn, PageStreamError> {
let last_record_lsn = timeline.get_last_record_lsn();
@@ -926,7 +926,7 @@ impl PageServerHandler {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -958,7 +958,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamExistsRequest,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline
@@ -990,7 +990,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
@@ -1023,7 +1023,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
@@ -1173,7 +1173,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self.get_cached_timeline_for_page(req) {
Ok(tl) => {
@@ -1233,7 +1233,7 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
@@ -1275,7 +1275,7 @@ impl PageServerHandler {
prev_lsn: Option<Lsn>,
full_backup: bool,
gzip: bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

View File

@@ -188,7 +188,7 @@ impl Timeline {
tag: RelTag,
blknum: BlockNumber,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
@@ -218,7 +218,7 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
@@ -236,7 +236,7 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
@@ -272,7 +272,7 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
@@ -307,7 +307,7 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
@@ -335,7 +335,7 @@ impl Timeline {
kind: SlruKind,
segno: u32,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
@@ -357,7 +357,7 @@ impl Timeline {
segno: u32,
blknum: BlockNumber,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn, ctx).await
@@ -369,7 +369,7 @@ impl Timeline {
kind: SlruKind,
segno: u32,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?;
@@ -382,7 +382,7 @@ impl Timeline {
kind: SlruKind,
segno: u32,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<bool, PageReconstructError> {
// fetch directory listing
let key = slru_dir_to_key(kind);
@@ -408,7 +408,7 @@ impl Timeline {
&self,
search_timestamp: TimestampTz,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> {
pausable_failpoint!("find-lsn-for-timestamp-pausable");
@@ -499,7 +499,7 @@ impl Timeline {
probe_lsn: Lsn,
found_smaller: &mut bool,
found_larger: &mut bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<bool, PageReconstructError> {
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
if timestamp >= search_timestamp {
@@ -519,7 +519,7 @@ impl Timeline {
pub(crate) async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<TimestampTz>, PageReconstructError> {
let mut max: Option<TimestampTz> = None;
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
@@ -542,7 +542,7 @@ impl Timeline {
async fn map_all_timestamps<T: Default>(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
@@ -575,7 +575,7 @@ impl Timeline {
pub(crate) async fn get_slru_keyspace(
&self,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<KeySpace, PageReconstructError> {
let mut accum = KeySpaceAccum::new();
@@ -604,7 +604,7 @@ impl Timeline {
&self,
kind: SlruKind,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashSet<u32>, PageReconstructError> {
// fetch directory entry
let key = slru_dir_to_key(kind);
@@ -621,7 +621,7 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
let key = relmap_file_key(spcnode, dbnode);
@@ -632,7 +632,7 @@ impl Timeline {
pub(crate) async fn list_dbdirs(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
// fetch directory entry
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
@@ -647,7 +647,7 @@ impl Timeline {
&self,
xid: TransactionId,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
let key = twophase_file_key(xid);
let buf = self.get(key, lsn, ctx).await?;
@@ -657,7 +657,7 @@ impl Timeline {
pub(crate) async fn list_twophase_files(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashSet<TransactionId>, PageReconstructError> {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
@@ -671,7 +671,7 @@ impl Timeline {
pub(crate) async fn get_control_file(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
self.get(CONTROLFILE_KEY, lsn, ctx).await
}
@@ -679,7 +679,7 @@ impl Timeline {
pub(crate) async fn get_checkpoint(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
@@ -687,7 +687,7 @@ impl Timeline {
async fn list_aux_files_v1(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get(AUX_FILES_KEY, lsn, ctx).await {
Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
@@ -705,7 +705,7 @@ impl Timeline {
async fn list_aux_files_v2(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
@@ -729,7 +729,7 @@ impl Timeline {
pub(crate) async fn trigger_aux_file_size_computation(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> {
let current_policy = self.last_aux_file_policy.load();
if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
@@ -741,7 +741,7 @@ impl Timeline {
pub(crate) async fn list_aux_files(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
@@ -779,7 +779,7 @@ impl Timeline {
pub(crate) async fn get_replorigins(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
@@ -809,7 +809,7 @@ impl Timeline {
pub(crate) async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
@@ -845,7 +845,7 @@ impl Timeline {
pub(crate) async fn collect_keyspace(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
// Iterate through key ranges, greedily packing them into partitions
let mut result = KeySpaceAccum::new();
@@ -1145,7 +1145,7 @@ impl<'a> DatadirModification<'a> {
spcnode: Oid,
dbnode: Oid,
img: Bytes,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?;
@@ -1182,7 +1182,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
xid: TransactionId,
img: Bytes,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Add it to the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
@@ -1229,7 +1229,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
spcnode: Oid,
dbnode: Oid,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let total_blocks = self
.tline
@@ -1266,7 +1266,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), RelationError> {
if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode);
@@ -1328,7 +1328,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
@@ -1362,7 +1362,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
@@ -1384,7 +1384,7 @@ impl<'a> DatadirModification<'a> {
}
/// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &mut RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
// Remove it from the directory entry
@@ -1420,7 +1420,7 @@ impl<'a> DatadirModification<'a> {
kind: SlruKind,
segno: u32,
nblocks: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Add it to the directory entry
let dir_key = slru_dir_to_key(kind);
@@ -1466,7 +1466,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
kind: SlruKind,
segno: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Remove it from the directory entry
let dir_key = slru_dir_to_key(kind);
@@ -1499,7 +1499,7 @@ impl<'a> DatadirModification<'a> {
pub async fn drop_twophase_file(
&mut self,
xid: TransactionId,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
@@ -1538,7 +1538,7 @@ impl<'a> DatadirModification<'a> {
&mut self,
path: &str,
content: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let switch_policy = self.tline.get_switch_aux_file_policy();
@@ -1731,7 +1731,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn flush(&mut self, ctx: &mut RequestContext) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1777,7 +1777,7 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn commit(&mut self, ctx: &mut RequestContext) -> anyhow::Result<()> {
let mut writer = self.tline.writer().await;
let pending_nblocks = self.pending_nblocks;
@@ -1828,7 +1828,7 @@ impl<'a> DatadirModification<'a> {
// Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
async fn get(&self, key: Key, ctx: &mut RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
@@ -1895,7 +1895,7 @@ impl<'a> Version<'a> {
&self,
timeline: &Timeline,
key: Key,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,

View File

@@ -547,7 +547,7 @@ impl Tenant {
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -656,7 +656,7 @@ impl Tenant {
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
@@ -965,7 +965,7 @@ impl Tenant {
self: &Arc<Tenant>,
preload: Option<TenantPreload>,
mode: SpawnMode,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
@@ -1175,7 +1175,7 @@ impl Tenant {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
resources: TimelineResources,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
@@ -1358,7 +1358,7 @@ impl Tenant {
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> anyhow::Result<UninitializedTimeline> {
anyhow::ensure!(
self.is_active(),
@@ -1401,7 +1401,7 @@ impl Tenant {
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
@@ -1440,7 +1440,7 @@ impl Tenant {
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
@@ -1477,7 +1477,7 @@ impl Tenant {
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
if !self.is_active() {
if matches!(self.current_state(), TenantState::Stopping { .. }) {
@@ -1650,7 +1650,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<GcResult, GcError> {
// Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() {
@@ -1682,7 +1682,7 @@ impl Tenant {
async fn compaction_iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
@@ -1779,7 +1779,7 @@ impl Tenant {
self: &Arc<Self>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
span::debug_assert_current_span_has_tenant_id();
@@ -2834,7 +2834,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<GcResult, GcError> {
let mut totals: GcResult = Default::default();
let now = Instant::now();
@@ -2894,7 +2894,7 @@ impl Tenant {
pub(crate) async fn refresh_gc_info(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<Arc<Timeline>>, GcError> {
// since this method can now be called at different rates than the configured gc loop, it
// might be that these configuration values get applied faster than what it was previously,
@@ -2915,7 +2915,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<Arc<Timeline>>, GcError> {
// before taking the gc_cs lock, do the heavier weight finding of gc_cutoff points for
// currently visible timelines.
@@ -3053,7 +3053,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
ancestor_lsn: Option<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
let tl = self
@@ -3071,7 +3071,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
ancestor_lsn: Option<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
@@ -3108,7 +3108,7 @@ impl Tenant {
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
.await
@@ -3120,7 +3120,7 @@ impl Tenant {
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
@@ -3233,7 +3233,7 @@ impl Tenant {
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let create_guard = self.create_timeline_create_guard(timeline_id).unwrap();
self.bootstrap_timeline(
@@ -3305,7 +3305,7 @@ impl Tenant {
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
@@ -3563,7 +3563,7 @@ impl Tenant {
max_retention_period: Option<u64>,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
let logical_sizes_at_once = self
.conf
@@ -3603,7 +3603,7 @@ impl Tenant {
&self,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<u64, size::CalculateSyntheticSizeError> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
@@ -3756,7 +3756,7 @@ async fn run_initdb(
pub async fn dump_layerfile_from_path(
path: &Utf8Path,
verbose: bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
use std::os::unix::fs::FileExt;
@@ -3960,7 +3960,7 @@ pub(crate) mod harness {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) async fn do_try_load(
&self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
@@ -4219,7 +4219,7 @@ mod tests {
async fn make_some_layers(
tline: &Timeline,
start_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let mut lsn = start_lsn;
{
@@ -4707,7 +4707,7 @@ mod tests {
async fn bulk_insert_compact_gc(
tenant: &Tenant,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
lsn: Lsn,
repeat: usize,
key_count: usize,
@@ -4719,7 +4719,7 @@ mod tests {
async fn bulk_insert_maybe_compact_gc(
tenant: &Tenant,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
mut lsn: Lsn,
repeat: usize,
key_count: usize,
@@ -6249,7 +6249,7 @@ mod tests {
tline: &Timeline,
keyspace: &KeySpace,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::default();
let res = tline
@@ -6365,7 +6365,7 @@ mod tests {
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
@@ -6461,7 +6461,7 @@ mod tests {
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
@@ -6515,7 +6515,7 @@ mod tests {
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline

View File

@@ -26,7 +26,7 @@ impl<'a> BlockCursor<'a> {
pub async fn read_blob(
&self,
offset: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
@@ -38,7 +38,7 @@ impl<'a> BlockCursor<'a> {
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), std::io::Error> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;
@@ -130,7 +130,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (B::Buf, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
@@ -143,7 +143,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
pub async fn flush_buffer(&mut self, ctx: &mut RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf, ctx).await;
res?;
@@ -166,7 +166,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (B::Buf, Result<(), Error>) {
if !BUFFERED {
assert!(self.buf.is_empty());
@@ -218,7 +218,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -267,7 +267,7 @@ impl BlobWriter<true> {
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
pub async fn into_inner(mut self, ctx: &mut RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
Ok(self.inner)
}

View File

@@ -92,7 +92,7 @@ impl<'a> BlockReaderRef<'a> {
async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
@@ -150,7 +150,7 @@ impl<'a> BlockCursor<'a> {
pub async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum, ctx).await
}
@@ -177,7 +177,7 @@ impl<'a> FileBlockReader<'a> {
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
@@ -192,7 +192,7 @@ impl<'a> FileBlockReader<'a> {
pub async fn read_blk<'b>(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockLease<'b>, std::io::Error> {
let cache = page_cache::get();
match cache

View File

@@ -404,7 +404,7 @@ impl DeleteTenantFlow {
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();

View File

@@ -242,7 +242,7 @@ where
///
/// Read the value for given key. Returns the value, or None if it doesn't exist.
///
pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
pub async fn get(&self, search_key: &[u8; L], ctx: &mut RequestContext) -> Result<Option<u64>> {
let mut result: Option<u64> = None;
self.visit(
search_key,
@@ -278,7 +278,7 @@ where
pub fn get_stream_from<'a>(
&'a self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
ctx: &'a mut RequestContext,
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
try_stream! {
let mut stack = Vec::new();
@@ -363,7 +363,7 @@ where
search_key: &[u8; L],
dir: VisitDirection,
mut visitor: V,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<bool>
where
V: FnMut(&[u8], u64) -> bool,

View File

@@ -28,7 +28,7 @@ impl EphemeralFile {
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
@@ -68,7 +68,7 @@ impl EphemeralFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockLease, io::Error> {
self.rw.read_blk(blknum, ctx).await
}
@@ -76,7 +76,7 @@ impl EphemeralFile {
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<u64, io::Error> {
let pos = self.rw.bytes_written();

View File

@@ -38,7 +38,7 @@ impl RW {
pub(crate) async fn write_all_borrowed(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<usize, io::Error> {
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
// because Compute is unlikely to access recently written data.
@@ -52,7 +52,7 @@ impl RW {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockLease, io::Error> {
match self.rw.read_blk(blknum).await? {
zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
@@ -138,7 +138,7 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
>(
&mut self,
buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let buf = buf.slice(..);
let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done

View File

@@ -64,7 +64,7 @@ where
pub async fn write_all_borrowed(
&mut self,
buf: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<usize> {
self.buffered_writer.write_buffered_borrowed(buf, ctx).await
}

View File

@@ -850,7 +850,7 @@ impl LayerMap {
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
println!("Begin dump LayerMap");
println!("open_layer:");

View File

@@ -696,7 +696,7 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
tenant_path.is_dir(),
@@ -956,7 +956,7 @@ impl TenantManager {
new_location_config: LocationConf,
flush: Option<Duration>,
mut spawn_mode: SpawnMode,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");
@@ -1247,7 +1247,7 @@ impl TenantManager {
&self,
tenant_shard_id: TenantShardId,
drop_cache: bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let Some(old_slot) = slot_guard.get_old_value() else {
@@ -1509,7 +1509,7 @@ impl TenantManager {
tenant: Arc<Tenant>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant_shard_id = *tenant.get_tenant_shard_id();
let r = self
@@ -1539,7 +1539,7 @@ impl TenantManager {
tenant: Arc<Tenant>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant_shard_id = *tenant.get_tenant_shard_id();
@@ -1994,7 +1994,7 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
struct RevertOnDropSlot(Option<SlotGuard>);
@@ -2229,7 +2229,7 @@ pub(crate) async fn load_tenant(
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
@@ -2837,7 +2837,7 @@ pub(crate) async fn immediate_gc(
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = TENANTS.read().unwrap();

View File

@@ -518,7 +518,7 @@ impl RemoteTimelineClient {
layer_metadata: &LayerFileMetadata,
local_path: &Utf8Path,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<u64> {
let downloaded_size = {
let _unfinished_gauge_guard = self.metrics.call_begin(

View File

@@ -52,7 +52,7 @@ pub async fn download_layer_file<'a>(
layer_metadata: &'a LayerFileMetadata,
local_path: &Utf8Path,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -107,9 +107,9 @@ pub async fn download_layer_file<'a>(
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let work = {
let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
let mut ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
async move {
let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
let timeline_dir = VirtualFile::open(&timeline_path, &mut ctx)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
@@ -140,7 +140,7 @@ async fn download_object<'a>(
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &mut RequestContext,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),

View File

@@ -506,7 +506,7 @@ impl<'a> TenantDownloader<'a> {
}
}
async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
async fn download(&self, ctx: &mut RequestContext) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_id();
// For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
@@ -831,7 +831,7 @@ impl<'a> TenantDownloader<'a> {
&self,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -978,7 +978,7 @@ impl<'a> TenantDownloader<'a> {
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
layer: HeatMapLayer,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<HeatMapLayer>, UpdateError> {
// Failpoint for simulating slow remote storage
failpoint_support::sleep_millis_async!(

View File

@@ -148,7 +148,7 @@ pub(super) async fn gather_inputs(
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant.refresh_gc_info(cancel, ctx).await?;
@@ -379,7 +379,7 @@ async fn fill_logical_sizes(
limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CalculateSyntheticSizeError> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines

View File

@@ -425,7 +425,7 @@ impl ReadableLayer {
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
match self {
ReadableLayer::PersistentLayer(layer) => {
@@ -574,7 +574,7 @@ impl LayerAccessStats {
});
}
fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
fn record_access(&self, access_kind: LayerAccessKind, ctx: &mut RequestContext) {
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
return;
}

View File

@@ -249,7 +249,7 @@ impl AsLayerDesc for DeltaLayer {
}
impl DeltaLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
self.desc.dump();
if !verbose {
@@ -292,7 +292,7 @@ impl DeltaLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(access_kind, ctx);
// Quick exit if already loaded
@@ -302,7 +302,7 @@ impl DeltaLayer {
.with_context(|| format!("Failed to load delta layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
async fn load_inner(&self, ctx: &mut RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, None, ctx)
@@ -393,7 +393,7 @@ impl DeltaLayerWriterInner {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
@@ -435,7 +435,7 @@ impl DeltaLayerWriterInner {
key: Key,
lsn: Lsn,
val: Value,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let (_, res) = self
.put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
@@ -449,7 +449,7 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Vec<u8>,
will_init: bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
let (val, res) = self.blob_writer.write_blob(val, ctx).await;
@@ -476,7 +476,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, timeline, ctx).await;
@@ -493,7 +493,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -603,7 +603,7 @@ impl DeltaLayerWriter {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
inner: Some(
@@ -630,7 +630,7 @@ impl DeltaLayerWriter {
key: Key,
lsn: Lsn,
val: Value,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
self.inner
.as_mut()
@@ -645,7 +645,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Vec<u8>,
will_init: bool,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
self.inner
.as_mut()
@@ -665,7 +665,7 @@ impl DeltaLayerWriter {
mut self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> {
self.inner
.take()
@@ -704,7 +704,7 @@ impl DeltaLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
@@ -744,7 +744,7 @@ impl DeltaLayerInner {
path: &Utf8Path,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
@@ -793,7 +793,7 @@ impl DeltaLayerInner {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`.
@@ -824,13 +824,13 @@ impl DeltaLayerInner {
!blob_ref.will_init()
},
&RequestContextBuilder::extend(ctx)
&mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await?;
let ctx = &RequestContextBuilder::extend(ctx)
let ctx = &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue)
.build();
@@ -889,7 +889,7 @@ impl DeltaLayerInner {
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -931,7 +931,7 @@ impl DeltaLayerInner {
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -973,12 +973,12 @@ impl DeltaLayerInner {
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
mut planner: VectoredReadPlanner,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<VectoredRead>>
where
Reader: BlockReader,
{
let ctx = RequestContextBuilder::extend(ctx)
let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
@@ -986,7 +986,7 @@ impl DeltaLayerInner {
let mut range_end_handled = false;
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
let index_stream = index_reader.get_stream_from(&start_key.0, &mut ctx);
let mut index_stream = std::pin::pin!(index_stream);
while let Some(index_entry) = index_stream.next().await {
@@ -1062,7 +1062,7 @@ impl DeltaLayerInner {
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
@@ -1140,7 +1140,7 @@ impl DeltaLayerInner {
pub(super) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<DeltaEntry<'a>>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -1179,7 +1179,7 @@ impl DeltaLayerInner {
all_keys.push(entry);
true
},
&RequestContextBuilder::extend(ctx)
&mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
@@ -1199,7 +1199,7 @@ impl DeltaLayerInner {
&self,
writer: &mut DeltaLayerWriter,
until: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<usize> {
use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
@@ -1387,7 +1387,7 @@ impl DeltaLayerInner {
Ok(records)
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
pub(super) async fn dump(&self, ctx: &mut RequestContext) -> anyhow::Result<()> {
println!(
"index_start_blk: {}, root {}",
self.index_start_blk, self.index_root_blk
@@ -1404,7 +1404,7 @@ impl DeltaLayerInner {
let keys = self.load_keys(ctx).await?;
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
async fn dump_blob(val: &ValueRef<'_>, ctx: &mut RequestContext) -> anyhow::Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
let desc = match val {
@@ -1513,7 +1513,7 @@ pub struct ValueRef<'a> {
impl<'a> ValueRef<'a> {
/// Loads the value from disk
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
pub async fn load(&self, ctx: &mut RequestContext) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
@@ -1527,7 +1527,7 @@ impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BlockLease, std::io::Error> {
let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
block_reader.read_blk(blknum, ctx).await
@@ -2060,7 +2060,7 @@ mod test {
source: &DeltaLayerInner,
truncated: &DeltaLayerInner,
truncated_at: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
use futures::future::ready;
use futures::stream::TryStreamExt;

View File

@@ -177,7 +177,7 @@ impl std::fmt::Debug for ImageLayerInner {
}
impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
pub(super) async fn dump(&self, ctx: &mut RequestContext) -> anyhow::Result<()> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
self.index_start_blk,
@@ -217,7 +217,7 @@ impl AsLayerDesc for ImageLayer {
}
impl ImageLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
self.desc.dump();
if !verbose {
@@ -254,7 +254,7 @@ impl ImageLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<&ImageLayerInner> {
self.access_stats.record_access(access_kind, ctx);
self.inner
@@ -263,7 +263,7 @@ impl ImageLayer {
.with_context(|| format!("Failed to load image layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
async fn load_inner(&self, ctx: &mut RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
@@ -336,7 +336,7 @@ impl ImageLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
@@ -377,7 +377,7 @@ impl ImageLayerInner {
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
@@ -428,7 +428,7 @@ impl ImageLayerInner {
&self,
key: Key,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
@@ -439,7 +439,7 @@ impl ImageLayerInner {
if let Some(offset) = tree_reader
.get(
&keybuf,
&RequestContextBuilder::extend(ctx)
&mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
@@ -449,7 +449,7 @@ impl ImageLayerInner {
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
&mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
@@ -470,7 +470,7 @@ impl ImageLayerInner {
&self,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, None, ctx)
@@ -489,7 +489,7 @@ impl ImageLayerInner {
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
@@ -522,7 +522,7 @@ impl ImageLayerInner {
&self,
keyspace: KeySpace,
shard_identity: Option<&ShardIdentity>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(
self.max_vectored_read_bytes
@@ -535,7 +535,7 @@ impl ImageLayerInner {
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
let ctx = RequestContextBuilder::extend(ctx)
let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build();
@@ -544,7 +544,7 @@ impl ImageLayerInner {
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key);
let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
let index_stream = tree_reader.get_stream_from(&search_key, &mut ctx);
let mut index_stream = std::pin::pin!(index_stream);
while let Some(index_entry) = index_stream.next().await {
@@ -587,7 +587,7 @@ impl ImageLayerInner {
&self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<usize> {
// Fragment the range into the regions owned by this ShardIdentity
let plan = self
@@ -629,7 +629,7 @@ impl ImageLayerInner {
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
let max_vectored_read_bytes = self
.max_vectored_read_bytes
@@ -724,7 +724,7 @@ impl ImageLayerWriterInner {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
// We'll atomically rename it to the final name when we're done.
@@ -779,7 +779,7 @@ impl ImageLayerWriterInner {
&mut self,
key: Key,
img: Bytes,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
@@ -799,7 +799,7 @@ impl ImageLayerWriterInner {
async fn finish(
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -899,7 +899,7 @@ impl ImageLayerWriter {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
@@ -918,7 +918,7 @@ impl ImageLayerWriter {
&mut self,
key: Key,
img: Bytes,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
@@ -929,7 +929,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish(
mut self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline, ctx).await
}
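Beyond the mechanical signature change, the plan_reads hunk above shows one of the manual follow-ups: once the inner reads take &mut RequestContext, the context derived through RequestContextBuilder::extend(ctx).build() must be bound as `let mut ctx` so it can be re-borrowed mutably. A minimal standalone sketch of that borrow pattern, using stub types rather than the pageserver's real API:

// Stub types for illustration only; the real RequestContext/RequestContextBuilder
// live in the pageserver crate and carry task-kind and page-content metadata.
struct RequestContext;
struct RequestContextBuilder(RequestContext);

impl RequestContextBuilder {
    fn extend(_parent: &mut RequestContext) -> Self {
        RequestContextBuilder(RequestContext)
    }
    fn build(self) -> RequestContext {
        self.0
    }
}

fn read_blob(_ctx: &mut RequestContext) {}

fn plan_reads(ctx: &mut RequestContext) {
    // With &RequestContext a plain `let ctx = ...` sufficed; with &mut, the
    // derived context must itself be a `mut` binding to hand out `&mut ctx`.
    let mut ctx = RequestContextBuilder::extend(ctx).build();
    read_blob(&mut ctx);
}

fn main() {
    let mut ctx = RequestContext;
    plan_reads(&mut ctx);
}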

View File

@@ -256,7 +256,7 @@ impl InMemoryLayer {
/// debugging function to print out the contents of the layer
///
/// this is likely completely unused
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
let inner = self.inner.read().await;
let end_str = self.end_lsn_or_max();
@@ -308,12 +308,12 @@ impl InMemoryLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let ctx = RequestContextBuilder::extend(ctx)
let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
@@ -325,7 +325,7 @@ impl InMemoryLayer {
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos, &ctx).await?;
let buf = reader.read_blob(*pos, &mut ctx).await?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
@@ -365,9 +365,9 @@ impl InMemoryLayer {
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx)
let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
@@ -410,7 +410,7 @@ impl InMemoryLayer {
continue;
}
let buf = reader.read_blob(block_read.block_offset, &ctx).await;
let buf = reader.read_blob(block_read.block_offset, &mut ctx).await;
if let Err(e) = buf {
reconstruct_state
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
@@ -473,7 +473,7 @@ impl InMemoryLayer {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
@@ -512,7 +512,7 @@ impl InMemoryLayer {
key: Key,
lsn: Lsn,
buf: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
@@ -525,7 +525,7 @@ impl InMemoryLayer {
key: Key,
lsn: Lsn,
buf: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
@@ -534,7 +534,7 @@ impl InMemoryLayer {
.file
.write_blob(
buf,
&RequestContextBuilder::extend(ctx)
&mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
)
@@ -606,7 +606,7 @@ impl InMemoryLayer {
pub(crate) async fn write_to_disk(
&self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
key_range: Option<Range<Key>>,
) -> Result<Option<ResidentLayer>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this

View File

@@ -331,7 +331,7 @@ impl Layer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use anyhow::ensure;
@@ -361,7 +361,7 @@ impl Layer {
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
let layer = self
.0
@@ -392,7 +392,7 @@ impl Layer {
#[cfg(test)]
pub(crate) async fn load_key_values(
&self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
let layer = self
.0
@@ -479,7 +479,7 @@ impl Layer {
/// Traditional debug dumping facility
#[allow(unused)]
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> anyhow::Result<()> {
self.0.desc.dump();
if verbose {
@@ -898,7 +898,7 @@ impl LayerInner {
async fn get_or_maybe_download(
self: &Arc<Self>,
allow_download: bool,
ctx: Option<&RequestContext>,
ctx: Option<&mut RequestContext>,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let (weak, permit) = {
// get_or_init_detached can:
@@ -988,7 +988,7 @@ impl LayerInner {
return Err(DownloadError::NotFile(ft));
}
if let Some(ctx) = ctx {
if let Some(ref ctx) = ctx {
self.check_expected_download(ctx)?;
}
@@ -1049,7 +1049,7 @@ impl LayerInner {
self: &Arc<Self>,
timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit,
ctx: RequestContext,
mut ctx: RequestContext,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -1079,7 +1079,7 @@ impl LayerInner {
.await
.unwrap();
let res = this.download_and_init(timeline, permit, &ctx).await;
let res = this.download_and_init(timeline, permit, &mut ctx).await;
if let Err(res) = tx.send(res) {
match res {
@@ -1122,7 +1122,7 @@ impl LayerInner {
self: &Arc<LayerInner>,
timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<DownloadedLayer>> {
let result = timeline
.remote_client
@@ -1662,7 +1662,7 @@ impl DownloadedLayer {
async fn get<'a>(
&'a self,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<&'a LayerKind> {
let init = || async {
assert_eq!(
@@ -1736,7 +1736,7 @@ impl DownloadedLayer {
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use LayerKind::*;
@@ -1758,7 +1758,7 @@ impl DownloadedLayer {
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
use LayerKind::*;
@@ -1778,7 +1778,7 @@ impl DownloadedLayer {
async fn load_key_values(
&self,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
use LayerKind::*;
@@ -1788,7 +1788,7 @@ impl DownloadedLayer {
}
}
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &mut RequestContext) -> anyhow::Result<()> {
use LayerKind::*;
match self.get(owner, ctx).await? {
Delta(d) => d.dump(ctx).await?,
@@ -1837,7 +1837,7 @@ impl ResidentLayer {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
pub(crate) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
use LayerKind::*;
@@ -1866,7 +1866,7 @@ impl ResidentLayer {
&'a self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<usize> {
use LayerKind::*;
@@ -1881,7 +1881,7 @@ impl ResidentLayer {
&self,
writer: &mut super::delta_layer::DeltaLayerWriter,
until: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<usize> {
use LayerKind::*;
@@ -1907,7 +1907,7 @@ impl ResidentLayer {
#[cfg(test)]
pub(crate) async fn as_delta(
&self,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
use LayerKind::*;
match self.downloaded.get(&self.owner.0, ctx).await? {
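The get_or_maybe_download hunk above also shows one of the small manual fixes: with ctx: Option<&mut RequestContext>, matching `Some(ctx)` would move the non-Copy `&mut` reference out of the option, so the pattern becomes `Some(ref ctx)`, presumably because the option is still needed further down the function. A minimal standalone sketch with stub types (not the real layer code):

// Stub types for illustration only.
struct RequestContext;

fn check_expected_download(_ctx: &RequestContext) {}
fn download(_ctx: Option<&mut RequestContext>) {}

fn get_or_maybe_download(ctx: Option<&mut RequestContext>) {
    // `ref` borrows the &mut out of the option instead of moving it,
    // so `ctx` remains usable after the check.
    if let Some(ref ctx) = ctx {
        check_expected_download(ctx);
    }
    download(ctx);
}

fn main() {
    let mut ctx = RequestContext;
    get_or_maybe_download(Some(&mut ctx));
}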

View File

@@ -73,7 +73,7 @@ static PERMIT_GAUGES: once_cell::sync::Lazy<
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = PERMIT_GAUGES[loop_kind].guard();

View File

@@ -130,7 +130,7 @@ where
self.inner.load().config.steady_rps()
}
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
pub async fn throttle(&self, ctx: &mut RequestContext, key_count: usize) -> Option<Duration> {
let inner = self.inner.load_full(); // clones the `Inner` Arc
if !inner.task_kinds.contains(ctx.task_kind()) {
return None;

View File

@@ -871,7 +871,7 @@ impl Timeline {
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
@@ -946,7 +946,7 @@ impl Timeline {
key: Key,
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
@@ -1004,7 +1004,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn));
@@ -1101,7 +1101,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn));
@@ -1158,7 +1158,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
@@ -1217,7 +1217,7 @@ impl Timeline {
keyspace: KeySpace,
lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
GetKind::Singular
@@ -1274,7 +1274,7 @@ impl Timeline {
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
if keyspace.overlaps(&Key::metadata_key_range()) {
// skip validation for metadata key range
@@ -1444,7 +1444,7 @@ impl Timeline {
&self,
lsn: Lsn,
who_is_waiting: WaitLsnWaiter<'_>,
ctx: &RequestContext, /* Prepare for use by cancellation */
ctx: &mut RequestContext, /* Prepare for use by cancellation */
) -> Result<(), WaitLsnError> {
let state = self.current_state();
if self.cancel.is_cancelled() || matches!(state, TimelineState::Stopping) {
@@ -1540,7 +1540,7 @@ impl Timeline {
&self,
lsn: Lsn,
length: Duration,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> anyhow::Result<LsnLease> {
let lease = {
let mut gc_info = self.gc_info.write().unwrap();
@@ -1713,7 +1713,7 @@ impl Timeline {
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CompactionError> {
// most likely the cancellation token is from a background task, but in tests it could be the
// request task as well.
@@ -1765,7 +1765,7 @@ impl Timeline {
parent: Arc<crate::tenant::Tenant>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
if self.tenant_shard_id.is_shard_zero() {
// Logical size is only maintained accurately on shard zero.
@@ -1946,7 +1946,7 @@ impl Timeline {
pub(crate) async fn wait_to_become_active(
&self,
_ctx: &RequestContext, // Prepare for use by cancellation
_ctx: &mut RequestContext, // Prepare for use by cancellation
) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe();
loop {
@@ -2448,7 +2448,7 @@ impl Timeline {
/// when the timeline is activated.
fn launch_wal_receiver(
self: &Arc<Self>,
ctx: &RequestContext,
ctx: &mut RequestContext,
broker_client: BrokerClientChannel,
) {
info!(
@@ -2678,7 +2678,7 @@ impl Timeline {
pub(crate) fn get_current_logical_size(
self: &Arc<Self>,
priority: GetLogicalSizePriority,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> logical_size::CurrentLogicalSize {
if !self.tenant_shard_id.is_shard_zero() {
// Logical size is only accurately maintained on shard zero: when called elsewhere, for example
@@ -2745,7 +2745,7 @@ impl Timeline {
current_size
}
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &mut RequestContext) {
let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
// nothing to do for freshly created timelines;
assert_eq!(
@@ -2963,7 +2963,7 @@ impl Timeline {
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
// We should never be calculating logical sizes on shard !=0, because these shards do not have
@@ -3006,7 +3006,7 @@ impl Timeline {
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
_guard: &GateGuard,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
info!(
"Calculating logical size for timeline {} at {}",
@@ -3169,7 +3169,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
@@ -3370,7 +3370,7 @@ impl Timeline {
mut keyspace: KeySpace,
request_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> {
let mut timeline_owned: Arc<Timeline>;
let mut timeline = self;
@@ -3477,7 +3477,7 @@ impl Timeline {
mut cont_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<TimelineVisitOutcome, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
@@ -3585,7 +3585,7 @@ impl Timeline {
async fn get_ready_ancestor_timeline(
&self,
ancestor: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, GetReadyAncestorError> {
// It's possible that the ancestor timeline isn't active yet, or
// is active but hasn't yet caught up to the branch point. Wait
@@ -3653,7 +3653,7 @@ impl Timeline {
async fn get_layer_for_write(
&self,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard
@@ -3697,7 +3697,7 @@ impl Timeline {
async fn flush_loop(
self: &Arc<Self>,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
info!("started flush loop");
loop {
@@ -3856,7 +3856,7 @@ impl Timeline {
async fn flush_frozen_layer(
self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Lsn, FlushLayerError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4085,7 +4085,7 @@ impl Timeline {
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
key_range: Option<Range<Key>>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
@@ -4142,7 +4142,7 @@ impl Timeline {
lsn: Lsn,
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
@@ -4239,7 +4239,7 @@ impl Timeline {
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
img_range: Range<Key>,
start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
@@ -4339,7 +4339,7 @@ impl Timeline {
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
img_range: Range<Key>,
mode: ImageLayerCreationMode,
start: Key,
@@ -4423,7 +4423,7 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
mode: ImageLayerCreationMode,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new();
@@ -4624,7 +4624,7 @@ impl Timeline {
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<
(
completion::Completion,
@@ -4644,7 +4644,7 @@ impl Timeline {
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}
@@ -4822,7 +4822,7 @@ impl Timeline {
cutoff_horizon: Lsn,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<GcCutoffs, PageReconstructError> {
let _timer = self
.metrics
@@ -5469,7 +5469,7 @@ impl Timeline {
lsn: Lsn,
mut images: Vec<(Key, Bytes)>,
check_start_lsn: Option<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
assert!(
@@ -5513,7 +5513,7 @@ impl Timeline {
self: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>,
check_start_lsn: Option<Lsn>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
@@ -5556,7 +5556,7 @@ impl Timeline {
pub(crate) async fn inspect_image_layers(
self: &Arc<Timeline>,
lsn: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
@@ -5664,7 +5664,7 @@ impl<'a> TimelineWriter<'a> {
key: Key,
lsn: Lsn,
value: &Value,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Avoid doing allocations for "small" values.
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
@@ -5697,7 +5697,7 @@ impl<'a> TimelineWriter<'a> {
&mut self,
at: Lsn,
action: OpenLayerAction,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<&Arc<InMemoryLayer>> {
match action {
OpenLayerAction::Roll => {
@@ -5714,7 +5714,7 @@ impl<'a> TimelineWriter<'a> {
Ok(&self.write_guard.as_ref().unwrap().open_layer)
}
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
async fn open_layer(&mut self, at: Lsn, ctx: &mut RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?;
let initial_size = layer.size().await?;
@@ -5800,7 +5800,7 @@ impl<'a> TimelineWriter<'a> {
pub(crate) async fn put_batch(
&mut self,
batch: VecMap<Lsn, (Key, Value)>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
for (lsn, (key, val)) in batch {
self.put(key, lsn, &val, ctx).await?
@@ -5812,7 +5812,7 @@ impl<'a> TimelineWriter<'a> {
pub(crate) async fn delete_batch(
&mut self,
batch: &[(Range<Key>, Lsn)],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
if let Some((_, lsn)) = batch.first() {
let action = self.get_open_layer_action(*lsn, 0);
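One pattern worth noting from the put_batch/delete_batch hunks above: a single &mut RequestContext can be passed to self.put(...) on every loop iteration because each call implicitly reborrows the reference for the duration of that call. A minimal standalone sketch with stub types:

// Stub types for illustration only.
struct RequestContext;

fn put(_key: u64, _ctx: &mut RequestContext) {}

fn put_batch(batch: &[u64], ctx: &mut RequestContext) {
    for &key in batch {
        // Each call reborrows `ctx`; the exclusive borrow ends when it returns.
        put(key, ctx);
    }
}

fn main() {
    let mut ctx = RequestContext;
    put_batch(&[1, 2, 3], &mut ctx);
}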

View File

@@ -49,7 +49,7 @@ impl Timeline {
self: &Arc<Self>,
_cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CompactionError> {
// High level strategy for compaction / image creation:
//
@@ -175,7 +175,7 @@ impl Timeline {
async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let mut drop_layers = Vec::new();
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
@@ -359,7 +359,7 @@ impl Timeline {
async fn compact_level0(
self: &Arc<Self>,
target_file_size: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CompactionError> {
let CompactLevel0Phase1Result {
new_layers,
@@ -400,7 +400,7 @@ impl Timeline {
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
@@ -907,7 +907,7 @@ impl Timeline {
pub(crate) async fn compact_tiered(
self: &Arc<Self>,
_cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CompactionError> {
let fanout = self.get_compaction_threshold() as u64;
let target_file_size = self.get_checkpoint_distance();
@@ -963,7 +963,7 @@ impl Timeline {
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
_cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CompactionError> {
use crate::tenant::storage_layer::ValueReconstructState;
// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
@@ -1190,7 +1190,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
&mut self,
key_range: &Range<Key>,
lsn_range: &Range<Lsn>,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
self.flush_updates().await?;
@@ -1211,7 +1211,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
&mut self,
key_range: &Range<Key>,
lsn: Lsn,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> anyhow::Result<Vec<Range<Key>>> {
if lsn == self.keyspace.0 {
Ok(pageserver_compaction::helpers::intersect_keyspace(
@@ -1247,7 +1247,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
Ok(self.create_image_impl(lsn, key_range, ctx).await?)
}
@@ -1257,7 +1257,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
lsn_range: &Range<Lsn>,
key_range: &Range<Key>,
input_layers: &[ResidentDeltaLayer],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
@@ -1329,7 +1329,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
async fn delete_layer(
&mut self,
layer: &OwnArc<PersistentLayerDesc>,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> anyhow::Result<()> {
self.layers_to_delete.push(layer.clone().0);
Ok(())
@@ -1341,7 +1341,7 @@ impl TimelineAdaptor {
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), CreateImageLayersError> {
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
@@ -1468,7 +1468,7 @@ impl CompactionLayer<Key> for ResidentDeltaLayer {
impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
type DeltaEntry<'a> = DeltaEntry<'a>;
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
async fn load_keys<'a>(&self, ctx: &mut RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
self.0.load_keys(ctx).await
}
}

View File

@@ -87,7 +87,7 @@ pub(super) async fn prepare(
detached: &Arc<Timeline>,
tenant: &Tenant,
options: Options,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
use Error::*;
@@ -325,7 +325,7 @@ async fn upload_rewritten_layer(
layer: &Layer,
target: &Arc<Timeline>,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<Layer>, Error> {
use Error::UploadRewritten;
let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
@@ -348,7 +348,7 @@ async fn copy_lsn_prefix(
end_lsn: Lsn,
layer: &Layer,
target_timeline: &Arc<Timeline>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
@@ -437,7 +437,7 @@ pub(super) async fn complete(
detached: &Arc<Timeline>,
tenant: &Tenant,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
let PreparedTimelineDetach { layers } = prepared;

View File

@@ -127,7 +127,7 @@ impl Timeline {
policy: &EvictionPolicy,
cancel: &CancellationToken,
gate: &GateGuard,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> ControlFlow<(), Instant> {
debug!("eviction iteration: {policy:?}");
let start = Instant::now();
@@ -184,7 +184,7 @@ impl Timeline {
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> ControlFlow<()> {
let now = SystemTime::now();
@@ -309,7 +309,7 @@ impl Timeline {
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> ControlFlow<()> {
let permit = self.acquire_imitation_permit(cancel, ctx).await?;
@@ -320,7 +320,7 @@ impl Timeline {
async fn acquire_imitation_permit(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction,
@@ -366,7 +366,7 @@ impl Timeline {
cancel: &CancellationToken,
gate: &GateGuard,
permit: tokio::sync::SemaphorePermit<'static>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> ControlFlow<()> {
if !self.tenant_shard_id.is_shard_zero() {
// Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
@@ -442,7 +442,7 @@ impl Timeline {
async fn imitate_timeline_cached_layer_accesses(
&self,
guard: &GateGuard,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
let lsn = self.get_last_record_lsn();
@@ -499,7 +499,7 @@ impl Timeline {
&self,
tenant: &Tenant,
cancel: &CancellationToken,
ctx: &RequestContext,
ctx: &mut RequestContext,
) {
if self.conf.metric_collection_endpoint.is_none() {
// We don't start the consumption metrics task if this is not set in the config.

View File

@@ -73,7 +73,7 @@ impl LayerManager {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());

View File

@@ -90,7 +90,7 @@ impl<'t> UninitializedTimeline<'t> {
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;

View File

@@ -68,7 +68,7 @@ impl WalReceiver {
timeline: Arc<Timeline>,
conf: WalReceiverConf,
mut broker_client: BrokerClientChannel,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Self {
let tenant_shard_id = timeline.tenant_shard_id;
let timeline_id = timeline.timeline_id;

View File

@@ -59,7 +59,7 @@ pub(crate) struct Cancelled;
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
ctx: &mut RequestContext,
cancel: &CancellationToken,
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
) -> Result<(), Cancelled> {
@@ -523,7 +523,7 @@ impl ConnectionManagerState {
}
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &mut RequestContext) {
WALRECEIVER_SWITCHES
.with_label_values(&[new_sk.reason.name()])
.inc();

View File

@@ -286,7 +286,7 @@ impl<'a> VectoredBlobReader<'a> {
&self,
read: &VectoredRead,
buf: BytesMut,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
assert!(

View File

@@ -346,7 +346,7 @@ impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
@@ -355,7 +355,7 @@ impl VirtualFile {
/// Like File::create.
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path.as_ref(),
@@ -373,7 +373,7 @@ impl VirtualFile {
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
_ctx: &mut RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
@@ -589,7 +589,7 @@ impl VirtualFile {
&self,
buf: B,
offset: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<B, Error>
where
B: IoBufMut + Send,
@@ -606,7 +606,7 @@ impl VirtualFile {
buf: B,
offset: u64,
count: usize,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<B, Error>
where
B: IoBufMut + Send,
@@ -623,7 +623,7 @@ impl VirtualFile {
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf {
page,
@@ -639,7 +639,7 @@ impl VirtualFile {
&self,
buf: B,
mut offset: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (B::Buf, Result<(), Error>) {
let buf_len = buf.bytes_init();
if buf_len == 0 {
@@ -677,7 +677,7 @@ impl VirtualFile {
pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
@@ -710,7 +710,7 @@ impl VirtualFile {
async fn write<B: IoBuf + Send>(
&mut self,
buf: Slice<B>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> (Slice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await;
@@ -726,7 +726,7 @@ impl VirtualFile {
&self,
buf: B,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
_ctx: &mut RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
@@ -756,7 +756,7 @@ impl VirtualFile {
&self,
buf: Slice<B>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
_ctx: &mut RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (Slice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -1048,7 +1048,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let buf = vec![0; PAGE_SZ];
@@ -1058,7 +1058,7 @@ impl VirtualFile {
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &mut RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let res;
@@ -1122,7 +1122,7 @@ impl OwnedAsyncWriter for VirtualFile {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
res.map(move |v| (v, buf))
@@ -1208,7 +1208,7 @@ mod tests {
&self,
mut buf: Vec<u8>,
offset: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
@@ -1219,7 +1219,7 @@ mod tests {
&self,
buf: B,
offset: u64,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
@@ -1244,7 +1244,7 @@ mod tests {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
@@ -1263,7 +1263,7 @@ mod tests {
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
async fn read_string(&mut self, ctx: &mut RequestContext) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
@@ -1284,7 +1284,7 @@ mod tests {
&mut self,
pos: u64,
len: usize,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<String, Error> {
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos, ctx).await?;
@@ -1307,7 +1307,7 @@ mod tests {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
@@ -1324,7 +1324,7 @@ mod tests {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
_ctx: &RequestContext,
_ctx: &mut RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
Ok(MaybeVirtualFile::File({
let owned_fd = opts.open(path.as_std_path()).await?;
@@ -1343,7 +1343,7 @@ mod tests {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error>;
}

View File

@@ -38,7 +38,7 @@ where
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let (nwritten, buf) = self.dst.write_all(buf, ctx).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();

View File

@@ -9,7 +9,7 @@ pub trait OwnedAsyncWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)>;
}
@@ -60,7 +60,7 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<W> {
pub async fn flush_and_into_inner(mut self, ctx: &mut RequestContext) -> std::io::Result<W> {
self.flush(ctx).await?;
let Self { buf, writer } = self;
@@ -79,7 +79,7 @@ where
pub async fn write_buffered<S: IoBuf + Send>(
&mut self,
chunk: Slice<S>,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<(usize, S)> {
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
@@ -124,7 +124,7 @@ where
pub async fn write_buffered_borrowed(
&mut self,
mut chunk: &[u8],
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> std::io::Result<usize> {
let chunk_len = chunk.len();
while !chunk.is_empty() {
@@ -142,7 +142,7 @@ where
Ok(chunk_len)
}
async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> {
async fn flush(&mut self, ctx: &mut RequestContext) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
let buf_len = buf.pending();
if buf_len == 0 {
@@ -215,7 +215,7 @@ impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
_: &RequestContext,
_: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
@@ -243,7 +243,7 @@ mod tests {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
_: &RequestContext,
_: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {

View File

@@ -59,7 +59,7 @@ impl WalIngest {
pub async fn new(
timeline: &Timeline,
startpoint: Lsn,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
@@ -90,7 +90,7 @@ impl WalIngest {
lsn: Lsn,
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<bool> {
WAL_INGEST.records_received.inc();
let pg_version = modification.tline.pg_version;
@@ -449,7 +449,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
blk: &DecodedBkpBlock,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
@@ -467,7 +467,7 @@ impl WalIngest {
lsn: Lsn,
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
@@ -530,7 +530,7 @@ impl WalIngest {
buf: &mut Bytes,
modification: &mut DatadirModification<'_>,
decoded: &DecodedWALRecord,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -836,7 +836,7 @@ impl WalIngest {
buf: &mut Bytes,
modification: &mut DatadirModification<'_>,
decoded: &DecodedWALRecord,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -1007,7 +1007,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
rec: &XlCreateDatabase,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let db_id = rec.db_id;
let tablespace_id = rec.tablespace_id;
@@ -1102,7 +1102,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
rec: &XlSmgrCreate,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let rel = RelTag {
spcnode: rec.rnode.spcnode,
@@ -1121,7 +1121,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
rec: &XlSmgrTruncate,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
let spcnode = rec.rnode.spcnode;
let dbnode = rec.rnode.dbnode;
@@ -1193,7 +1193,7 @@ impl WalIngest {
parsed: &XlXactParsedRecord,
is_commit: bool,
origin_id: u16,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// Record update of CLOG pages
let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
@@ -1270,7 +1270,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
xlrec: &XlClogTruncate,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
info!(
"RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
@@ -1416,7 +1416,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
@@ -1454,7 +1454,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -1475,7 +1475,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
modification.put_rel_creation(rel, 0, ctx).await?;
Ok(())
@@ -1487,7 +1487,7 @@ impl WalIngest {
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
@@ -1501,7 +1501,7 @@ impl WalIngest {
rel: RelTag,
blknum: BlockNumber,
rec: NeonWalRecord,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
@@ -1514,7 +1514,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
modification.put_rel_truncation(rel, nblocks, ctx).await?;
Ok(())
@@ -1524,7 +1524,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
modification.put_rel_drop(rel, ctx).await?;
Ok(())
@@ -1535,7 +1535,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
rel: RelTag,
blknum: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> {
let new_nblocks = blknum + 1;
// Check if the relation exists. We implicitly create relations on first
@@ -1597,7 +1597,7 @@ impl WalIngest {
segno: u32,
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> Result<()> {
self.handle_slru_extend(modification, kind, segno, blknum, ctx)
.await?;
@@ -1611,7 +1611,7 @@ impl WalIngest {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
// we don't use a cache for this like we do for relations. SLRUs are explicitly
// extended with ZEROPAGE records, not with commit records, so it happens
@@ -1660,7 +1660,7 @@ impl WalIngest {
async fn get_relsize(
modification: &DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
ctx: &mut RequestContext,
) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification
.tline
@@ -1701,7 +1701,7 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
async fn init_walingest_test(tline: &Timeline, ctx: &mut RequestContext) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file