Compare commits

...

7 Commits

Author SHA1 Message Date
Christian Schwarz
e68c43c19b DNM: rip out throttling code (the recorder's borrowing of ctx obviously breaks &mut RequestContext passing) 2024-06-21 11:41:32 +00:00
Christian Schwarz
b2830a48dd some easy mechanical fixes (add let mut and &mut ctx) 2024-06-21 11:25:45 +00:00
Christian Schwarz
8b482a8be0 download.rs: fix error 'captured variable cannot escape FnMut closure body" 2024-06-21 11:19:55 +00:00
Christian Schwarz
81f13e17ce layer.rs: fix error 'captured variable cannot escape FnMut closure body" 2024-06-21 11:19:55 +00:00
Christian Schwarz
ba13f2a90a async closure workaround for virtual file 2024-06-21 09:45:06 +00:00
Christian Schwarz
630c8a9b86 cargo fmt initial 2024-06-21 09:45:06 +00:00
Christian Schwarz
f12f31ae77 auto-replace all &RequestContext to &mut RequestContext (plus a tiny bit of manual changes) 2024-06-21 09:45:06 +00:00
47 changed files with 535 additions and 527 deletions

View File

@@ -37,6 +37,20 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
} }
} }
pub trait Op<T, E> {
async fn call(&mut self) -> Result<T, E>;
}
impl<T, E, F, Fut> Op<T, E> for F
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
async fn call(&mut self) -> Result<T, E> {
(&mut *self)().await
}
}
/// Retries passed operation until one of the following conditions are met: /// Retries passed operation until one of the following conditions are met:
/// - encountered error is considered as permanent (non-retryable) /// - encountered error is considered as permanent (non-retryable)
/// - retries have been exhausted /// - retries have been exhausted
@@ -51,8 +65,8 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
/// for any other error type. Final failed attempt is logged with `{:?}`. /// for any other error type. Final failed attempt is logged with `{:?}`.
/// ///
/// Returns `None` if cancellation was noticed during backoff or the terminal result. /// Returns `None` if cancellation was noticed during backoff or the terminal result.
pub async fn retry<T, O, F, E>( pub async fn retry<T, E>(
mut op: O, mut op: impl Op<T, E>,
is_permanent: impl Fn(&E) -> bool, is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32, warn_threshold: u32,
max_retries: u32, max_retries: u32,
@@ -63,8 +77,6 @@ where
// Not std::error::Error because anyhow::Error doesnt implement it. // Not std::error::Error because anyhow::Error doesnt implement it.
// For context see https://github.com/dtolnay/anyhow/issues/63 // For context see https://github.com/dtolnay/anyhow/issues/63
E: Display + Debug + 'static, E: Display + Debug + 'static,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
{ {
let mut attempts = 0; let mut attempts = 0;
loop { loop {
@@ -72,7 +84,7 @@ where
return None; return None;
} }
let result = op().await; let result = op.call().await;
match &result { match &result {
Ok(_) => { Ok(_) => {
if attempts > 0 { if attempts > 0 {

View File

@@ -99,7 +99,11 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
} }
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH" // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> { async fn get_holes(
path: &Utf8Path,
max_holes: usize,
ctx: &mut RequestContext,
) -> Result<Vec<Hole>> {
let file = VirtualFile::open(path, ctx).await?; let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id(); let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id); let block_reader = FileBlockReader::new(&file, file_id);

View File

@@ -57,7 +57,7 @@ pub(crate) enum LayerCmd {
}, },
} }
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> { async fn read_delta_file(path: impl AsRef<Path>, ctx: &mut RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100); page_cache::init(100);

View File

@@ -60,7 +60,7 @@ pub async fn send_basebackup_tarball<'a, W>(
req_lsn: Option<Lsn>, req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>, prev_lsn: Option<Lsn>,
full_backup: bool, full_backup: bool,
ctx: &'a RequestContext, ctx: &'a mut RequestContext,
) -> Result<(), BasebackupError> ) -> Result<(), BasebackupError>
where where
W: AsyncWrite + Send + Sync + Unpin, W: AsyncWrite + Send + Sync + Unpin,
@@ -141,7 +141,7 @@ where
lsn: Lsn, lsn: Lsn,
prev_record_lsn: Lsn, prev_record_lsn: Lsn,
full_backup: bool, full_backup: bool,
ctx: &'a RequestContext, ctx: &'a mut RequestContext,
} }
/// A sink that accepts SLRU blocks ordered by key and forwards /// A sink that accepts SLRU blocks ordered by key and forwards

View File

@@ -51,7 +51,7 @@ pub async fn collect_metrics(
node_id: NodeId, node_id: NodeId,
local_disk_storage: Utf8PathBuf, local_disk_storage: Utf8PathBuf,
cancel: CancellationToken, cancel: CancellationToken,
ctx: RequestContext, mut ctx: RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if _cached_metric_collection_interval != Duration::ZERO { if _cached_metric_collection_interval != Duration::ZERO {
tracing::warn!( tracing::warn!(
@@ -60,7 +60,7 @@ pub async fn collect_metrics(
} }
// spin up background worker that caclulates tenant sizes // spin up background worker that caclulates tenant sizes
let worker_ctx = let mut worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download); ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn( task_mgr::spawn(
BACKGROUND_RUNTIME.handle(), BACKGROUND_RUNTIME.handle(),
@@ -76,7 +76,7 @@ pub async fn collect_metrics(
tenant_manager, tenant_manager,
synthetic_size_calculation_interval, synthetic_size_calculation_interval,
&cancel, &cancel,
&worker_ctx, &mut worker_ctx,
) )
.instrument(info_span!("synthetic_size_worker")) .instrument(info_span!("synthetic_size_worker"))
.await?; .await?;
@@ -122,7 +122,8 @@ pub async fn collect_metrics(
let started_at = Instant::now(); let started_at = Instant::now();
// these are point in time, with variable "now" // these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await; let metrics =
metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &mut ctx).await;
let metrics = Arc::new(metrics); let metrics = Arc::new(metrics);
@@ -280,7 +281,7 @@ async fn calculate_synthetic_size_worker(
tenant_manager: Arc<TenantManager>, tenant_manager: Arc<TenantManager>,
synthetic_size_calculation_interval: Duration, synthetic_size_calculation_interval: Duration,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker"); info!("starting calculate_synthetic_size_worker");
scopeguard::defer! { scopeguard::defer! {
@@ -340,7 +341,7 @@ async fn calculate_synthetic_size_worker(
} }
} }
async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) { async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &mut RequestContext) {
const CAUSE: LogicalSizeCalculationCause = const CAUSE: LogicalSizeCalculationCause =
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize; LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;

View File

@@ -184,7 +184,7 @@ impl MetricsKey {
pub(super) async fn collect_all_metrics( pub(super) async fn collect_all_metrics(
tenant_manager: &Arc<TenantManager>, tenant_manager: &Arc<TenantManager>,
cached_metrics: &Cache, cached_metrics: &Cache,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Vec<RawMetric> { ) -> Vec<RawMetric> {
use pageserver_api::models::TenantState; use pageserver_api::models::TenantState;
@@ -220,7 +220,7 @@ pub(super) async fn collect_all_metrics(
res res
} }
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric> async fn collect<S>(tenants: S, cache: &Cache, ctx: &mut RequestContext) -> Vec<RawMetric>
where where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>, S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{ {
@@ -342,7 +342,7 @@ impl TimelineSnapshot {
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
fn collect( fn collect(
t: &Arc<crate::tenant::Timeline>, t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Option<Self>> { ) -> anyhow::Result<Option<Self>> {
if !t.is_active() { if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values // no collection for broken or stopping needed, we will still keep the cached values

View File

@@ -97,9 +97,10 @@ pub struct RequestContext {
download_behavior: DownloadBehavior, download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior, access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind, page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
} }
pub(crate) struct MicrosSpentThrottled(optional_counter::MicroSecondsCounterU32);
/// The kind of access to the page cache. /// The kind of access to the page cache.
#[derive(Clone, Copy, PartialEq, Eq, Debug, enum_map::Enum, strum_macros::IntoStaticStr)] #[derive(Clone, Copy, PartialEq, Eq, Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
pub enum PageContentKind { pub enum PageContentKind {
@@ -158,7 +159,7 @@ impl RequestContextBuilder {
} }
} }
pub fn extend(original: &RequestContext) -> Self { pub fn extend(original: &mut RequestContext) -> Self {
Self { Self {
// This is like a Copy, but avoid implementing Copy because ordinary users of // This is like a Copy, but avoid implementing Copy because ordinary users of
// RequestContext should always move or ref it. // RequestContext should always move or ref it.

View File

@@ -352,7 +352,7 @@ async fn build_timeline_info(
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool, include_non_incremental_logical_size: bool,
force_await_initial_logical_size: bool, force_await_initial_logical_size: bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<TimelineInfo> { ) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -381,7 +381,7 @@ async fn build_timeline_info(
async fn build_timeline_info_common( async fn build_timeline_info_common(
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
logical_size_task_priority: tenant::timeline::GetLogicalSizePriority, logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
) -> anyhow::Result<TimelineInfo> { ) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -502,7 +502,7 @@ async fn timeline_create_handler(
let new_timeline_id = request_data.new_timeline_id; let new_timeline_id = request_data.new_timeline_id;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
let state = get_state(&request); let state = get_state(&request);
@@ -527,7 +527,7 @@ async fn timeline_create_handler(
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION), request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
request_data.existing_initdb_timeline_id, request_data.existing_initdb_timeline_id,
state.broker_client.clone(), state.broker_client.clone(),
&ctx, &mut ctx,
) )
.await .await
{ {
@@ -535,7 +535,7 @@ async fn timeline_create_handler(
// Created. Construct a TimelineInfo for it. // Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common( let timeline_info = build_timeline_info_common(
&new_timeline, &new_timeline,
&ctx, &mut ctx,
tenant::timeline::GetLogicalSizePriority::User, tenant::timeline::GetLogicalSizePriority::User,
) )
.await .await
@@ -593,7 +593,7 @@ async fn timeline_list_handler(
check_permission(&request, Some(tenant_shard_id.tenant_id))?; check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request); let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let response_data = async { let response_data = async {
let tenant = state let tenant = state
@@ -610,7 +610,7 @@ async fn timeline_list_handler(
&timeline, &timeline,
include_non_incremental_logical_size.unwrap_or(false), include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false),
&ctx, &mut ctx,
) )
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await .await
@@ -680,7 +680,7 @@ async fn timeline_detail_handler(
check_permission(&request, Some(tenant_shard_id.tenant_id))?; check_permission(&request, Some(tenant_shard_id.tenant_id))?;
// Logical size calculation needs downloading. // Logical size calculation needs downloading.
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let state = get_state(&request); let state = get_state(&request);
let timeline_info = async { let timeline_info = async {
@@ -696,7 +696,7 @@ async fn timeline_detail_handler(
&timeline, &timeline,
include_non_incremental_logical_size.unwrap_or(false), include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false),
&ctx, &mut ctx,
) )
.await .await
.context("get local timeline info") .context("get local timeline info")
@@ -735,13 +735,13 @@ async fn get_lsn_by_timestamp_handler(
.map_err(ApiError::BadRequest)?; .map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp); let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?; .await?;
let result = timeline let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx) .find_lsn_for_timestamp(timestamp_pg, &cancel, &mut ctx)
.await?; .await?;
#[derive(serde::Serialize, Debug)] #[derive(serde::Serialize, Debug)]
struct Result { struct Result {
@@ -786,11 +786,11 @@ async fn get_timestamp_of_lsn_handler(
.with_context(|| format!("Invalid LSN: {lsn_str:?}")) .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
.map_err(ApiError::BadRequest)?; .map_err(ApiError::BadRequest)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?; .await?;
let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?; let result = timeline.get_timestamp_for_lsn(lsn, &mut ctx).await?;
match result { match result {
Some(time) => { Some(time) => {
@@ -816,7 +816,7 @@ async fn tenant_attach_handler(
None => TenantConfOpt::default(), None => TenantConfOpt::default(),
}; };
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
info!("Handling tenant attach {tenant_id}"); info!("Handling tenant attach {tenant_id}");
@@ -830,7 +830,7 @@ async fn tenant_attach_handler(
let tenant = state let tenant = state
.tenant_manager .tenant_manager
.upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &ctx) .upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &mut ctx)
.await?; .await?;
let Some(tenant) = tenant else { let Some(tenant) = tenant else {
@@ -921,11 +921,11 @@ async fn tenant_reset_handler(
let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?; let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request); let state = get_state(&request);
state state
.tenant_manager .tenant_manager
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx) .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &mut ctx)
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;
@@ -939,7 +939,7 @@ async fn tenant_load_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?; check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?; let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
@@ -956,7 +956,7 @@ async fn tenant_load_handler(
state.broker_client.clone(), state.broker_client.clone(),
state.remote_storage.clone(), state.remote_storage.clone(),
state.deletion_queue_client.clone(), state.deletion_queue_client.clone(),
&ctx, &mut ctx,
) )
.instrument(info_span!("load", %tenant_id)) .instrument(info_span!("load", %tenant_id))
.await?; .await?;
@@ -1120,7 +1120,7 @@ async fn tenant_size_handler(
))); )));
} }
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = state let tenant = state
.tenant_manager .tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?; .get_attached_tenant_shard(tenant_shard_id)?;
@@ -1132,7 +1132,7 @@ async fn tenant_size_handler(
retention_period, retention_period,
LogicalSizeCalculationCause::TenantSizeHandler, LogicalSizeCalculationCause::TenantSizeHandler,
&cancel, &cancel,
&ctx, &mut ctx,
) )
.await .await
.map_err(|e| match e { .map_err(|e| match e {
@@ -1193,7 +1193,7 @@ async fn tenant_shard_split_handler(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let state = get_state(&request); let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let tenant = state let tenant = state
.tenant_manager .tenant_manager
@@ -1206,7 +1206,7 @@ async fn tenant_shard_split_handler(
tenant, tenant,
ShardCount::new(req.new_shard_count), ShardCount::new(req.new_shard_count),
req.new_stripe_size, req.new_stripe_size,
&ctx, &mut ctx,
) )
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;
@@ -1386,7 +1386,7 @@ async fn tenant_create_handler(
let generation = get_request_generation(state, request_data.generation)?; let generation = get_request_generation(state, request_data.generation)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let location_conf = let location_conf =
LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters); LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
@@ -1398,7 +1398,7 @@ async fn tenant_create_handler(
location_conf, location_conf,
None, None,
SpawnMode::Create, SpawnMode::Create,
&ctx, &mut ctx,
) )
.await?; .await?;
@@ -1498,7 +1498,7 @@ async fn put_tenant_location_config_handler(
let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false); let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false);
check_permission(&request, Some(tenant_shard_id.tenant_id))?; check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request); let state = get_state(&request);
let conf = state.conf; let conf = state.conf;
@@ -1537,7 +1537,7 @@ async fn put_tenant_location_config_handler(
let tenant = state let tenant = state
.tenant_manager .tenant_manager
.upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx) .upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &mut ctx)
.await?; .await?;
let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size()); let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size());
let attached = tenant.is_some(); let attached = tenant.is_some();
@@ -1722,7 +1722,7 @@ async fn lsn_lease_handler(
let lsn: Lsn = parse_query_param(&request, "lsn")? let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?; .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let state = get_state(&request); let state = get_state(&request);
@@ -1730,7 +1730,7 @@ async fn lsn_lease_handler(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?; .await?;
let result = timeline let result = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx) .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &mut ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?; .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
json_response(StatusCode::OK, result) json_response(StatusCode::OK, result)
@@ -1747,8 +1747,8 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?; let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?; let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &mut ctx).await?;
json_response(StatusCode::OK, gc_result) json_response(StatusCode::OK, gc_result)
} }
@@ -1775,10 +1775,10 @@ async fn timeline_compact_handler(
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
async { async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline timeline
.compact(&cancel, flags, &ctx) .compact(&cancel, flags, &mut ctx)
.await .await
.map_err(|e| ApiError::InternalServerError(e.into()))?; .map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded { if wait_until_uploaded {
@@ -1812,7 +1812,7 @@ async fn timeline_checkpoint_handler(
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
async { async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline timeline
.freeze_and_flush() .freeze_and_flush()
@@ -1825,7 +1825,7 @@ async fn timeline_checkpoint_handler(
} }
})?; })?;
timeline timeline
.compact(&cancel, flags, &ctx) .compact(&cancel, flags, &mut ctx)
.await .await
.map_err(|e| .map_err(|e|
match e { match e {
@@ -1918,8 +1918,8 @@ async fn timeline_detach_ancestor_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
let ctx = &ctx; let ctx = &mut ctx;
let timeline = tenant.get_timeline(timeline_id, true)?; let timeline = tenant.get_timeline(timeline_id, true)?;
@@ -2003,10 +2003,10 @@ async fn getpage_at_lsn_handler(
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?; .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
async { async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
let page = timeline.get(key.0, lsn, &ctx).await?; let page = timeline.get(key.0, lsn, &mut ctx).await?;
Result::<_, ApiError>::Ok( Result::<_, ApiError>::Ok(
Response::builder() Response::builder()
@@ -2032,11 +2032,11 @@ async fn timeline_collect_keyspace(
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?; let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
async { async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let (dense_ks, sparse_ks) = timeline let (dense_ks, sparse_ks) = timeline
.collect_keyspace(at_lsn, &ctx) .collect_keyspace(at_lsn, &mut ctx)
.await .await
.map_err(|e| ApiError::InternalServerError(e.into()))?; .map_err(|e| ApiError::InternalServerError(e.into()))?;
@@ -2425,8 +2425,8 @@ async fn list_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?; .await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?; let files = timeline.list_aux_files(body.lsn, &mut ctx).await?;
json_response(StatusCode::OK, files) json_response(StatusCode::OK, files)
} }
@@ -2467,15 +2467,15 @@ async fn ingest_aux_files(
let mut modification = timeline.begin_modification( let mut modification = timeline.begin_modification(
Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */ Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
); );
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
for (fname, content) in body.aux_files { for (fname, content) in body.aux_files {
modification modification
.put_file(&fname, content.as_bytes(), &ctx) .put_file(&fname, content.as_bytes(), &mut ctx)
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;
} }
modification modification
.commit(&ctx) .commit(&mut ctx)
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;

View File

@@ -53,7 +53,7 @@ pub async fn import_timeline_from_postgres_datadir(
tline: &Timeline, tline: &Timeline,
pgdata_path: &Utf8Path, pgdata_path: &Utf8Path,
pgdata_lsn: Lsn, pgdata_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None; let mut pg_control: Option<ControlFileData> = None;
@@ -121,7 +121,7 @@ async fn import_rel(
dboid: Oid, dboid: Oid,
reader: &mut (impl AsyncRead + Unpin), reader: &mut (impl AsyncRead + Unpin),
len: usize, len: usize,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Does it look like a relation file? // Does it look like a relation file?
trace!("importing rel file {}", path.display()); trace!("importing rel file {}", path.display());
@@ -210,7 +210,7 @@ async fn import_slru(
path: &Path, path: &Path,
reader: &mut (impl AsyncRead + Unpin), reader: &mut (impl AsyncRead + Unpin),
len: usize, len: usize,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("importing slru file {path:?}"); info!("importing slru file {path:?}");
@@ -268,7 +268,7 @@ async fn import_wal(
tline: &Timeline, tline: &Timeline,
startpoint: Lsn, startpoint: Lsn,
endpoint: Lsn, endpoint: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version); let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
@@ -346,7 +346,7 @@ pub async fn import_basebackup_from_tar(
tline: &Timeline, tline: &Timeline,
reader: &mut (impl AsyncRead + Send + Sync + Unpin), reader: &mut (impl AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn, base_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
info!("importing base at {base_lsn}"); info!("importing base at {base_lsn}");
let mut modification = tline.begin_modification(base_lsn); let mut modification = tline.begin_modification(base_lsn);
@@ -397,7 +397,7 @@ pub async fn import_wal_from_tar(
reader: &mut (impl AsyncRead + Send + Sync + Unpin), reader: &mut (impl AsyncRead + Send + Sync + Unpin),
start_lsn: Lsn, start_lsn: Lsn,
end_lsn: Lsn, end_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
// Set up walingest mutable state // Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version); let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
@@ -489,7 +489,7 @@ async fn import_file(
file_path: &Path, file_path: &Path,
reader: &mut (impl AsyncRead + Send + Sync + Unpin), reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize, len: usize,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<ControlFileData>> { ) -> Result<Option<ControlFileData>> {
let file_name = match file_path.file_name() { let file_name = match file_path.file_name() {
Some(name) => name.to_string_lossy(), Some(name) => name.to_string_lossy(),

View File

@@ -333,7 +333,7 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
}); });
impl PageCacheMetrics { impl PageCacheMetrics {
pub(crate) fn for_ctx(&self, ctx: &RequestContext) -> &PageCacheMetricsForTaskKind { pub(crate) fn for_ctx(&self, ctx: &mut RequestContext) -> &PageCacheMetricsForTaskKind {
&self.map[ctx.task_kind()][ctx.page_content_kind()] &self.map[ctx.task_kind()][ctx.page_content_kind()]
} }
} }
@@ -1085,7 +1085,6 @@ impl GlobalAndPerTimelineHistogram {
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> { struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
h: &'a GlobalAndPerTimelineHistogram, h: &'a GlobalAndPerTimelineHistogram,
ctx: &'c RequestContext,
start: std::time::Instant, start: std::time::Instant,
op: SmgrQueryType, op: SmgrQueryType,
} }
@@ -1093,32 +1092,11 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
fn drop(&mut self) { fn drop(&mut self) {
let elapsed = self.start.elapsed(); let elapsed = self.start.elapsed();
let ex_throttled = self self.h.observe(elapsed.as_secs_f64());
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
self.h.observe(ex_throttled.as_secs_f64());
} }
} }
#[derive( #[derive(
Debug, Debug,
Clone, Clone,
@@ -1233,33 +1211,11 @@ impl SmgrQueryTimePerTimeline {
}); });
Self { metrics } Self { metrics }
} }
pub(crate) fn start_timer<'c: 'a, 'a>( pub(crate) fn start_timer<'a>(&'a self, op: SmgrQueryType) -> impl Drop + 'a {
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> impl Drop + '_ {
let metric = &self.metrics[op as usize]; let metric = &self.metrics[op as usize];
let start = Instant::now(); let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[op];
rate_limit.call(|| {
warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
GlobalAndPerTimelineHistogramTimer { GlobalAndPerTimelineHistogramTimer {
h: metric, h: metric,
ctx,
start, start,
op, op,
} }
@@ -1326,7 +1282,7 @@ mod smgr_query_time_tests {
assert_eq!(pre_per_tenant_timeline, 0); assert_eq!(pre_per_tenant_timeline, 0);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timer = metrics.start_timer(*op, &ctx); let timer = metrics.start_timer(*op);
drop(timer); drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts(); let (post_global, post_per_tenant_timeline) = get_counts();

View File

@@ -331,7 +331,7 @@ impl PageCache {
&self, &self,
file_id: FileId, file_id: FileId,
blkno: u32, blkno: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ReadBufResult> { ) -> anyhow::Result<ReadBufResult> {
self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx) self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
.await .await
@@ -422,7 +422,7 @@ impl PageCache {
async fn lock_for_read( async fn lock_for_read(
&self, &self,
cache_key: &CacheKey, cache_key: &CacheKey,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ReadBufResult> { ) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?); let mut permit = Some(self.try_get_pinned_slot_permit().await?);

View File

@@ -554,7 +554,7 @@ impl PageServerHandler {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
protocol_version: PagestreamProtocolVersion, protocol_version: PagestreamProtocolVersion,
ctx: RequestContext, mut ctx: RequestContext,
) -> Result<(), QueryError> ) -> Result<(), QueryError>
where where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -624,7 +624,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::exists"); fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
( (
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &mut ctx)
.instrument(span.clone()) .instrument(span.clone())
.await, .await,
span, span,
@@ -634,7 +634,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::nblocks"); fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
( (
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &mut ctx)
.instrument(span.clone()) .instrument(span.clone())
.await, .await,
span, span,
@@ -645,7 +645,7 @@ impl PageServerHandler {
// shard_id is filled in by the handler // shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
( (
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &mut ctx)
.instrument(span.clone()) .instrument(span.clone())
.await, .await,
span, span,
@@ -655,7 +655,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::dbsize"); fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
( (
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) self.handle_db_size_request(tenant_id, timeline_id, &req, &mut ctx)
.instrument(span.clone()) .instrument(span.clone())
.await, .await,
span, span,
@@ -665,7 +665,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
( (
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx) self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &mut ctx)
.instrument(span.clone()) .instrument(span.clone())
.await, .await,
span, span,
@@ -728,7 +728,7 @@ impl PageServerHandler {
base_lsn: Lsn, base_lsn: Lsn,
_end_lsn: Lsn, _end_lsn: Lsn,
pg_version: u32, pg_version: u32,
ctx: RequestContext, mut ctx: RequestContext,
) -> Result<(), QueryError> ) -> Result<(), QueryError>
where where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -741,7 +741,7 @@ impl PageServerHandler {
.get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT) .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
.await?; .await?;
let timeline = tenant let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .create_empty_timeline(timeline_id, base_lsn, pg_version, &mut ctx)
.await?; .await?;
// TODO mark timeline as not ready until it reaches end_lsn. // TODO mark timeline as not ready until it reaches end_lsn.
@@ -766,7 +766,7 @@ impl PageServerHandler {
&mut copyin_reader, &mut copyin_reader,
base_lsn, base_lsn,
self.broker_client.clone(), self.broker_client.clone(),
&ctx, &mut ctx,
) )
.await?; .await?;
@@ -791,7 +791,7 @@ impl PageServerHandler {
timeline_id: TimelineId, timeline_id: TimelineId,
start_lsn: Lsn, start_lsn: Lsn,
end_lsn: Lsn, end_lsn: Lsn,
ctx: RequestContext, mut ctx: RequestContext,
) -> Result<(), QueryError> ) -> Result<(), QueryError>
where where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -814,7 +814,7 @@ impl PageServerHandler {
pgb.write_message_noflush(&BeMessage::CopyInResponse)?; pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb, &timeline.cancel).await?; self.flush_cancellable(pgb, &timeline.cancel).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel))); let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?; import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &mut ctx).await?;
info!("wal import complete"); info!("wal import complete");
// Read the end of the tar archive. // Read the end of the tar archive.
@@ -867,7 +867,7 @@ impl PageServerHandler {
request_lsn: Lsn, request_lsn: Lsn,
not_modified_since: Lsn, not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>, latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Lsn, PageStreamError> { ) -> Result<Lsn, PageStreamError> {
let last_record_lsn = timeline.get_last_record_lsn(); let last_record_lsn = timeline.get_last_record_lsn();
@@ -926,7 +926,7 @@ impl PageServerHandler {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
timeline_id: TimelineId, timeline_id: TimelineId,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), QueryError> ) -> Result<(), QueryError>
where where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -958,12 +958,12 @@ impl PageServerHandler {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
req: &PagestreamExistsRequest, req: &PagestreamExistsRequest,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> { ) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline let _timer = timeline
.query_metrics .query_metrics
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx); .start_timer(metrics::SmgrQueryType::GetRelExists);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn( let lsn = Self::wait_or_get_last_lsn(
@@ -990,13 +990,13 @@ impl PageServerHandler {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
req: &PagestreamNblocksRequest, req: &PagestreamNblocksRequest,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> { ) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline let _timer = timeline
.query_metrics .query_metrics
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx); .start_timer(metrics::SmgrQueryType::GetRelSize);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn( let lsn = Self::wait_or_get_last_lsn(
@@ -1023,13 +1023,13 @@ impl PageServerHandler {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
req: &PagestreamDbSizeRequest, req: &PagestreamDbSizeRequest,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> { ) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline let _timer = timeline
.query_metrics .query_metrics
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx); .start_timer(metrics::SmgrQueryType::GetDbSize);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn( let lsn = Self::wait_or_get_last_lsn(
@@ -1173,7 +1173,7 @@ impl PageServerHandler {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
req: &PagestreamGetPageRequest, req: &PagestreamGetPageRequest,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> { ) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self.get_cached_timeline_for_page(req) { let timeline = match self.get_cached_timeline_for_page(req) {
Ok(tl) => { Ok(tl) => {
@@ -1206,7 +1206,7 @@ impl PageServerHandler {
let _timer = timeline let _timer = timeline
.query_metrics .query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx); .start_timer(metrics::SmgrQueryType::GetPageAtLsn);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn( let lsn = Self::wait_or_get_last_lsn(
@@ -1233,13 +1233,13 @@ impl PageServerHandler {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
req: &PagestreamGetSlruSegmentRequest, req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> { ) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline let _timer = timeline
.query_metrics .query_metrics
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx); .start_timer(metrics::SmgrQueryType::GetSlruSegment);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn( let lsn = Self::wait_or_get_last_lsn(
@@ -1275,7 +1275,7 @@ impl PageServerHandler {
prev_lsn: Option<Lsn>, prev_lsn: Option<Lsn>,
full_backup: bool, full_backup: bool,
gzip: bool, gzip: bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), QueryError> ) -> Result<(), QueryError>
where where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -1534,7 +1534,7 @@ where
fail::fail_point!("ps::connection-start::process-query"); fail::fail_point!("ps::connection-start::process-query");
let ctx = self.connection_ctx.attached_child(); let mut ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}"); debug!("process query {query_string:?}");
let parts = query_string.split_whitespace().collect::<Vec<_>>(); let parts = query_string.split_whitespace().collect::<Vec<_>>();
if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) { if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
@@ -1624,7 +1624,7 @@ where
} }
}; };
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx); let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&mut ctx);
let res = async { let res = async {
self.handle_basebackup_request( self.handle_basebackup_request(
pgb, pgb,
@@ -1634,7 +1634,7 @@ where
None, None,
false, false,
gzip, gzip,
&ctx, &mut ctx,
) )
.await?; .await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1732,7 +1732,7 @@ where
prev_lsn, prev_lsn,
true, true,
false, false,
&ctx, &mut ctx,
) )
.await?; .await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1860,7 +1860,7 @@ where
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?; .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
match self match self
.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx) .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &mut ctx)
.await .await
{ {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,

View File

@@ -188,7 +188,7 @@ impl Timeline {
tag: RelTag, tag: RelTag,
blknum: BlockNumber, blknum: BlockNumber,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 { if tag.relnode == 0 {
return Err(PageReconstructError::Other( return Err(PageReconstructError::Other(
@@ -218,7 +218,7 @@ impl Timeline {
spcnode: Oid, spcnode: Oid,
dbnode: Oid, dbnode: Oid,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<usize, PageReconstructError> { ) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0; let mut total_blocks = 0;
@@ -236,7 +236,7 @@ impl Timeline {
&self, &self,
tag: RelTag, tag: RelTag,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockNumber, PageReconstructError> { ) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 { if tag.relnode == 0 {
return Err(PageReconstructError::Other( return Err(PageReconstructError::Other(
@@ -272,7 +272,7 @@ impl Timeline {
&self, &self,
tag: RelTag, tag: RelTag,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<bool, PageReconstructError> { ) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 { if tag.relnode == 0 {
return Err(PageReconstructError::Other( return Err(PageReconstructError::Other(
@@ -307,7 +307,7 @@ impl Timeline {
spcnode: Oid, spcnode: Oid,
dbnode: Oid, dbnode: Oid,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> { ) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing // fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode); let key = rel_dir_to_key(spcnode, dbnode);
@@ -335,7 +335,7 @@ impl Timeline {
kind: SlruKind, kind: SlruKind,
segno: u32, segno: u32,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
let n_blocks = self let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
@@ -357,7 +357,7 @@ impl Timeline {
segno: u32, segno: u32,
blknum: BlockNumber, blknum: BlockNumber,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
let key = slru_block_to_key(kind, segno, blknum); let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn, ctx).await self.get(key, lsn, ctx).await
@@ -369,7 +369,7 @@ impl Timeline {
kind: SlruKind, kind: SlruKind,
segno: u32, segno: u32,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockNumber, PageReconstructError> { ) -> Result<BlockNumber, PageReconstructError> {
let key = slru_segment_size_to_key(kind, segno); let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?; let mut buf = version.get(self, key, ctx).await?;
@@ -382,7 +382,7 @@ impl Timeline {
kind: SlruKind, kind: SlruKind,
segno: u32, segno: u32,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<bool, PageReconstructError> { ) -> Result<bool, PageReconstructError> {
// fetch directory listing // fetch directory listing
let key = slru_dir_to_key(kind); let key = slru_dir_to_key(kind);
@@ -408,7 +408,7 @@ impl Timeline {
&self, &self,
search_timestamp: TimestampTz, search_timestamp: TimestampTz,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> { ) -> Result<LsnForTimestamp, PageReconstructError> {
pausable_failpoint!("find-lsn-for-timestamp-pausable"); pausable_failpoint!("find-lsn-for-timestamp-pausable");
@@ -499,7 +499,7 @@ impl Timeline {
probe_lsn: Lsn, probe_lsn: Lsn,
found_smaller: &mut bool, found_smaller: &mut bool,
found_larger: &mut bool, found_larger: &mut bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<bool, PageReconstructError> { ) -> Result<bool, PageReconstructError> {
self.map_all_timestamps(probe_lsn, ctx, |timestamp| { self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
if timestamp >= search_timestamp { if timestamp >= search_timestamp {
@@ -519,7 +519,7 @@ impl Timeline {
pub(crate) async fn get_timestamp_for_lsn( pub(crate) async fn get_timestamp_for_lsn(
&self, &self,
probe_lsn: Lsn, probe_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<TimestampTz>, PageReconstructError> { ) -> Result<Option<TimestampTz>, PageReconstructError> {
let mut max: Option<TimestampTz> = None; let mut max: Option<TimestampTz> = None;
self.map_all_timestamps(probe_lsn, ctx, |timestamp| { self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
@@ -542,7 +542,7 @@ impl Timeline {
async fn map_all_timestamps<T: Default>( async fn map_all_timestamps<T: Default>(
&self, &self,
probe_lsn: Lsn, probe_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>, mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> { ) -> Result<T, PageReconstructError> {
for segno in self for segno in self
@@ -575,7 +575,7 @@ impl Timeline {
pub(crate) async fn get_slru_keyspace( pub(crate) async fn get_slru_keyspace(
&self, &self,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<KeySpace, PageReconstructError> { ) -> Result<KeySpace, PageReconstructError> {
let mut accum = KeySpaceAccum::new(); let mut accum = KeySpaceAccum::new();
@@ -604,7 +604,7 @@ impl Timeline {
&self, &self,
kind: SlruKind, kind: SlruKind,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashSet<u32>, PageReconstructError> { ) -> Result<HashSet<u32>, PageReconstructError> {
// fetch directory entry // fetch directory entry
let key = slru_dir_to_key(kind); let key = slru_dir_to_key(kind);
@@ -621,7 +621,7 @@ impl Timeline {
spcnode: Oid, spcnode: Oid,
dbnode: Oid, dbnode: Oid,
version: Version<'_>, version: Version<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
let key = relmap_file_key(spcnode, dbnode); let key = relmap_file_key(spcnode, dbnode);
@@ -632,7 +632,7 @@ impl Timeline {
pub(crate) async fn list_dbdirs( pub(crate) async fn list_dbdirs(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> { ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
// fetch directory entry // fetch directory entry
let buf = self.get(DBDIR_KEY, lsn, ctx).await?; let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
@@ -647,7 +647,7 @@ impl Timeline {
&self, &self,
xid: TransactionId, xid: TransactionId,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
let key = twophase_file_key(xid); let key = twophase_file_key(xid);
let buf = self.get(key, lsn, ctx).await?; let buf = self.get(key, lsn, ctx).await?;
@@ -657,7 +657,7 @@ impl Timeline {
pub(crate) async fn list_twophase_files( pub(crate) async fn list_twophase_files(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashSet<TransactionId>, PageReconstructError> { ) -> Result<HashSet<TransactionId>, PageReconstructError> {
// fetch directory entry // fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?; let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
@@ -671,7 +671,7 @@ impl Timeline {
pub(crate) async fn get_control_file( pub(crate) async fn get_control_file(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
self.get(CONTROLFILE_KEY, lsn, ctx).await self.get(CONTROLFILE_KEY, lsn, ctx).await
} }
@@ -679,7 +679,7 @@ impl Timeline {
pub(crate) async fn get_checkpoint( pub(crate) async fn get_checkpoint(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
self.get(CHECKPOINT_KEY, lsn, ctx).await self.get(CHECKPOINT_KEY, lsn, ctx).await
} }
@@ -687,7 +687,7 @@ impl Timeline {
async fn list_aux_files_v1( async fn list_aux_files_v1(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> { ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get(AUX_FILES_KEY, lsn, ctx).await { match self.get(AUX_FILES_KEY, lsn, ctx).await {
Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") { Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
@@ -705,7 +705,7 @@ impl Timeline {
async fn list_aux_files_v2( async fn list_aux_files_v2(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> { ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let kv = self let kv = self
.scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx) .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
@@ -729,7 +729,7 @@ impl Timeline {
pub(crate) async fn trigger_aux_file_size_computation( pub(crate) async fn trigger_aux_file_size_computation(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> { ) -> Result<(), PageReconstructError> {
let current_policy = self.last_aux_file_policy.load(); let current_policy = self.last_aux_file_policy.load();
if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy { if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
@@ -741,7 +741,7 @@ impl Timeline {
pub(crate) async fn list_aux_files( pub(crate) async fn list_aux_files(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> { ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let current_policy = self.last_aux_file_policy.load(); let current_policy = self.last_aux_file_policy.load();
match current_policy { match current_policy {
@@ -779,7 +779,7 @@ impl Timeline {
pub(crate) async fn get_replorigins( pub(crate) async fn get_replorigins(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> { ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
let kv = self let kv = self
.scan(KeySpace::single(repl_origin_key_range()), lsn, ctx) .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
@@ -809,7 +809,7 @@ impl Timeline {
pub(crate) async fn get_current_logical_size_non_incremental( pub(crate) async fn get_current_logical_size_non_incremental(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<u64, CalculateLogicalSizeError> { ) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
@@ -845,7 +845,7 @@ impl Timeline {
pub(crate) async fn collect_keyspace( pub(crate) async fn collect_keyspace(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> { ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
// Iterate through key ranges, greedily packing them into partitions // Iterate through key ranges, greedily packing them into partitions
let mut result = KeySpaceAccum::new(); let mut result = KeySpaceAccum::new();
@@ -1145,7 +1145,7 @@ impl<'a> DatadirModification<'a> {
spcnode: Oid, spcnode: Oid,
dbnode: Oid, dbnode: Oid,
img: Bytes, img: Bytes,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Add it to the directory (if it doesn't exist already) // Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?; let buf = self.get(DBDIR_KEY, ctx).await?;
@@ -1182,7 +1182,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
xid: TransactionId, xid: TransactionId,
img: Bytes, img: Bytes,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Add it to the directory entry // Add it to the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?; let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
@@ -1229,7 +1229,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
spcnode: Oid, spcnode: Oid,
dbnode: Oid, dbnode: Oid,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let total_blocks = self let total_blocks = self
.tline .tline
@@ -1266,7 +1266,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
rel: RelTag, rel: RelTag,
nblocks: BlockNumber, nblocks: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), RelationError> { ) -> Result<(), RelationError> {
if rel.relnode == 0 { if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode); return Err(RelationError::InvalidRelnode);
@@ -1328,7 +1328,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
rel: RelTag, rel: RelTag,
nblocks: BlockNumber, nblocks: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self if self
@@ -1362,7 +1362,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
rel: RelTag, rel: RelTag,
nblocks: BlockNumber, nblocks: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
@@ -1384,7 +1384,11 @@ impl<'a> DatadirModification<'a> {
} }
/// Drop a relation. /// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> { pub async fn put_rel_drop(
&mut self,
rel: RelTag,
ctx: &mut RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
// Remove it from the directory entry // Remove it from the directory entry
@@ -1420,7 +1424,7 @@ impl<'a> DatadirModification<'a> {
kind: SlruKind, kind: SlruKind,
segno: u32, segno: u32,
nblocks: BlockNumber, nblocks: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Add it to the directory entry // Add it to the directory entry
let dir_key = slru_dir_to_key(kind); let dir_key = slru_dir_to_key(kind);
@@ -1466,7 +1470,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
kind: SlruKind, kind: SlruKind,
segno: u32, segno: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Remove it from the directory entry // Remove it from the directory entry
let dir_key = slru_dir_to_key(kind); let dir_key = slru_dir_to_key(kind);
@@ -1499,7 +1503,7 @@ impl<'a> DatadirModification<'a> {
pub async fn drop_twophase_file( pub async fn drop_twophase_file(
&mut self, &mut self,
xid: TransactionId, xid: TransactionId,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Remove it from the directory entry // Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?; let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
@@ -1538,7 +1542,7 @@ impl<'a> DatadirModification<'a> {
&mut self, &mut self,
path: &str, path: &str,
content: &[u8], content: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let switch_policy = self.tline.get_switch_aux_file_policy(); let switch_policy = self.tline.get_switch_aux_file_policy();
@@ -1731,7 +1735,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK /// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to /// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice. /// modify the same pages twice.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> { pub async fn flush(&mut self, ctx: &mut RequestContext) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it // Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list. // to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks; let pending_nblocks = self.pending_nblocks;
@@ -1777,7 +1781,7 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline. /// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN. /// All the modifications in this atomic update are stamped by the specified LSN.
/// ///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> { pub async fn commit(&mut self, ctx: &mut RequestContext) -> anyhow::Result<()> {
let mut writer = self.tline.writer().await; let mut writer = self.tline.writer().await;
let pending_nblocks = self.pending_nblocks; let pending_nblocks = self.pending_nblocks;
@@ -1828,7 +1832,7 @@ impl<'a> DatadirModification<'a> {
// Internal helper functions to batch the modifications // Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> { async fn get(&self, key: Key, ctx: &mut RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated // Have we already updated the same key? Read the latest pending updated
// version in that case. // version in that case.
// //
@@ -1895,7 +1899,7 @@ impl<'a> Version<'a> {
&self, &self,
timeline: &Timeline, timeline: &Timeline,
key: Key, key: Key,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
match self { match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await, Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,

View File

@@ -547,7 +547,7 @@ impl Tenant {
metadata: TimelineMetadata, metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>, ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>, last_aux_file_policy: Option<AuxFilePolicy>,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id; let tenant_id = self.tenant_shard_id;
@@ -656,7 +656,7 @@ impl Tenant {
init_order: Option<InitializationOrder>, init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>, tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode, mode: SpawnMode,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Tenant>> { ) -> anyhow::Result<Arc<Tenant>> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf, conf,
@@ -965,7 +965,7 @@ impl Tenant {
self: &Arc<Tenant>, self: &Arc<Tenant>,
preload: Option<TenantPreload>, preload: Option<TenantPreload>,
mode: SpawnMode, mode: SpawnMode,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id(); span::debug_assert_current_span_has_tenant_id();
@@ -1175,7 +1175,7 @@ impl Tenant {
index_part: IndexPart, index_part: IndexPart,
remote_metadata: TimelineMetadata, remote_metadata: TimelineMetadata,
resources: TimelineResources, resources: TimelineResources,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id(); span::debug_assert_current_span_has_tenant_id();
@@ -1358,7 +1358,7 @@ impl Tenant {
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
pg_version: u32, pg_version: u32,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> anyhow::Result<UninitializedTimeline> { ) -> anyhow::Result<UninitializedTimeline> {
anyhow::ensure!( anyhow::ensure!(
self.is_active(), self.is_active(),
@@ -1401,7 +1401,7 @@ impl Tenant {
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
pg_version: u32, pg_version: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
@@ -1440,7 +1440,7 @@ impl Tenant {
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
pg_version: u32, pg_version: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>, delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn, end_lsn: Lsn,
@@ -1477,7 +1477,7 @@ impl Tenant {
pg_version: u32, pg_version: u32,
load_existing_initdb: Option<TimelineId>, load_existing_initdb: Option<TimelineId>,
broker_client: storage_broker::BrokerClientChannel, broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> { ) -> Result<Arc<Timeline>, CreateTimelineError> {
if !self.is_active() { if !self.is_active() {
if matches!(self.current_state(), TenantState::Stopping { .. }) { if matches!(self.current_state(), TenantState::Stopping { .. }) {
@@ -1650,7 +1650,7 @@ impl Tenant {
horizon: u64, horizon: u64,
pitr: Duration, pitr: Duration,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<GcResult, GcError> { ) -> Result<GcResult, GcError> {
// Don't start doing work during shutdown // Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() { if let TenantState::Stopping { .. } = self.current_state() {
@@ -1682,7 +1682,7 @@ impl Tenant {
async fn compaction_iteration( async fn compaction_iteration(
&self, &self,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<(), timeline::CompactionError> { ) -> anyhow::Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs // Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() { if !self.is_active() {
@@ -1779,7 +1779,7 @@ impl Tenant {
self: &Arc<Self>, self: &Arc<Self>,
broker_client: BrokerClientChannel, broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>, background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
span::debug_assert_current_span_has_tenant_id(); span::debug_assert_current_span_has_tenant_id();
@@ -2834,7 +2834,7 @@ impl Tenant {
horizon: u64, horizon: u64,
pitr: Duration, pitr: Duration,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<GcResult, GcError> { ) -> Result<GcResult, GcError> {
let mut totals: GcResult = Default::default(); let mut totals: GcResult = Default::default();
let now = Instant::now(); let now = Instant::now();
@@ -2894,7 +2894,7 @@ impl Tenant {
pub(crate) async fn refresh_gc_info( pub(crate) async fn refresh_gc_info(
&self, &self,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<Arc<Timeline>>, GcError> { ) -> Result<Vec<Arc<Timeline>>, GcError> {
// since this method can now be called at different rates than the configured gc loop, it // since this method can now be called at different rates than the configured gc loop, it
// might be that these configuration values get applied faster than what it was previously, // might be that these configuration values get applied faster than what it was previously,
@@ -2915,7 +2915,7 @@ impl Tenant {
horizon: u64, horizon: u64,
pitr: Duration, pitr: Duration,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<Arc<Timeline>>, GcError> { ) -> Result<Vec<Arc<Timeline>>, GcError> {
// before taking the gc_cs lock, do the heavier weight finding of gc_cutoff points for // before taking the gc_cs lock, do the heavier weight finding of gc_cutoff points for
// currently visible timelines. // currently visible timelines.
@@ -3053,7 +3053,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>, src_timeline: &Arc<Timeline>,
dst_id: TimelineId, dst_id: TimelineId,
ancestor_lsn: Option<Lsn>, ancestor_lsn: Option<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> { ) -> Result<Arc<Timeline>, CreateTimelineError> {
let create_guard = self.create_timeline_create_guard(dst_id).unwrap(); let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
let tl = self let tl = self
@@ -3071,7 +3071,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>, src_timeline: &Arc<Timeline>,
dst_id: TimelineId, dst_id: TimelineId,
ancestor_lsn: Option<Lsn>, ancestor_lsn: Option<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>, delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn, end_lsn: Lsn,
@@ -3108,7 +3108,7 @@ impl Tenant {
dst_id: TimelineId, dst_id: TimelineId,
start_lsn: Option<Lsn>, start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>, timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> { ) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx) self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
.await .await
@@ -3120,7 +3120,7 @@ impl Tenant {
dst_id: TimelineId, dst_id: TimelineId,
start_lsn: Option<Lsn>, start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>, timeline_create_guard: TimelineCreateGuard<'_>,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> { ) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id; let src_id = src_timeline.timeline_id;
@@ -3233,7 +3233,7 @@ impl Tenant {
timeline_id: TimelineId, timeline_id: TimelineId,
pg_version: u32, pg_version: u32,
load_existing_initdb: Option<TimelineId>, load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let create_guard = self.create_timeline_create_guard(timeline_id).unwrap(); let create_guard = self.create_timeline_create_guard(timeline_id).unwrap();
self.bootstrap_timeline( self.bootstrap_timeline(
@@ -3305,7 +3305,7 @@ impl Tenant {
pg_version: u32, pg_version: u32,
load_existing_initdb: Option<TimelineId>, load_existing_initdb: Option<TimelineId>,
timeline_create_guard: TimelineCreateGuard<'_>, timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline. // temporary directory for basebackup files for the given timeline.
@@ -3563,7 +3563,7 @@ impl Tenant {
max_retention_period: Option<u64>, max_retention_period: Option<u64>,
cause: LogicalSizeCalculationCause, cause: LogicalSizeCalculationCause,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> { ) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
let logical_sizes_at_once = self let logical_sizes_at_once = self
.conf .conf
@@ -3603,7 +3603,7 @@ impl Tenant {
&self, &self,
cause: LogicalSizeCalculationCause, cause: LogicalSizeCalculationCause,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<u64, size::CalculateSyntheticSizeError> { ) -> Result<u64, size::CalculateSyntheticSizeError> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?; let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
@@ -3756,7 +3756,7 @@ async fn run_initdb(
pub async fn dump_layerfile_from_path( pub async fn dump_layerfile_from_path(
path: &Utf8Path, path: &Utf8Path,
verbose: bool, verbose: bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
@@ -3960,7 +3960,7 @@ pub(crate) mod harness {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) async fn do_try_load( pub(crate) async fn do_try_load(
&self, &self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Tenant>> { ) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
@@ -4219,7 +4219,7 @@ mod tests {
async fn make_some_layers( async fn make_some_layers(
tline: &Timeline, tline: &Timeline,
start_lsn: Lsn, start_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut lsn = start_lsn; let mut lsn = start_lsn;
{ {
@@ -4707,7 +4707,7 @@ mod tests {
async fn bulk_insert_compact_gc( async fn bulk_insert_compact_gc(
tenant: &Tenant, tenant: &Tenant,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
lsn: Lsn, lsn: Lsn,
repeat: usize, repeat: usize,
key_count: usize, key_count: usize,
@@ -4719,7 +4719,7 @@ mod tests {
async fn bulk_insert_maybe_compact_gc( async fn bulk_insert_maybe_compact_gc(
tenant: &Tenant, tenant: &Tenant,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
mut lsn: Lsn, mut lsn: Lsn,
repeat: usize, repeat: usize,
key_count: usize, key_count: usize,
@@ -6249,7 +6249,7 @@ mod tests {
tline: &Timeline, tline: &Timeline,
keyspace: &KeySpace, keyspace: &KeySpace,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> { ) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::default(); let mut reconstruct_state = ValuesReconstructState::default();
let res = tline let res = tline
@@ -6365,7 +6365,7 @@ mod tests {
tline: &Arc<Timeline>, tline: &Arc<Timeline>,
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> { ) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new(); let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline let mut res = tline
@@ -6461,7 +6461,7 @@ mod tests {
tline: &Arc<Timeline>, tline: &Arc<Timeline>,
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> { ) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new(); let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline let mut res = tline
@@ -6515,7 +6515,7 @@ mod tests {
tline: &Arc<Timeline>, tline: &Arc<Timeline>,
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> { ) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new(); let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline let mut res = tline

View File

@@ -26,7 +26,7 @@ impl<'a> BlockCursor<'a> {
pub async fn read_blob( pub async fn read_blob(
&self, &self,
offset: u64, offset: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<u8>, std::io::Error> { ) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new(); let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?; self.read_blob_into_buf(offset, &mut buf, ctx).await?;
@@ -38,7 +38,7 @@ impl<'a> BlockCursor<'a> {
&self, &self,
offset: u64, offset: u64,
dstbuf: &mut Vec<u8>, dstbuf: &mut Vec<u8>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), std::io::Error> { ) -> Result<(), std::io::Error> {
let mut blknum = (offset / PAGE_SZ as u64) as u32; let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize; let mut off = (offset % PAGE_SZ as u64) as usize;
@@ -130,7 +130,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
src_buf: B, src_buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (B::Buf, Result<(), Error>) { ) -> (B::Buf, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res { let nbytes = match res {
@@ -143,7 +143,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
#[inline(always)] #[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`. /// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { pub async fn flush_buffer(&mut self, ctx: &mut RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf); let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf, ctx).await; let (mut buf, res) = self.inner.write_all(buf, ctx).await;
res?; res?;
@@ -166,7 +166,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
src_buf: B, src_buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (B::Buf, Result<(), Error>) { ) -> (B::Buf, Result<(), Error>) {
if !BUFFERED { if !BUFFERED {
assert!(self.buf.is_empty()); assert!(self.buf.is_empty());
@@ -218,7 +218,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
srcbuf: B, srcbuf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (B::Buf, Result<u64, Error>) { ) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset; let offset = self.offset;
@@ -267,7 +267,7 @@ impl BlobWriter<true> {
/// ///
/// This function flushes the internal buffer before giving access /// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`. /// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> { pub async fn into_inner(mut self, ctx: &mut RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?; self.flush_buffer(ctx).await?;
Ok(self.inner) Ok(self.inner)
} }

View File

@@ -92,7 +92,7 @@ impl<'a> BlockReaderRef<'a> {
async fn read_blk( async fn read_blk(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockLease, std::io::Error> { ) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*; use BlockReaderRef::*;
match self { match self {
@@ -150,7 +150,7 @@ impl<'a> BlockCursor<'a> {
pub async fn read_blk( pub async fn read_blk(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockLease, std::io::Error> { ) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum, ctx).await self.reader.read_blk(blknum, ctx).await
} }
@@ -177,7 +177,7 @@ impl<'a> FileBlockReader<'a> {
&self, &self,
buf: PageWriteGuard<'static>, buf: PageWriteGuard<'static>,
blkno: u32, blkno: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PageWriteGuard<'static>, std::io::Error> { ) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ); assert!(buf.len() == PAGE_SZ);
self.file self.file
@@ -192,7 +192,7 @@ impl<'a> FileBlockReader<'a> {
pub async fn read_blk<'b>( pub async fn read_blk<'b>(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockLease<'b>, std::io::Error> { ) -> Result<BlockLease<'b>, std::io::Error> {
let cache = page_cache::get(); let cache = page_cache::get();
match cache match cache

View File

@@ -404,7 +404,7 @@ impl DeleteTenantFlow {
tenant: &Arc<Tenant>, tenant: &Arc<Tenant>,
preload: Option<TenantPreload>, preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>, tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), DeleteTenantError> { ) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel(); let (_, progress) = completion::channel();

View File

@@ -242,7 +242,7 @@ where
/// ///
/// Read the value for given key. Returns the value, or None if it doesn't exist. /// Read the value for given key. Returns the value, or None if it doesn't exist.
/// ///
pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> { pub async fn get(&self, search_key: &[u8; L], ctx: &mut RequestContext) -> Result<Option<u64>> {
let mut result: Option<u64> = None; let mut result: Option<u64> = None;
self.visit( self.visit(
search_key, search_key,
@@ -278,7 +278,7 @@ where
pub fn get_stream_from<'a>( pub fn get_stream_from<'a>(
&'a self, &'a self,
start_key: &'a [u8; L], start_key: &'a [u8; L],
ctx: &'a RequestContext, ctx: &'a mut RequestContext,
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a { ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
try_stream! { try_stream! {
let mut stack = Vec::new(); let mut stack = Vec::new();
@@ -363,7 +363,7 @@ where
search_key: &[u8; L], search_key: &[u8; L],
dir: VisitDirection, dir: VisitDirection,
mut visitor: V, mut visitor: V,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<bool> ) -> Result<bool>
where where
V: FnMut(&[u8], u64) -> bool, V: FnMut(&[u8], u64) -> bool,

View File

@@ -28,7 +28,7 @@ impl EphemeralFile {
conf: &PageServerConf, conf: &PageServerConf,
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
timeline_id: TimelineId, timeline_id: TimelineId,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<EphemeralFile, io::Error> { ) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator = let filename_disambiguator =
@@ -68,7 +68,7 @@ impl EphemeralFile {
pub(crate) async fn read_blk( pub(crate) async fn read_blk(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockLease, io::Error> { ) -> Result<BlockLease, io::Error> {
self.rw.read_blk(blknum, ctx).await self.rw.read_blk(blknum, ctx).await
} }
@@ -76,7 +76,7 @@ impl EphemeralFile {
pub(crate) async fn write_blob( pub(crate) async fn write_blob(
&mut self, &mut self,
srcbuf: &[u8], srcbuf: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<u64, io::Error> { ) -> Result<u64, io::Error> {
let pos = self.rw.bytes_written(); let pos = self.rw.bytes_written();

View File

@@ -38,7 +38,7 @@ impl RW {
pub(crate) async fn write_all_borrowed( pub(crate) async fn write_all_borrowed(
&mut self, &mut self,
srcbuf: &[u8], srcbuf: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<usize, io::Error> { ) -> Result<usize, io::Error> {
// It doesn't make sense to proactively fill the page cache on the Pageserver write path // It doesn't make sense to proactively fill the page cache on the Pageserver write path
// because Compute is unlikely to access recently written data. // because Compute is unlikely to access recently written data.
@@ -52,7 +52,7 @@ impl RW {
pub(crate) async fn read_blk( pub(crate) async fn read_blk(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockLease, io::Error> { ) -> Result<BlockLease, io::Error> {
match self.rw.read_blk(blknum).await? { match self.rw.read_blk(blknum).await? {
zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => { zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
@@ -138,7 +138,7 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
>( >(
&mut self, &mut self,
buf: B, buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> { ) -> std::io::Result<(usize, B::Buf)> {
let buf = buf.slice(..); let buf = buf.slice(..);
let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done

View File

@@ -64,7 +64,7 @@ where
pub async fn write_all_borrowed( pub async fn write_all_borrowed(
&mut self, &mut self,
buf: &[u8], buf: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<usize> { ) -> std::io::Result<usize> {
self.buffered_writer.write_buffered_borrowed(buf, ctx).await self.buffered_writer.write_buffered_borrowed(buf, ctx).await
} }

View File

@@ -850,7 +850,7 @@ impl LayerMap {
/// debugging function to print out the contents of the layer map /// debugging function to print out the contents of the layer map
#[allow(unused)] #[allow(unused)]
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { pub async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
println!("Begin dump LayerMap"); println!("Begin dump LayerMap");
println!("open_layer:"); println!("open_layer:");

View File

@@ -696,7 +696,7 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>, init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>, tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode, mode: SpawnMode,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Tenant>> { ) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!( anyhow::ensure!(
tenant_path.is_dir(), tenant_path.is_dir(),
@@ -956,7 +956,7 @@ impl TenantManager {
new_location_config: LocationConf, new_location_config: LocationConf,
flush: Option<Duration>, flush: Option<Duration>,
mut spawn_mode: SpawnMode, mut spawn_mode: SpawnMode,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<Arc<Tenant>>, UpsertLocationError> { ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
debug_assert_current_span_has_tenant_id(); debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}"); info!("configuring tenant location to state {new_location_config:?}");
@@ -1247,7 +1247,7 @@ impl TenantManager {
&self, &self,
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
drop_cache: bool, drop_cache: bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let Some(old_slot) = slot_guard.get_old_value() else { let Some(old_slot) = slot_guard.get_old_value() else {
@@ -1509,7 +1509,7 @@ impl TenantManager {
tenant: Arc<Tenant>, tenant: Arc<Tenant>,
new_shard_count: ShardCount, new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>, new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> { ) -> anyhow::Result<Vec<TenantShardId>> {
let tenant_shard_id = *tenant.get_tenant_shard_id(); let tenant_shard_id = *tenant.get_tenant_shard_id();
let r = self let r = self
@@ -1539,7 +1539,7 @@ impl TenantManager {
tenant: Arc<Tenant>, tenant: Arc<Tenant>,
new_shard_count: ShardCount, new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>, new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> { ) -> anyhow::Result<Vec<TenantShardId>> {
let tenant_shard_id = *tenant.get_tenant_shard_id(); let tenant_shard_id = *tenant.get_tenant_shard_id();
@@ -1994,7 +1994,7 @@ impl TenantManager {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
timeline_id: TimelineId, timeline_id: TimelineId,
prepared: PreparedTimelineDetach, prepared: PreparedTimelineDetach,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> { ) -> Result<Vec<TimelineId>, anyhow::Error> {
struct RevertOnDropSlot(Option<SlotGuard>); struct RevertOnDropSlot(Option<SlotGuard>);
@@ -2229,7 +2229,7 @@ pub(crate) async fn load_tenant(
broker_client: storage_broker::BrokerClientChannel, broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage, remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient, deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), TenantMapInsertError> { ) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding // This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id); let tenant_shard_id = TenantShardId::unsharded(tenant_id);
@@ -2837,7 +2837,7 @@ pub(crate) async fn immediate_gc(
timeline_id: TimelineId, timeline_id: TimelineId,
gc_req: TimelineGcRequest, gc_req: TimelineGcRequest,
cancel: CancellationToken, cancel: CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<GcResult, ApiError> { ) -> Result<GcResult, ApiError> {
let tenant = { let tenant = {
let guard = TENANTS.read().unwrap(); let guard = TENANTS.read().unwrap();

View File

@@ -518,7 +518,7 @@ impl RemoteTimelineClient {
layer_metadata: &LayerFileMetadata, layer_metadata: &LayerFileMetadata,
local_path: &Utf8Path, local_path: &Utf8Path,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<u64> { ) -> anyhow::Result<u64> {
let downloaded_size = { let downloaded_size = {
let _unfinished_gauge_guard = self.metrics.call_begin( let _unfinished_gauge_guard = self.metrics.call_begin(

View File

@@ -52,7 +52,7 @@ pub async fn download_layer_file<'a>(
layer_metadata: &'a LayerFileMetadata, layer_metadata: &'a LayerFileMetadata,
local_path: &Utf8Path, local_path: &Utf8Path,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<u64, DownloadError> { ) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
@@ -78,8 +78,33 @@ pub async fn download_layer_file<'a>(
// If pageserver crashes the temp file will be deleted on startup and re-downloaded. // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION); let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
struct DownloadObjectClosure<'a> {
storage: &'a GenericRemoteStorage,
remote_path: &'a RemotePath,
temp_file_path: &'a Utf8PathBuf,
cancel: &'a CancellationToken,
ctx: &'a mut RequestContext,
}
impl backoff::Op<u64, DownloadError> for DownloadObjectClosure<'_> {
async fn call(&mut self) -> Result<u64, DownloadError> {
let DownloadObjectClosure {
storage,
remote_path,
temp_file_path,
cancel,
ctx,
} = self;
download_object(storage, remote_path, temp_file_path, cancel, ctx).await
}
}
let bytes_amount = download_retry( let bytes_amount = download_retry(
|| async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await }, DownloadObjectClosure {
storage,
remote_path: &remote_path,
temp_file_path: &temp_file_path,
cancel,
ctx,
},
&format!("download {remote_path:?}"), &format!("download {remote_path:?}"),
cancel, cancel,
) )
@@ -107,9 +132,9 @@ pub async fn download_layer_file<'a>(
// the in-memory state of the filesystem already has the layer file in its final place, // the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't. // and subsequent pageserver code could think it's durable while it really isn't.
let work = { let work = {
let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior()); let mut ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
async move { async move {
let timeline_dir = VirtualFile::open(&timeline_path, &ctx) let timeline_dir = VirtualFile::open(&timeline_path, &mut ctx)
.await .await
.fatal_err("VirtualFile::open for timeline dir fsync"); .fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir timeline_dir
@@ -140,7 +165,7 @@ async fn download_object<'a>(
src_path: &RemotePath, src_path: &RemotePath,
dst_path: &Utf8PathBuf, dst_path: &Utf8PathBuf,
cancel: &CancellationToken, cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext, #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &mut RequestContext,
) -> Result<u64, DownloadError> { ) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() { let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"), crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
@@ -568,15 +593,11 @@ pub(crate) async fn download_initdb_tar_zst(
/// with backoff. /// with backoff.
/// ///
/// (See similar logic for uploads in `perform_upload_task`) /// (See similar logic for uploads in `perform_upload_task`)
pub(super) async fn download_retry<T, O, F>( pub(super) async fn download_retry<T>(
op: O, op: impl backoff::Op<T, DownloadError>,
description: &str, description: &str,
cancel: &CancellationToken, cancel: &CancellationToken,
) -> Result<T, DownloadError> ) -> Result<T, DownloadError> {
where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
backoff::retry( backoff::retry(
op, op,
DownloadError::is_permanent, DownloadError::is_permanent,

View File

@@ -506,7 +506,7 @@ impl<'a> TenantDownloader<'a> {
} }
} }
async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> { async fn download(&self, ctx: &mut RequestContext) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_id(); debug_assert_current_span_has_tenant_id();
// For the duration of a download, we must hold the SecondaryTenant::gate, to ensure // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
@@ -831,7 +831,7 @@ impl<'a> TenantDownloader<'a> {
&self, &self,
timeline: HeatMapTimeline, timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline, timeline_state: SecondaryDetailTimeline,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), UpdateError> { ) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -978,7 +978,7 @@ impl<'a> TenantDownloader<'a> {
tenant_shard_id: &TenantShardId, tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
layer: HeatMapLayer, layer: HeatMapLayer,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<HeatMapLayer>, UpdateError> { ) -> Result<Option<HeatMapLayer>, UpdateError> {
// Failpoint for simulating slow remote storage // Failpoint for simulating slow remote storage
failpoint_support::sleep_millis_async!( failpoint_support::sleep_millis_async!(

View File

@@ -148,7 +148,7 @@ pub(super) async fn gather_inputs(
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause, cause: LogicalSizeCalculationCause,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<ModelInputs, CalculateSyntheticSizeError> { ) -> Result<ModelInputs, CalculateSyntheticSizeError> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant.refresh_gc_info(cancel, ctx).await?; tenant.refresh_gc_info(cancel, ctx).await?;
@@ -379,7 +379,7 @@ async fn fill_logical_sizes(
limit: &Arc<Semaphore>, limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause, cause: LogicalSizeCalculationCause,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CalculateSyntheticSizeError> { ) -> Result<(), CalculateSyntheticSizeError> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter( let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines timelines

View File

@@ -425,7 +425,7 @@ impl ReadableLayer {
keyspace: KeySpace, keyspace: KeySpace,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
match self { match self {
ReadableLayer::PersistentLayer(layer) => { ReadableLayer::PersistentLayer(layer) => {
@@ -574,7 +574,7 @@ impl LayerAccessStats {
}); });
} }
fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) { fn record_access(&self, access_kind: LayerAccessKind, ctx: &mut RequestContext) {
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip { if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
return; return;
} }

View File

@@ -249,7 +249,7 @@ impl AsLayerDesc for DeltaLayer {
} }
impl DeltaLayer { impl DeltaLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
self.desc.dump(); self.desc.dump();
if !verbose { if !verbose {
@@ -292,7 +292,7 @@ impl DeltaLayer {
async fn load( async fn load(
&self, &self,
access_kind: LayerAccessKind, access_kind: LayerAccessKind,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<&Arc<DeltaLayerInner>> { ) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(access_kind, ctx); self.access_stats.record_access(access_kind, ctx);
// Quick exit if already loaded // Quick exit if already loaded
@@ -302,7 +302,7 @@ impl DeltaLayer {
.with_context(|| format!("Failed to load delta layer {}", self.path())) .with_context(|| format!("Failed to load delta layer {}", self.path()))
} }
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> { async fn load_inner(&self, ctx: &mut RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path(); let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, None, ctx) let loaded = DeltaLayerInner::load(&path, None, None, ctx)
@@ -393,7 +393,7 @@ impl DeltaLayerWriterInner {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
key_start: Key, key_start: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know // Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will // the end key yet, so we cannot form the final filename yet. We will
@@ -435,7 +435,7 @@ impl DeltaLayerWriterInner {
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
val: Value, val: Value,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let (_, res) = self let (_, res) = self
.put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx) .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
@@ -449,7 +449,7 @@ impl DeltaLayerWriterInner {
lsn: Lsn, lsn: Lsn,
val: Vec<u8>, val: Vec<u8>,
will_init: bool, will_init: bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) { ) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn); assert!(self.lsn_range.start <= lsn);
let (val, res) = self.blob_writer.write_blob(val, ctx).await; let (val, res) = self.blob_writer.write_blob(val, ctx).await;
@@ -476,7 +476,7 @@ impl DeltaLayerWriterInner {
self, self,
key_end: Key, key_end: Key,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> { ) -> anyhow::Result<ResidentLayer> {
let temp_path = self.path.clone(); let temp_path = self.path.clone();
let result = self.finish0(key_end, timeline, ctx).await; let result = self.finish0(key_end, timeline, ctx).await;
@@ -493,7 +493,7 @@ impl DeltaLayerWriterInner {
self, self,
key_end: Key, key_end: Key,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> { ) -> anyhow::Result<ResidentLayer> {
let index_start_blk = let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -603,7 +603,7 @@ impl DeltaLayerWriter {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
key_start: Key, key_start: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
Ok(Self { Ok(Self {
inner: Some( inner: Some(
@@ -630,7 +630,7 @@ impl DeltaLayerWriter {
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
val: Value, val: Value,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
self.inner self.inner
.as_mut() .as_mut()
@@ -645,7 +645,7 @@ impl DeltaLayerWriter {
lsn: Lsn, lsn: Lsn,
val: Vec<u8>, val: Vec<u8>,
will_init: bool, will_init: bool,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) { ) -> (Vec<u8>, anyhow::Result<()>) {
self.inner self.inner
.as_mut() .as_mut()
@@ -665,7 +665,7 @@ impl DeltaLayerWriter {
mut self, mut self,
key_end: Key, key_end: Key,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> { ) -> anyhow::Result<ResidentLayer> {
self.inner self.inner
.take() .take()
@@ -704,7 +704,7 @@ impl DeltaLayer {
pub async fn rewrite_summary<F>( pub async fn rewrite_summary<F>(
path: &Utf8Path, path: &Utf8Path,
rewrite: F, rewrite: F,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), RewriteSummaryError> ) -> Result<(), RewriteSummaryError>
where where
F: Fn(Summary) -> Summary, F: Fn(Summary) -> Summary,
@@ -744,7 +744,7 @@ impl DeltaLayerInner {
path: &Utf8Path, path: &Utf8Path,
summary: Option<Summary>, summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>, max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> { ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await { let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file, Ok(file) => file,
@@ -793,7 +793,7 @@ impl DeltaLayerInner {
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState, reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<ValueReconstructResult> {
let mut need_image = true; let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`. // Scan the page versions backwards, starting from `lsn`.
@@ -824,13 +824,13 @@ impl DeltaLayerInner {
!blob_ref.will_init() !blob_ref.will_init()
}, },
&RequestContextBuilder::extend(ctx) &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode) .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(), .build(),
) )
.await?; .await?;
let ctx = &RequestContextBuilder::extend(ctx) let ctx = &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue) .page_content_kind(PageContentKind::DeltaLayerValue)
.build(); .build();
@@ -889,7 +889,7 @@ impl DeltaLayerInner {
keyspace: KeySpace, keyspace: KeySpace,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
let block_reader = FileBlockReader::new(&self.file, self.file_id); let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -931,7 +931,7 @@ impl DeltaLayerInner {
#[cfg(test)] #[cfg(test)]
pub(super) async fn load_key_values( pub(super) async fn load_key_values(
&self, &self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> { ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id); let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -973,12 +973,12 @@ impl DeltaLayerInner {
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>, index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
mut planner: VectoredReadPlanner, mut planner: VectoredReadPlanner,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> ) -> anyhow::Result<Vec<VectoredRead>>
where where
Reader: BlockReader, Reader: BlockReader,
{ {
let ctx = RequestContextBuilder::extend(ctx) let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode) .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(); .build();
@@ -986,7 +986,7 @@ impl DeltaLayerInner {
let mut range_end_handled = false; let mut range_end_handled = false;
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = index_reader.get_stream_from(&start_key.0, &ctx); let index_stream = index_reader.get_stream_from(&start_key.0, &mut ctx);
let mut index_stream = std::pin::pin!(index_stream); let mut index_stream = std::pin::pin!(index_stream);
while let Some(index_entry) = index_stream.next().await { while let Some(index_entry) = index_stream.next().await {
@@ -1062,7 +1062,7 @@ impl DeltaLayerInner {
&self, &self,
reads: Vec<VectoredRead>, reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file); let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None; let mut ignore_key_with_err = None;
@@ -1140,7 +1140,7 @@ impl DeltaLayerInner {
pub(super) async fn load_keys<'a>( pub(super) async fn load_keys<'a>(
&'a self, &'a self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<DeltaEntry<'a>>> { ) -> Result<Vec<DeltaEntry<'a>>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id); let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
@@ -1179,7 +1179,7 @@ impl DeltaLayerInner {
all_keys.push(entry); all_keys.push(entry);
true true
}, },
&RequestContextBuilder::extend(ctx) &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode) .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(), .build(),
) )
@@ -1199,7 +1199,7 @@ impl DeltaLayerInner {
&self, &self,
writer: &mut DeltaLayerWriter, writer: &mut DeltaLayerWriter,
until: Lsn, until: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize> {
use crate::tenant::vectored_blob_io::{ use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended, BlobMeta, VectoredReadBuilder, VectoredReadExtended,
@@ -1387,7 +1387,7 @@ impl DeltaLayerInner {
Ok(records) Ok(records)
} }
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> { pub(super) async fn dump(&self, ctx: &mut RequestContext) -> anyhow::Result<()> {
println!( println!(
"index_start_blk: {}, root {}", "index_start_blk: {}, root {}",
self.index_start_blk, self.index_root_blk self.index_start_blk, self.index_root_blk
@@ -1404,7 +1404,7 @@ impl DeltaLayerInner {
let keys = self.load_keys(ctx).await?; let keys = self.load_keys(ctx).await?;
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> { async fn dump_blob(val: &ValueRef<'_>, ctx: &mut RequestContext) -> anyhow::Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?; let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?; let val = Value::des(&buf)?;
let desc = match val { let desc = match val {
@@ -1513,7 +1513,7 @@ pub struct ValueRef<'a> {
impl<'a> ValueRef<'a> { impl<'a> ValueRef<'a> {
/// Loads the value from disk /// Loads the value from disk
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> { pub async fn load(&self, ctx: &mut RequestContext) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter // theoretically we *could* record an access time for each, but it does not really matter
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?; let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?; let val = Value::des(&buf)?;
@@ -1527,7 +1527,7 @@ impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) async fn read_blk( pub(crate) async fn read_blk(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BlockLease, std::io::Error> { ) -> Result<BlockLease, std::io::Error> {
let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id); let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
block_reader.read_blk(blknum, ctx).await block_reader.read_blk(blknum, ctx).await
@@ -2060,7 +2060,7 @@ mod test {
source: &DeltaLayerInner, source: &DeltaLayerInner,
truncated: &DeltaLayerInner, truncated: &DeltaLayerInner,
truncated_at: Lsn, truncated_at: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
use futures::future::ready; use futures::future::ready;
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;

View File

@@ -177,7 +177,7 @@ impl std::fmt::Debug for ImageLayerInner {
} }
impl ImageLayerInner { impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> { pub(super) async fn dump(&self, ctx: &mut RequestContext) -> anyhow::Result<()> {
let block_reader = FileBlockReader::new(&self.file, self.file_id); let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new( let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
self.index_start_blk, self.index_start_blk,
@@ -217,7 +217,7 @@ impl AsLayerDesc for ImageLayer {
} }
impl ImageLayer { impl ImageLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
self.desc.dump(); self.desc.dump();
if !verbose { if !verbose {
@@ -254,7 +254,7 @@ impl ImageLayer {
async fn load( async fn load(
&self, &self,
access_kind: LayerAccessKind, access_kind: LayerAccessKind,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<&ImageLayerInner> { ) -> Result<&ImageLayerInner> {
self.access_stats.record_access(access_kind, ctx); self.access_stats.record_access(access_kind, ctx);
self.inner self.inner
@@ -263,7 +263,7 @@ impl ImageLayer {
.with_context(|| format!("Failed to load image layer {}", self.path())) .with_context(|| format!("Failed to load image layer {}", self.path()))
} }
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> { async fn load_inner(&self, ctx: &mut RequestContext) -> Result<ImageLayerInner> {
let path = self.path(); let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx) let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
@@ -336,7 +336,7 @@ impl ImageLayer {
pub async fn rewrite_summary<F>( pub async fn rewrite_summary<F>(
path: &Utf8Path, path: &Utf8Path,
rewrite: F, rewrite: F,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), RewriteSummaryError> ) -> Result<(), RewriteSummaryError>
where where
F: Fn(Summary) -> Summary, F: Fn(Summary) -> Summary,
@@ -377,7 +377,7 @@ impl ImageLayerInner {
lsn: Lsn, lsn: Lsn,
summary: Option<Summary>, summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>, max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> { ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await { let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file, Ok(file) => file,
@@ -428,7 +428,7 @@ impl ImageLayerInner {
&self, &self,
key: Key, key: Key,
reconstruct_state: &mut ValueReconstructState, reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id); let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = let tree_reader =
@@ -439,7 +439,7 @@ impl ImageLayerInner {
if let Some(offset) = tree_reader if let Some(offset) = tree_reader
.get( .get(
&keybuf, &keybuf,
&RequestContextBuilder::extend(ctx) &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode) .page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(), .build(),
) )
@@ -449,7 +449,7 @@ impl ImageLayerInner {
.block_cursor() .block_cursor()
.read_blob( .read_blob(
offset, offset,
&RequestContextBuilder::extend(ctx) &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue) .page_content_kind(PageContentKind::ImageLayerValue)
.build(), .build(),
) )
@@ -470,7 +470,7 @@ impl ImageLayerInner {
&self, &self,
keyspace: KeySpace, keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
let reads = self let reads = self
.plan_reads(keyspace, None, ctx) .plan_reads(keyspace, None, ctx)
@@ -489,7 +489,7 @@ impl ImageLayerInner {
#[cfg(test)] #[cfg(test)]
pub(super) async fn load_key_values( pub(super) async fn load_key_values(
&self, &self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> { ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id); let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = let tree_reader =
@@ -522,7 +522,7 @@ impl ImageLayerInner {
&self, &self,
keyspace: KeySpace, keyspace: KeySpace,
shard_identity: Option<&ShardIdentity>, shard_identity: Option<&ShardIdentity>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> { ) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new( let mut planner = VectoredReadPlanner::new(
self.max_vectored_read_bytes self.max_vectored_read_bytes
@@ -535,7 +535,7 @@ impl ImageLayerInner {
let tree_reader = let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
let ctx = RequestContextBuilder::extend(ctx) let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode) .page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(); .build();
@@ -544,7 +544,7 @@ impl ImageLayerInner {
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key); range.start.write_to_byte_slice(&mut search_key);
let index_stream = tree_reader.get_stream_from(&search_key, &ctx); let index_stream = tree_reader.get_stream_from(&search_key, &mut ctx);
let mut index_stream = std::pin::pin!(index_stream); let mut index_stream = std::pin::pin!(index_stream);
while let Some(index_entry) = index_stream.next().await { while let Some(index_entry) = index_stream.next().await {
@@ -587,7 +587,7 @@ impl ImageLayerInner {
&self, &self,
shard_identity: &ShardIdentity, shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter, writer: &mut ImageLayerWriter,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize> {
// Fragment the range into the regions owned by this ShardIdentity // Fragment the range into the regions owned by this ShardIdentity
let plan = self let plan = self
@@ -629,7 +629,7 @@ impl ImageLayerInner {
&self, &self,
reads: Vec<VectoredRead>, reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
let max_vectored_read_bytes = self let max_vectored_read_bytes = self
.max_vectored_read_bytes .max_vectored_read_bytes
@@ -724,7 +724,7 @@ impl ImageLayerWriterInner {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
key_range: &Range<Key>, key_range: &Range<Key>,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. // Create the file initially with a temporary filename.
// We'll atomically rename it to the final name when we're done. // We'll atomically rename it to the final name when we're done.
@@ -779,7 +779,7 @@ impl ImageLayerWriterInner {
&mut self, &mut self,
key: Key, key: Key,
img: Bytes, img: Bytes,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key)); ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img, ctx).await; let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
@@ -799,7 +799,7 @@ impl ImageLayerWriterInner {
async fn finish( async fn finish(
self, self,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ResidentLayer> { ) -> anyhow::Result<ResidentLayer> {
let index_start_blk = let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -899,7 +899,7 @@ impl ImageLayerWriter {
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
key_range: &Range<Key>, key_range: &Range<Key>,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ImageLayerWriter> { ) -> anyhow::Result<ImageLayerWriter> {
Ok(Self { Ok(Self {
inner: Some( inner: Some(
@@ -918,7 +918,7 @@ impl ImageLayerWriter {
&mut self, &mut self,
key: Key, key: Key,
img: Bytes, img: Bytes,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await self.inner.as_mut().unwrap().put_image(key, img, ctx).await
} }
@@ -929,7 +929,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish( pub(crate) async fn finish(
mut self, mut self,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<super::ResidentLayer> { ) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline, ctx).await self.inner.take().unwrap().finish(timeline, ctx).await
} }

View File

@@ -256,7 +256,7 @@ impl InMemoryLayer {
/// debugging function to print out the contents of the layer /// debugging function to print out the contents of the layer
/// ///
/// this is likely completly unused /// this is likely completly unused
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { pub async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> Result<()> {
let inner = self.inner.read().await; let inner = self.inner.read().await;
let end_str = self.end_lsn_or_max(); let end_str = self.end_lsn_or_max();
@@ -308,12 +308,12 @@ impl InMemoryLayer {
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState, reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn); ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true; let mut need_image = true;
let ctx = RequestContextBuilder::extend(ctx) let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer) .page_content_kind(PageContentKind::InMemoryLayer)
.build(); .build();
@@ -325,7 +325,7 @@ impl InMemoryLayer {
if let Some(vec_map) = inner.index.get(&key) { if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range); let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() { for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos, &ctx).await?; let buf = reader.read_blob(*pos, &mut ctx).await?;
let value = Value::des(&buf)?; let value = Value::des(&buf)?;
match value { match value {
Value::Image(img) => { Value::Image(img) => {
@@ -365,9 +365,9 @@ impl InMemoryLayer {
keyspace: KeySpace, keyspace: KeySpace,
end_lsn: Lsn, end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx) let mut ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer) .page_content_kind(PageContentKind::InMemoryLayer)
.build(); .build();
@@ -410,7 +410,7 @@ impl InMemoryLayer {
continue; continue;
} }
let buf = reader.read_blob(block_read.block_offset, &ctx).await; let buf = reader.read_blob(block_read.block_offset, &mut ctx).await;
if let Err(e) = buf { if let Err(e) = buf {
reconstruct_state reconstruct_state
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e))); .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
@@ -473,7 +473,7 @@ impl InMemoryLayer {
timeline_id: TimelineId, timeline_id: TimelineId,
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
start_lsn: Lsn, start_lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<InMemoryLayer> { ) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
@@ -512,7 +512,7 @@ impl InMemoryLayer {
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
buf: &[u8], buf: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
self.assert_writable(); self.assert_writable();
@@ -525,7 +525,7 @@ impl InMemoryLayer {
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
buf: &[u8], buf: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
@@ -534,7 +534,7 @@ impl InMemoryLayer {
.file .file
.write_blob( .write_blob(
buf, buf,
&RequestContextBuilder::extend(ctx) &mut RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer) .page_content_kind(PageContentKind::InMemoryLayer)
.build(), .build(),
) )
@@ -606,7 +606,7 @@ impl InMemoryLayer {
pub(crate) async fn write_to_disk( pub(crate) async fn write_to_disk(
&self, &self,
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
key_range: Option<Range<Key>>, key_range: Option<Range<Key>>,
) -> Result<Option<ResidentLayer>> { ) -> Result<Option<ResidentLayer>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this // Grab the lock in read-mode. We hold it over the I/O, but because this

View File

@@ -331,7 +331,7 @@ impl Layer {
key: Key, key: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState, reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<ValueReconstructResult> {
use anyhow::ensure; use anyhow::ensure;
@@ -361,7 +361,7 @@ impl Layer {
keyspace: KeySpace, keyspace: KeySpace,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_data: &mut ValuesReconstructState, reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
let layer = self let layer = self
.0 .0
@@ -392,7 +392,7 @@ impl Layer {
#[cfg(test)] #[cfg(test)]
pub(crate) async fn load_key_values( pub(crate) async fn load_key_values(
&self, &self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> { ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
let layer = self let layer = self
.0 .0
@@ -479,7 +479,7 @@ impl Layer {
/// Traditional debug dumping facility /// Traditional debug dumping facility
#[allow(unused)] #[allow(unused)]
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> { pub(crate) async fn dump(&self, verbose: bool, ctx: &mut RequestContext) -> anyhow::Result<()> {
self.0.desc.dump(); self.0.desc.dump();
if verbose { if verbose {
@@ -898,7 +898,7 @@ impl LayerInner {
async fn get_or_maybe_download( async fn get_or_maybe_download(
self: &Arc<Self>, self: &Arc<Self>,
allow_download: bool, allow_download: bool,
ctx: Option<&RequestContext>, ctx: Option<&mut RequestContext>,
) -> Result<Arc<DownloadedLayer>, DownloadError> { ) -> Result<Arc<DownloadedLayer>, DownloadError> {
let (weak, permit) = { let (weak, permit) = {
// get_or_init_detached can: // get_or_init_detached can:
@@ -988,7 +988,7 @@ impl LayerInner {
return Err(DownloadError::NotFile(ft)); return Err(DownloadError::NotFile(ft));
} }
if let Some(ctx) = ctx { if let Some(ref ctx) = ctx {
self.check_expected_download(ctx)?; self.check_expected_download(ctx)?;
} }
@@ -1049,7 +1049,7 @@ impl LayerInner {
self: &Arc<Self>, self: &Arc<Self>,
timeline: Arc<Timeline>, timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit, permit: heavier_once_cell::InitPermit,
ctx: RequestContext, mut ctx: RequestContext,
) -> Result<Arc<DownloadedLayer>, DownloadError> { ) -> Result<Arc<DownloadedLayer>, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
@@ -1079,7 +1079,7 @@ impl LayerInner {
.await .await
.unwrap(); .unwrap();
let res = this.download_and_init(timeline, permit, &ctx).await; let res = this.download_and_init(timeline, permit, &mut ctx).await;
if let Err(res) = tx.send(res) { if let Err(res) = tx.send(res) {
match res { match res {
@@ -1122,7 +1122,7 @@ impl LayerInner {
self: &Arc<LayerInner>, self: &Arc<LayerInner>,
timeline: Arc<Timeline>, timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit, permit: heavier_once_cell::InitPermit,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<DownloadedLayer>> { ) -> anyhow::Result<Arc<DownloadedLayer>> {
let result = timeline let result = timeline
.remote_client .remote_client
@@ -1662,9 +1662,9 @@ impl DownloadedLayer {
async fn get<'a>( async fn get<'a>(
&'a self, &'a self,
owner: &Arc<LayerInner>, owner: &Arc<LayerInner>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<&'a LayerKind> { ) -> anyhow::Result<&'a LayerKind> {
let init = || async { let init = async {
assert_eq!( assert_eq!(
Weak::as_ptr(&self.owner), Weak::as_ptr(&self.owner),
Arc::as_ptr(owner), Arc::as_ptr(owner),
@@ -1719,7 +1719,7 @@ impl DownloadedLayer {
} }
}; };
self.kind self.kind
.get_or_try_init(init) .get_or_try_init(move || init)
// return transient errors using `?` // return transient errors using `?`
.await? .await?
.as_ref() .as_ref()
@@ -1736,7 +1736,7 @@ impl DownloadedLayer {
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState, reconstruct_data: &mut ValueReconstructState,
owner: &Arc<LayerInner>, owner: &Arc<LayerInner>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<ValueReconstructResult> { ) -> anyhow::Result<ValueReconstructResult> {
use LayerKind::*; use LayerKind::*;
@@ -1758,7 +1758,7 @@ impl DownloadedLayer {
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
reconstruct_data: &mut ValuesReconstructState, reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>, owner: &Arc<LayerInner>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
use LayerKind::*; use LayerKind::*;
@@ -1778,7 +1778,7 @@ impl DownloadedLayer {
async fn load_key_values( async fn load_key_values(
&self, &self,
owner: &Arc<LayerInner>, owner: &Arc<LayerInner>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> { ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
use LayerKind::*; use LayerKind::*;
@@ -1788,7 +1788,7 @@ impl DownloadedLayer {
} }
} }
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> { async fn dump(&self, owner: &Arc<LayerInner>, ctx: &mut RequestContext) -> anyhow::Result<()> {
use LayerKind::*; use LayerKind::*;
match self.get(owner, ctx).await? { match self.get(owner, ctx).await? {
Delta(d) => d.dump(ctx).await?, Delta(d) => d.dump(ctx).await?,
@@ -1837,7 +1837,7 @@ impl ResidentLayer {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))] #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
pub(crate) async fn load_keys<'a>( pub(crate) async fn load_keys<'a>(
&'a self, &'a self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<DeltaEntry<'a>>> { ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
use LayerKind::*; use LayerKind::*;
@@ -1866,7 +1866,7 @@ impl ResidentLayer {
&'a self, &'a self,
shard_identity: &ShardIdentity, shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter, writer: &mut ImageLayerWriter,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize> {
use LayerKind::*; use LayerKind::*;
@@ -1881,7 +1881,7 @@ impl ResidentLayer {
&self, &self,
writer: &mut super::delta_layer::DeltaLayerWriter, writer: &mut super::delta_layer::DeltaLayerWriter,
until: Lsn, until: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize> {
use LayerKind::*; use LayerKind::*;
@@ -1907,7 +1907,7 @@ impl ResidentLayer {
#[cfg(test)] #[cfg(test)]
pub(crate) async fn as_delta( pub(crate) async fn as_delta(
&self, &self,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<&delta_layer::DeltaLayerInner> { ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
use LayerKind::*; use LayerKind::*;
match self.downloaded.get(&self.owner.0, ctx).await? { match self.downloaded.get(&self.owner.0, ctx).await? {

View File

@@ -73,7 +73,7 @@ static PERMIT_GAUGES: once_cell::sync::Lazy<
/// Cancellation safe. /// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit( pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind, loop_kind: BackgroundLoopKind,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> tokio::sync::SemaphorePermit<'static> { ) -> tokio::sync::SemaphorePermit<'static> {
let _guard = PERMIT_GAUGES[loop_kind].guard(); let _guard = PERMIT_GAUGES[loop_kind].guard();

View File

@@ -157,19 +157,6 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time }; let observation = Observation { wait_time };
self.metric.observe_throttling(&observation); self.metric.observe_throttling(&observation);
match ctx.micros_spent_throttled.add(wait_time) {
Ok(res) => res,
Err(error) => {
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.call(move || {
warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
});
}
}
Some(wait_time) Some(wait_time)
} else { } else {
None None

View File

@@ -871,7 +871,7 @@ impl Timeline {
&self, &self,
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() { if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN"))); return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
@@ -882,8 +882,6 @@ impl Timeline {
// page_service. // page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key)); debug_assert!(!self.shard_identity.is_key_disposable(&key));
self.timeline_get_throttle.throttle(ctx, 1).await;
match self.conf.get_impl { match self.conf.get_impl {
GetImpl::Legacy => { GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState { let reconstruct_state = ValueReconstructState {
@@ -946,7 +944,7 @@ impl Timeline {
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
mut reconstruct_state: ValueReconstructState, mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Bytes, PageReconstructError> { ) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here. // XXX: structured stats collection for layer eviction here.
trace!( trace!(
@@ -1004,7 +1002,7 @@ impl Timeline {
&self, &self,
keyspace: KeySpace, keyspace: KeySpace,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() { if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn)); return Err(GetVectoredError::InvalidLsn(lsn));
@@ -1035,12 +1033,7 @@ impl Timeline {
.for_task_kind(ctx.task_kind()) .for_task_kind(ctx.task_kind())
.map(|metric| (metric, Instant::now())); .map(|metric| (metric, Instant::now()));
// start counting after throttle so that throttle time let throttled = None;
// is always less than observation time
let throttled = self
.timeline_get_throttle
.throttle(ctx, key_count as usize)
.await;
let res = match self.conf.get_vectored_impl { let res = match self.conf.get_vectored_impl {
GetVectoredImpl::Sequential => { GetVectoredImpl::Sequential => {
@@ -1101,7 +1094,7 @@ impl Timeline {
&self, &self,
keyspace: KeySpace, keyspace: KeySpace,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() { if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn)); return Err(GetVectoredError::InvalidLsn(lsn));
@@ -1129,13 +1122,7 @@ impl Timeline {
.for_task_kind(ctx.task_kind()) .for_task_kind(ctx.task_kind())
.map(ScanLatencyOngoingRecording::start_recording); .map(ScanLatencyOngoingRecording::start_recording);
// start counting after throttle so that throttle time let throttled = None;
// is always less than observation time
let throttled = self
.timeline_get_throttle
// assume scan = 1 quota for now until we find a better way to process this
.throttle(ctx, 1)
.await;
let vectored_res = self let vectored_res = self
.get_vectored_impl( .get_vectored_impl(
@@ -1158,7 +1145,7 @@ impl Timeline {
&self, &self,
keyspace: KeySpace, keyspace: KeySpace,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new(); let mut values = BTreeMap::new();
@@ -1217,7 +1204,7 @@ impl Timeline {
keyspace: KeySpace, keyspace: KeySpace,
lsn: Lsn, lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 { let get_kind = if keyspace.total_raw_size() == 1 {
GetKind::Singular GetKind::Singular
@@ -1274,7 +1261,7 @@ impl Timeline {
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>, vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace, keyspace: KeySpace,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
if keyspace.overlaps(&Key::metadata_key_range()) { if keyspace.overlaps(&Key::metadata_key_range()) {
// skip validation for metadata key range // skip validation for metadata key range
@@ -1444,7 +1431,7 @@ impl Timeline {
&self, &self,
lsn: Lsn, lsn: Lsn,
who_is_waiting: WaitLsnWaiter<'_>, who_is_waiting: WaitLsnWaiter<'_>,
ctx: &RequestContext, /* Prepare for use by cancellation */ ctx: &mut RequestContext, /* Prepare for use by cancellation */
) -> Result<(), WaitLsnError> { ) -> Result<(), WaitLsnError> {
let state = self.current_state(); let state = self.current_state();
if self.cancel.is_cancelled() || matches!(state, TimelineState::Stopping) { if self.cancel.is_cancelled() || matches!(state, TimelineState::Stopping) {
@@ -1540,7 +1527,7 @@ impl Timeline {
&self, &self,
lsn: Lsn, lsn: Lsn,
length: Duration, length: Duration,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> anyhow::Result<LsnLease> { ) -> anyhow::Result<LsnLease> {
let lease = { let lease = {
let mut gc_info = self.gc_info.write().unwrap(); let mut gc_info = self.gc_info.write().unwrap();
@@ -1713,7 +1700,7 @@ impl Timeline {
self: &Arc<Self>, self: &Arc<Self>,
cancel: &CancellationToken, cancel: &CancellationToken,
flags: EnumSet<CompactFlags>, flags: EnumSet<CompactFlags>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
// most likely the cancellation token is from background task, but in tests it could be the // most likely the cancellation token is from background task, but in tests it could be the
// request task as well. // request task as well.
@@ -1765,7 +1752,7 @@ impl Timeline {
parent: Arc<crate::tenant::Tenant>, parent: Arc<crate::tenant::Tenant>,
broker_client: BrokerClientChannel, broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>, background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
if self.tenant_shard_id.is_shard_zero() { if self.tenant_shard_id.is_shard_zero() {
// Logical size is only maintained accurately on shard zero. // Logical size is only maintained accurately on shard zero.
@@ -1946,7 +1933,7 @@ impl Timeline {
pub(crate) async fn wait_to_become_active( pub(crate) async fn wait_to_become_active(
&self, &self,
_ctx: &RequestContext, // Prepare for use by cancellation _ctx: &mut RequestContext, // Prepare for use by cancellation
) -> Result<(), TimelineState> { ) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe(); let mut receiver = self.state.subscribe();
loop { loop {
@@ -2448,7 +2435,7 @@ impl Timeline {
/// when the timeline is activated. /// when the timeline is activated.
fn launch_wal_receiver( fn launch_wal_receiver(
self: &Arc<Self>, self: &Arc<Self>,
ctx: &RequestContext, ctx: &mut RequestContext,
broker_client: BrokerClientChannel, broker_client: BrokerClientChannel,
) { ) {
info!( info!(
@@ -2678,7 +2665,7 @@ impl Timeline {
pub(crate) fn get_current_logical_size( pub(crate) fn get_current_logical_size(
self: &Arc<Self>, self: &Arc<Self>,
priority: GetLogicalSizePriority, priority: GetLogicalSizePriority,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> logical_size::CurrentLogicalSize { ) -> logical_size::CurrentLogicalSize {
if !self.tenant_shard_id.is_shard_zero() { if !self.tenant_shard_id.is_shard_zero() {
// Logical size is only accurately maintained on shard zero: when called elsewhere, for example // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
@@ -2745,7 +2732,7 @@ impl Timeline {
current_size current_size
} }
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) { fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &mut RequestContext) {
let Some(initial_part_end) = self.current_logical_size.initial_part_end else { let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
// nothing to do for freshly created timelines; // nothing to do for freshly created timelines;
assert_eq!( assert_eq!(
@@ -2963,7 +2950,7 @@ impl Timeline {
self: &Arc<Self>, self: &Arc<Self>,
lsn: Lsn, lsn: Lsn,
cause: LogicalSizeCalculationCause, cause: LogicalSizeCalculationCause,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<u64, CalculateLogicalSizeError> { ) -> Result<u64, CalculateLogicalSizeError> {
crate::span::debug_assert_current_span_has_tenant_and_timeline_id(); crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
// We should never be calculating logical sizes on shard !=0, because these shards do not have // We should never be calculating logical sizes on shard !=0, because these shards do not have
@@ -3006,7 +2993,7 @@ impl Timeline {
up_to_lsn: Lsn, up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause, cause: LogicalSizeCalculationCause,
_guard: &GateGuard, _guard: &GateGuard,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<u64, CalculateLogicalSizeError> { ) -> Result<u64, CalculateLogicalSizeError> {
info!( info!(
"Calculating logical size for timeline {} at {}", "Calculating logical size for timeline {} at {}",
@@ -3169,7 +3156,7 @@ impl Timeline {
key: Key, key: Key,
request_lsn: Lsn, request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState, reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> { ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
// Start from the current timeline. // Start from the current timeline.
let mut timeline_owned; let mut timeline_owned;
@@ -3370,7 +3357,7 @@ impl Timeline {
mut keyspace: KeySpace, mut keyspace: KeySpace,
request_lsn: Lsn, request_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), GetVectoredError> { ) -> Result<(), GetVectoredError> {
let mut timeline_owned: Arc<Timeline>; let mut timeline_owned: Arc<Timeline>;
let mut timeline = self; let mut timeline = self;
@@ -3477,7 +3464,7 @@ impl Timeline {
mut cont_lsn: Lsn, mut cont_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState, reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<TimelineVisitOutcome, GetVectoredError> { ) -> Result<TimelineVisitOutcome, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone(); let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new(); let mut fringe = LayerFringe::new();
@@ -3585,7 +3572,7 @@ impl Timeline {
async fn get_ready_ancestor_timeline( async fn get_ready_ancestor_timeline(
&self, &self,
ancestor: &Arc<Timeline>, ancestor: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Arc<Timeline>, GetReadyAncestorError> { ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
// It's possible that the ancestor timeline isn't active yet, or // It's possible that the ancestor timeline isn't active yet, or
// is active but hasn't yet caught up to the branch point. Wait // is active but hasn't yet caught up to the branch point. Wait
@@ -3653,7 +3640,7 @@ impl Timeline {
async fn get_layer_for_write( async fn get_layer_for_write(
&self, &self,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> { ) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await; let mut guard = self.layers.write().await;
let layer = guard let layer = guard
@@ -3697,7 +3684,7 @@ impl Timeline {
async fn flush_loop( async fn flush_loop(
self: &Arc<Self>, self: &Arc<Self>,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>, mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
info!("started flush loop"); info!("started flush loop");
loop { loop {
@@ -3856,7 +3843,7 @@ impl Timeline {
async fn flush_frozen_layer( async fn flush_frozen_layer(
self: &Arc<Self>, self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>, frozen_layer: Arc<InMemoryLayer>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Lsn, FlushLayerError> { ) -> Result<Lsn, FlushLayerError> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4085,7 +4072,7 @@ impl Timeline {
self: &Arc<Self>, self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>, frozen_layer: &Arc<InMemoryLayer>,
key_range: Option<Range<Key>>, key_range: Option<Range<Key>>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> { ) -> anyhow::Result<Option<ResidentLayer>> {
let self_clone = Arc::clone(self); let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer); let frozen_layer = Arc::clone(frozen_layer);
@@ -4142,7 +4129,7 @@ impl Timeline {
lsn: Lsn, lsn: Lsn,
partition_size: u64, partition_size: u64,
flags: EnumSet<CompactFlags>, flags: EnumSet<CompactFlags>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> { ) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else { let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline. // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
@@ -4239,7 +4226,7 @@ impl Timeline {
partition: &KeySpace, partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter, mut image_layer_writer: ImageLayerWriter,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
img_range: Range<Key>, img_range: Range<Key>,
start: Key, start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> { ) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
@@ -4339,7 +4326,7 @@ impl Timeline {
partition: &KeySpace, partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter, mut image_layer_writer: ImageLayerWriter,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
img_range: Range<Key>, img_range: Range<Key>,
mode: ImageLayerCreationMode, mode: ImageLayerCreationMode,
start: Key, start: Key,
@@ -4423,7 +4410,7 @@ impl Timeline {
partitioning: &KeyPartitioning, partitioning: &KeyPartitioning,
lsn: Lsn, lsn: Lsn,
mode: ImageLayerCreationMode, mode: ImageLayerCreationMode,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> { ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer(); let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new(); let mut image_layers = Vec::new();
@@ -4624,7 +4611,7 @@ impl Timeline {
self: &Arc<Timeline>, self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant, tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options, options: detach_ancestor::Options,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result< ) -> Result<
( (
completion::Completion, completion::Completion,
@@ -4644,7 +4631,7 @@ impl Timeline {
self: &Arc<Timeline>, self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant, tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach, prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> { ) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await detach_ancestor::complete(self, tenant, prepared, ctx).await
} }
@@ -4822,7 +4809,7 @@ impl Timeline {
cutoff_horizon: Lsn, cutoff_horizon: Lsn,
pitr: Duration, pitr: Duration,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<GcCutoffs, PageReconstructError> { ) -> Result<GcCutoffs, PageReconstructError> {
let _timer = self let _timer = self
.metrics .metrics
@@ -5469,7 +5456,7 @@ impl Timeline {
lsn: Lsn, lsn: Lsn,
mut images: Vec<(Key, Bytes)>, mut images: Vec<(Key, Bytes)>,
check_start_lsn: Option<Lsn>, check_start_lsn: Option<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn(); let last_record_lsn = self.get_last_record_lsn();
assert!( assert!(
@@ -5513,7 +5500,7 @@ impl Timeline {
self: &Arc<Timeline>, self: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>, mut deltas: Vec<(Key, Lsn, Value)>,
check_start_lsn: Option<Lsn>, check_start_lsn: Option<Lsn>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn(); let last_record_lsn = self.get_last_record_lsn();
deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb))); deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
@@ -5556,7 +5543,7 @@ impl Timeline {
pub(crate) async fn inspect_image_layers( pub(crate) async fn inspect_image_layers(
self: &Arc<Timeline>, self: &Arc<Timeline>,
lsn: Lsn, lsn: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> { ) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new(); let mut all_data = Vec::new();
let guard = self.layers.read().await; let guard = self.layers.read().await;
@@ -5664,7 +5651,7 @@ impl<'a> TimelineWriter<'a> {
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
value: &Value, value: &Value,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Avoid doing allocations for "small" values. // Avoid doing allocations for "small" values.
// In the regression test suite, the limit of 256 avoided allocations in 95% of cases: // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
@@ -5697,7 +5684,7 @@ impl<'a> TimelineWriter<'a> {
&mut self, &mut self,
at: Lsn, at: Lsn,
action: OpenLayerAction, action: OpenLayerAction,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<&Arc<InMemoryLayer>> { ) -> anyhow::Result<&Arc<InMemoryLayer>> {
match action { match action {
OpenLayerAction::Roll => { OpenLayerAction::Roll => {
@@ -5714,7 +5701,7 @@ impl<'a> TimelineWriter<'a> {
Ok(&self.write_guard.as_ref().unwrap().open_layer) Ok(&self.write_guard.as_ref().unwrap().open_layer)
} }
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> { async fn open_layer(&mut self, at: Lsn, ctx: &mut RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?; let layer = self.tl.get_layer_for_write(at, ctx).await?;
let initial_size = layer.size().await?; let initial_size = layer.size().await?;
@@ -5800,7 +5787,7 @@ impl<'a> TimelineWriter<'a> {
pub(crate) async fn put_batch( pub(crate) async fn put_batch(
&mut self, &mut self,
batch: VecMap<Lsn, (Key, Value)>, batch: VecMap<Lsn, (Key, Value)>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
for (lsn, (key, val)) in batch { for (lsn, (key, val)) in batch {
self.put(key, lsn, &val, ctx).await? self.put(key, lsn, &val, ctx).await?
@@ -5812,7 +5799,7 @@ impl<'a> TimelineWriter<'a> {
pub(crate) async fn delete_batch( pub(crate) async fn delete_batch(
&mut self, &mut self,
batch: &[(Range<Key>, Lsn)], batch: &[(Range<Key>, Lsn)],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if let Some((_, lsn)) = batch.first() { if let Some((_, lsn)) = batch.first() {
let action = self.get_open_layer_action(*lsn, 0); let action = self.get_open_layer_action(*lsn, 0);

View File

@@ -49,7 +49,7 @@ impl Timeline {
self: &Arc<Self>, self: &Arc<Self>,
_cancel: &CancellationToken, _cancel: &CancellationToken,
flags: EnumSet<CompactFlags>, flags: EnumSet<CompactFlags>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
// High level strategy for compaction / image creation: // High level strategy for compaction / image creation:
// //
@@ -175,7 +175,7 @@ impl Timeline {
async fn compact_shard_ancestors( async fn compact_shard_ancestors(
self: &Arc<Self>, self: &Arc<Self>,
rewrite_max: usize, rewrite_max: usize,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut drop_layers = Vec::new(); let mut drop_layers = Vec::new();
let mut layers_to_rewrite: Vec<Layer> = Vec::new(); let mut layers_to_rewrite: Vec<Layer> = Vec::new();
@@ -359,7 +359,7 @@ impl Timeline {
async fn compact_level0( async fn compact_level0(
self: &Arc<Self>, self: &Arc<Self>,
target_file_size: u64, target_file_size: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
let CompactLevel0Phase1Result { let CompactLevel0Phase1Result {
new_layers, new_layers,
@@ -400,7 +400,7 @@ impl Timeline {
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>, guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
mut stats: CompactLevel0Phase1StatsBuilder, mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64, target_file_size: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> { ) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller stats.read_lock_acquisition_micros.till_now(); // set by caller
@@ -907,7 +907,7 @@ impl Timeline {
pub(crate) async fn compact_tiered( pub(crate) async fn compact_tiered(
self: &Arc<Self>, self: &Arc<Self>,
_cancel: &CancellationToken, _cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
let fanout = self.get_compaction_threshold() as u64; let fanout = self.get_compaction_threshold() as u64;
let target_file_size = self.get_checkpoint_distance(); let target_file_size = self.get_checkpoint_distance();
@@ -963,7 +963,7 @@ impl Timeline {
pub(crate) async fn compact_with_gc( pub(crate) async fn compact_with_gc(
self: &Arc<Self>, self: &Arc<Self>,
_cancel: &CancellationToken, _cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
use crate::tenant::storage_layer::ValueReconstructState; use crate::tenant::storage_layer::ValueReconstructState;
// Step 0: pick all delta layers + image layers below/intersect with the GC horizon. // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
@@ -1190,7 +1190,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
&mut self, &mut self,
key_range: &Range<Key>, key_range: &Range<Key>,
lsn_range: &Range<Lsn>, lsn_range: &Range<Lsn>,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> { ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
self.flush_updates().await?; self.flush_updates().await?;
@@ -1211,7 +1211,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
&mut self, &mut self,
key_range: &Range<Key>, key_range: &Range<Key>,
lsn: Lsn, lsn: Lsn,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> anyhow::Result<Vec<Range<Key>>> { ) -> anyhow::Result<Vec<Range<Key>>> {
if lsn == self.keyspace.0 { if lsn == self.keyspace.0 {
Ok(pageserver_compaction::helpers::intersect_keyspace( Ok(pageserver_compaction::helpers::intersect_keyspace(
@@ -1247,7 +1247,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
&mut self, &mut self,
lsn: Lsn, lsn: Lsn,
key_range: &Range<Key>, key_range: &Range<Key>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
Ok(self.create_image_impl(lsn, key_range, ctx).await?) Ok(self.create_image_impl(lsn, key_range, ctx).await?)
} }
@@ -1257,7 +1257,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
lsn_range: &Range<Lsn>, lsn_range: &Range<Lsn>,
key_range: &Range<Key>, key_range: &Range<Key>,
input_layers: &[ResidentDeltaLayer], input_layers: &[ResidentDeltaLayer],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
@@ -1329,7 +1329,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
async fn delete_layer( async fn delete_layer(
&mut self, &mut self,
layer: &OwnArc<PersistentLayerDesc>, layer: &OwnArc<PersistentLayerDesc>,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
self.layers_to_delete.push(layer.clone().0); self.layers_to_delete.push(layer.clone().0);
Ok(()) Ok(())
@@ -1341,7 +1341,7 @@ impl TimelineAdaptor {
&mut self, &mut self,
lsn: Lsn, lsn: Lsn,
key_range: &Range<Key>, key_range: &Range<Key>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), CreateImageLayersError> { ) -> Result<(), CreateImageLayersError> {
let timer = self.timeline.metrics.create_images_time_histo.start_timer(); let timer = self.timeline.metrics.create_images_time_histo.start_timer();
@@ -1468,7 +1468,7 @@ impl CompactionLayer<Key> for ResidentDeltaLayer {
impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer { impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
type DeltaEntry<'a> = DeltaEntry<'a>; type DeltaEntry<'a> = DeltaEntry<'a>;
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> { async fn load_keys<'a>(&self, ctx: &mut RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
self.0.load_keys(ctx).await self.0.load_keys(ctx).await
} }
} }

View File

@@ -87,7 +87,7 @@ pub(super) async fn prepare(
detached: &Arc<Timeline>, detached: &Arc<Timeline>,
tenant: &Tenant, tenant: &Tenant,
options: Options, options: Options,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> { ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
use Error::*; use Error::*;
@@ -325,7 +325,7 @@ async fn upload_rewritten_layer(
layer: &Layer, layer: &Layer,
target: &Arc<Timeline>, target: &Arc<Timeline>,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<Layer>, Error> { ) -> Result<Option<Layer>, Error> {
use Error::UploadRewritten; use Error::UploadRewritten;
let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?; let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
@@ -348,7 +348,7 @@ async fn copy_lsn_prefix(
end_lsn: Lsn, end_lsn: Lsn,
layer: &Layer, layer: &Layer,
target_timeline: &Arc<Timeline>, target_timeline: &Arc<Timeline>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Option<ResidentLayer>, Error> { ) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed}; use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
@@ -437,7 +437,7 @@ pub(super) async fn complete(
detached: &Arc<Timeline>, detached: &Arc<Timeline>,
tenant: &Tenant, tenant: &Tenant,
prepared: PreparedTimelineDetach, prepared: PreparedTimelineDetach,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> { ) -> Result<Vec<TimelineId>, anyhow::Error> {
let PreparedTimelineDetach { layers } = prepared; let PreparedTimelineDetach { layers } = prepared;

View File

@@ -127,7 +127,7 @@ impl Timeline {
policy: &EvictionPolicy, policy: &EvictionPolicy,
cancel: &CancellationToken, cancel: &CancellationToken,
gate: &GateGuard, gate: &GateGuard,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> ControlFlow<(), Instant> { ) -> ControlFlow<(), Instant> {
debug!("eviction iteration: {policy:?}"); debug!("eviction iteration: {policy:?}");
let start = Instant::now(); let start = Instant::now();
@@ -184,7 +184,7 @@ impl Timeline {
p: &EvictionPolicyLayerAccessThreshold, p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken, cancel: &CancellationToken,
gate: &GateGuard, gate: &GateGuard,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> ControlFlow<()> { ) -> ControlFlow<()> {
let now = SystemTime::now(); let now = SystemTime::now();
@@ -309,7 +309,7 @@ impl Timeline {
p: &EvictionPolicyLayerAccessThreshold, p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken, cancel: &CancellationToken,
gate: &GateGuard, gate: &GateGuard,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> ControlFlow<()> { ) -> ControlFlow<()> {
let permit = self.acquire_imitation_permit(cancel, ctx).await?; let permit = self.acquire_imitation_permit(cancel, ctx).await?;
@@ -320,7 +320,7 @@ impl Timeline {
async fn acquire_imitation_permit( async fn acquire_imitation_permit(
&self, &self,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> { ) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit( let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction, BackgroundLoopKind::Eviction,
@@ -366,7 +366,7 @@ impl Timeline {
cancel: &CancellationToken, cancel: &CancellationToken,
gate: &GateGuard, gate: &GateGuard,
permit: tokio::sync::SemaphorePermit<'static>, permit: tokio::sync::SemaphorePermit<'static>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> ControlFlow<()> { ) -> ControlFlow<()> {
if !self.tenant_shard_id.is_shard_zero() { if !self.tenant_shard_id.is_shard_zero() {
// Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size // Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
@@ -442,7 +442,7 @@ impl Timeline {
async fn imitate_timeline_cached_layer_accesses( async fn imitate_timeline_cached_layer_accesses(
&self, &self,
guard: &GateGuard, guard: &GateGuard,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
let lsn = self.get_last_record_lsn(); let lsn = self.get_last_record_lsn();
@@ -499,7 +499,7 @@ impl Timeline {
&self, &self,
tenant: &Tenant, tenant: &Tenant,
cancel: &CancellationToken, cancel: &CancellationToken,
ctx: &RequestContext, ctx: &mut RequestContext,
) { ) {
if self.conf.metric_collection_endpoint.is_none() { if self.conf.metric_collection_endpoint.is_none() {
// We don't start the consumption metrics task if this is not set in the config. // We don't start the consumption metrics task if this is not set in the config.

View File

@@ -73,7 +73,7 @@ impl LayerManager {
conf: &'static PageServerConf, conf: &'static PageServerConf,
timeline_id: TimelineId, timeline_id: TimelineId,
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Arc<InMemoryLayer>> { ) -> Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned()); ensure!(lsn.is_aligned());

View File

@@ -90,7 +90,7 @@ impl<'t> UninitializedTimeline<'t> {
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn, base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel, broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?; let raw_timeline = self.raw_timeline()?;

View File

@@ -68,7 +68,7 @@ impl WalReceiver {
timeline: Arc<Timeline>, timeline: Arc<Timeline>,
conf: WalReceiverConf, conf: WalReceiverConf,
mut broker_client: BrokerClientChannel, mut broker_client: BrokerClientChannel,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Self { ) -> Self {
let tenant_shard_id = timeline.tenant_shard_id; let tenant_shard_id = timeline.tenant_shard_id;
let timeline_id = timeline.timeline_id; let timeline_id = timeline.timeline_id;

View File

@@ -59,7 +59,7 @@ pub(crate) struct Cancelled;
pub(super) async fn connection_manager_loop_step( pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel, broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState, connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext, ctx: &mut RequestContext,
cancel: &CancellationToken, cancel: &CancellationToken,
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>, manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
) -> Result<(), Cancelled> { ) -> Result<(), Cancelled> {
@@ -523,7 +523,11 @@ impl ConnectionManagerState {
} }
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string. /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) { async fn change_connection(
&mut self,
new_sk: NewWalConnectionCandidate,
ctx: &mut RequestContext,
) {
WALRECEIVER_SWITCHES WALRECEIVER_SWITCHES
.with_label_values(&[new_sk.reason.name()]) .with_label_values(&[new_sk.reason.name()])
.inc(); .inc();

View File

@@ -286,7 +286,7 @@ impl<'a> VectoredBlobReader<'a> {
&self, &self,
read: &VectoredRead, read: &VectoredRead,
buf: BytesMut, buf: BytesMut,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> { ) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0); assert!(read.size() > 0);
assert!( assert!(

View File

@@ -346,7 +346,7 @@ impl VirtualFile {
/// Open a file in read-only mode. Like File::open. /// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>( pub async fn open<P: AsRef<Utf8Path>>(
path: P, path: P,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<VirtualFile, std::io::Error> { ) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
} }
@@ -355,7 +355,7 @@ impl VirtualFile {
/// Like File::create. /// Like File::create.
pub async fn create<P: AsRef<Utf8Path>>( pub async fn create<P: AsRef<Utf8Path>>(
path: P, path: P,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<VirtualFile, std::io::Error> { ) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options( Self::open_with_options(
path.as_ref(), path.as_ref(),
@@ -373,7 +373,7 @@ impl VirtualFile {
pub async fn open_with_options<P: AsRef<Utf8Path>>( pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P, path: P,
open_options: &OpenOptions, open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ _ctx: &mut RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> { ) -> Result<VirtualFile, std::io::Error> {
let path_ref = path.as_ref(); let path_ref = path.as_ref();
let path_str = path_ref.to_string(); let path_str = path_ref.to_string();
@@ -589,15 +589,13 @@ impl VirtualFile {
&self, &self,
buf: B, buf: B,
offset: u64, offset: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<B, Error> ) -> Result<B, Error>
where where
B: IoBufMut + Send, B: IoBufMut + Send,
{ {
let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| { let (buf, res) =
self.read_at(buf, offset, ctx) read_exact_at_impl(buf, offset, None, VirtualFileReadAt { file: self, ctx }).await;
})
.await;
res.map(|()| buf) res.map(|()| buf)
} }
@@ -606,14 +604,17 @@ impl VirtualFile {
buf: B, buf: B,
offset: u64, offset: u64,
count: usize, count: usize,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<B, Error> ) -> Result<B, Error>
where where
B: IoBufMut + Send, B: IoBufMut + Send,
{ {
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| { let (buf, res) = read_exact_at_impl(
self.read_at(buf, offset, ctx) buf,
}) offset,
Some(count),
VirtualFileReadAt { file: self, ctx },
)
.await; .await;
res.map(|()| buf) res.map(|()| buf)
} }
@@ -623,7 +624,7 @@ impl VirtualFile {
&self, &self,
page: PageWriteGuard<'static>, page: PageWriteGuard<'static>,
offset: u64, offset: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<PageWriteGuard<'static>, Error> { ) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf { let buf = PageWriteGuardBuf {
page, page,
@@ -639,7 +640,7 @@ impl VirtualFile {
&self, &self,
buf: B, buf: B,
mut offset: u64, mut offset: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (B::Buf, Result<(), Error>) { ) -> (B::Buf, Result<(), Error>) {
let buf_len = buf.bytes_init(); let buf_len = buf.bytes_init();
if buf_len == 0 { if buf_len == 0 {
@@ -677,7 +678,7 @@ impl VirtualFile {
pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (B::Buf, Result<usize, Error>) { ) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init(); let nbytes = buf.bytes_init();
if nbytes == 0 { if nbytes == 0 {
@@ -710,7 +711,7 @@ impl VirtualFile {
async fn write<B: IoBuf + Send>( async fn write<B: IoBuf + Send>(
&mut self, &mut self,
buf: Slice<B>, buf: Slice<B>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> (Slice<B>, Result<usize, std::io::Error>) { ) -> (Slice<B>, Result<usize, std::io::Error>) {
let pos = self.pos; let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await; let (buf, res) = self.write_at(buf, pos, ctx).await;
@@ -726,7 +727,7 @@ impl VirtualFile {
&self, &self,
buf: B, buf: B,
offset: u64, offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ _ctx: &mut RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (B, Result<usize, Error>) ) -> (B, Result<usize, Error>)
where where
B: tokio_epoll_uring::BoundedBufMut + Send, B: tokio_epoll_uring::BoundedBufMut + Send,
@@ -756,7 +757,7 @@ impl VirtualFile {
&self, &self,
buf: Slice<B>, buf: Slice<B>,
offset: u64, offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ _ctx: &mut RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (Slice<B>, Result<usize, Error>) { ) -> (Slice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await { let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard, Ok(file_guard) => file_guard,
@@ -780,8 +781,35 @@ impl VirtualFile {
} }
} }
trait AsyncClosureThatDoesReadAt {
async fn call<B>(
&mut self,
buf: tokio_epoll_uring::Slice<B>,
offset: u64,
) -> (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)
where
B: IoBufMut + Send;
}
struct VirtualFileReadAt<'a> {
file: &'a VirtualFile,
ctx: &'a mut RequestContext,
}
impl<'a> AsyncClosureThatDoesReadAt for VirtualFileReadAt<'a> {
async fn call<B>(
&mut self,
buf: tokio_epoll_uring::Slice<B>,
offset: u64,
) -> (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)
where
B: IoBufMut + Send,
{
self.file.read_at(buf, offset, self.ctx).await
}
}
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at_impl<B, F, Fut>( pub async fn read_exact_at_impl<B, F>(
buf: B, buf: B,
mut offset: u64, mut offset: u64,
count: Option<usize>, count: Option<usize>,
@@ -789,8 +817,7 @@ pub async fn read_exact_at_impl<B, F, Fut>(
) -> (B, std::io::Result<()>) ) -> (B, std::io::Result<()>)
where where
B: IoBufMut + Send, B: IoBufMut + Send,
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut, F: AsyncClosureThatDoesReadAt,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{ {
let mut buf: tokio_epoll_uring::Slice<B> = match count { let mut buf: tokio_epoll_uring::Slice<B> = match count {
Some(count) => { Some(count) => {
@@ -802,8 +829,8 @@ where
}; };
while buf.bytes_total() != 0 { while buf.bytes_total() != 0 {
let res; let res: std::io::Result<usize>;
(buf, res) = read_at(buf, offset).await; (buf, res) = read_at.call(buf, offset).await;
match res { match res {
Ok(0) => break, Ok(0) => break,
Ok(n) => { Ok(n) => {
@@ -1048,7 +1075,7 @@ impl VirtualFile {
pub(crate) async fn read_blk( pub(crate) async fn read_blk(
&self, &self,
blknum: u32, blknum: u32,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> { ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ; use crate::page_cache::PAGE_SZ;
let buf = vec![0; PAGE_SZ]; let buf = vec![0; PAGE_SZ];
@@ -1058,7 +1085,11 @@ impl VirtualFile {
Ok(crate::tenant::block_io::BlockLease::Vec(buf)) Ok(crate::tenant::block_io::BlockLease::Vec(buf))
} }
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> { async fn read_to_end(
&mut self,
buf: &mut Vec<u8>,
ctx: &mut RequestContext,
) -> Result<(), Error> {
let mut tmp = vec![0; 128]; let mut tmp = vec![0; 128];
loop { loop {
let res; let res;
@@ -1122,7 +1153,7 @@ impl OwnedAsyncWriter for VirtualFile {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> { ) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
res.map(move |v| (v, buf)) res.map(move |v| (v, buf))
@@ -1208,7 +1239,7 @@ mod tests {
&self, &self,
mut buf: Vec<u8>, mut buf: Vec<u8>,
offset: u64, offset: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<Vec<u8>, Error> { ) -> Result<Vec<u8>, Error> {
match self { match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await, MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
@@ -1219,7 +1250,7 @@ mod tests {
&self, &self,
buf: B, buf: B,
offset: u64, offset: u64,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), Error> { ) -> Result<(), Error> {
match self { match self {
MaybeVirtualFile::VirtualFile(file) => { MaybeVirtualFile::VirtualFile(file) => {
@@ -1244,7 +1275,7 @@ mod tests {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), Error> { ) -> Result<(), Error> {
match self { match self {
MaybeVirtualFile::VirtualFile(file) => { MaybeVirtualFile::VirtualFile(file) => {
@@ -1263,7 +1294,7 @@ mod tests {
// Helper function to slurp contents of a file, starting at the current position, // Helper function to slurp contents of a file, starting at the current position,
// into a string // into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> { async fn read_string(&mut self, ctx: &mut RequestContext) -> Result<String, Error> {
use std::io::Read; use std::io::Read;
let mut buf = String::new(); let mut buf = String::new();
match self { match self {
@@ -1284,7 +1315,7 @@ mod tests {
&mut self, &mut self,
pos: u64, pos: u64,
len: usize, len: usize,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<String, Error> { ) -> Result<String, Error> {
let buf = vec![0; len]; let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos, ctx).await?; let buf = self.read_exact_at(buf, pos, ctx).await?;
@@ -1307,7 +1338,7 @@ mod tests {
async fn open( async fn open(
path: Utf8PathBuf, path: Utf8PathBuf,
opts: OpenOptions, opts: OpenOptions,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> { ) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?; let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf)) Ok(MaybeVirtualFile::VirtualFile(vf))
@@ -1324,7 +1355,7 @@ mod tests {
async fn open( async fn open(
path: Utf8PathBuf, path: Utf8PathBuf,
opts: OpenOptions, opts: OpenOptions,
_ctx: &RequestContext, _ctx: &mut RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> { ) -> Result<MaybeVirtualFile, anyhow::Error> {
Ok(MaybeVirtualFile::File({ Ok(MaybeVirtualFile::File({
let owned_fd = opts.open(path.as_std_path()).await?; let owned_fd = opts.open(path.as_std_path()).await?;
@@ -1343,7 +1374,7 @@ mod tests {
async fn open( async fn open(
path: Utf8PathBuf, path: Utf8PathBuf,
opts: OpenOptions, opts: OpenOptions,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error>; ) -> Result<MaybeVirtualFile, anyhow::Error>;
} }

View File

@@ -38,7 +38,7 @@ where
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> { ) -> std::io::Result<(usize, B::Buf)> {
let (nwritten, buf) = self.dst.write_all(buf, ctx).await?; let (nwritten, buf) = self.dst.write_all(buf, ctx).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap(); self.bytes_amount += u64::try_from(nwritten).unwrap();

View File

@@ -9,7 +9,7 @@ pub trait OwnedAsyncWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)>; ) -> std::io::Result<(usize, B::Buf)>;
} }
@@ -60,7 +60,7 @@ where
} }
#[cfg_attr(target_os = "macos", allow(dead_code))] #[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<W> { pub async fn flush_and_into_inner(mut self, ctx: &mut RequestContext) -> std::io::Result<W> {
self.flush(ctx).await?; self.flush(ctx).await?;
let Self { buf, writer } = self; let Self { buf, writer } = self;
@@ -79,7 +79,7 @@ where
pub async fn write_buffered<S: IoBuf + Send>( pub async fn write_buffered<S: IoBuf + Send>(
&mut self, &mut self,
chunk: Slice<S>, chunk: Slice<S>,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<(usize, S)> { ) -> std::io::Result<(usize, S)> {
let chunk_len = chunk.len(); let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk // avoid memcpy for the middle of the chunk
@@ -124,7 +124,7 @@ where
pub async fn write_buffered_borrowed( pub async fn write_buffered_borrowed(
&mut self, &mut self,
mut chunk: &[u8], mut chunk: &[u8],
ctx: &RequestContext, ctx: &mut RequestContext,
) -> std::io::Result<usize> { ) -> std::io::Result<usize> {
let chunk_len = chunk.len(); let chunk_len = chunk.len();
while !chunk.is_empty() { while !chunk.is_empty() {
@@ -142,7 +142,7 @@ where
Ok(chunk_len) Ok(chunk_len)
} }
async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> { async fn flush(&mut self, ctx: &mut RequestContext) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error"); let buf = self.buf.take().expect("must not use after an error");
let buf_len = buf.pending(); let buf_len = buf.pending();
if buf_len == 0 { if buf_len == 0 {
@@ -215,7 +215,7 @@ impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
_: &RequestContext, _: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> { ) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init(); let nbytes = buf.bytes_init();
if nbytes == 0 { if nbytes == 0 {
@@ -243,7 +243,7 @@ mod tests {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>( async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self, &mut self,
buf: B, buf: B,
_: &RequestContext, _: &mut RequestContext,
) -> std::io::Result<(usize, B::Buf)> { ) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init(); let nbytes = buf.bytes_init();
if nbytes == 0 { if nbytes == 0 {

View File

@@ -59,7 +59,7 @@ impl WalIngest {
pub async fn new( pub async fn new(
timeline: &Timeline, timeline: &Timeline,
startpoint: Lsn, startpoint: Lsn,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<WalIngest> { ) -> anyhow::Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it // Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes. // quickly in `ingest_record` and update it when it changes.
@@ -90,7 +90,7 @@ impl WalIngest {
lsn: Lsn, lsn: Lsn,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord, decoded: &mut DecodedWALRecord,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
WAL_INGEST.records_received.inc(); WAL_INGEST.records_received.inc();
let pg_version = modification.tline.pg_version; let pg_version = modification.tline.pg_version;
@@ -449,7 +449,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
blk: &DecodedBkpBlock, blk: &DecodedBkpBlock,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> { ) -> Result<(), PageReconstructError> {
let rel = RelTag { let rel = RelTag {
spcnode: blk.rnode_spcnode, spcnode: blk.rnode_spcnode,
@@ -467,7 +467,7 @@ impl WalIngest {
lsn: Lsn, lsn: Lsn,
decoded: &DecodedWALRecord, decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock, blk: &DecodedBkpBlock,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> { ) -> Result<(), PageReconstructError> {
let rel = RelTag { let rel = RelTag {
spcnode: blk.rnode_spcnode, spcnode: blk.rnode_spcnode,
@@ -530,7 +530,7 @@ impl WalIngest {
buf: &mut Bytes, buf: &mut Bytes,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
decoded: &DecodedWALRecord, decoded: &DecodedWALRecord,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Handle VM bit updates that are implicitly part of heap records. // Handle VM bit updates that are implicitly part of heap records.
@@ -836,7 +836,7 @@ impl WalIngest {
buf: &mut Bytes, buf: &mut Bytes,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
decoded: &DecodedWALRecord, decoded: &DecodedWALRecord,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Handle VM bit updates that are implicitly part of heap records. // Handle VM bit updates that are implicitly part of heap records.
@@ -1007,7 +1007,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rec: &XlCreateDatabase, rec: &XlCreateDatabase,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let db_id = rec.db_id; let db_id = rec.db_id;
let tablespace_id = rec.tablespace_id; let tablespace_id = rec.tablespace_id;
@@ -1102,7 +1102,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rec: &XlSmgrCreate, rec: &XlSmgrCreate,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let rel = RelTag { let rel = RelTag {
spcnode: rec.rnode.spcnode, spcnode: rec.rnode.spcnode,
@@ -1121,7 +1121,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rec: &XlSmgrTruncate, rec: &XlSmgrTruncate,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let spcnode = rec.rnode.spcnode; let spcnode = rec.rnode.spcnode;
let dbnode = rec.rnode.dbnode; let dbnode = rec.rnode.dbnode;
@@ -1193,7 +1193,7 @@ impl WalIngest {
parsed: &XlXactParsedRecord, parsed: &XlXactParsedRecord,
is_commit: bool, is_commit: bool,
origin_id: u16, origin_id: u16,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Record update of CLOG pages // Record update of CLOG pages
let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
@@ -1270,7 +1270,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
xlrec: &XlClogTruncate, xlrec: &XlClogTruncate,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!( info!(
"RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}", "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
@@ -1416,7 +1416,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
xlrec: &XlMultiXactTruncate, xlrec: &XlMultiXactTruncate,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off; self.checkpoint.oldestMulti = xlrec.end_trunc_off;
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db; self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
@@ -1454,7 +1454,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
xlrec: &XlRelmapUpdate, xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord, decoded: &DecodedWALRecord,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
let mut buf = decoded.record.clone(); let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset); buf.advance(decoded.main_data_offset);
@@ -1475,7 +1475,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rel: RelTag, rel: RelTag,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
modification.put_rel_creation(rel, 0, ctx).await?; modification.put_rel_creation(rel, 0, ctx).await?;
Ok(()) Ok(())
@@ -1487,7 +1487,7 @@ impl WalIngest {
rel: RelTag, rel: RelTag,
blknum: BlockNumber, blknum: BlockNumber,
img: Bytes, img: Bytes,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> { ) -> Result<(), PageReconstructError> {
self.handle_rel_extend(modification, rel, blknum, ctx) self.handle_rel_extend(modification, rel, blknum, ctx)
.await?; .await?;
@@ -1501,7 +1501,7 @@ impl WalIngest {
rel: RelTag, rel: RelTag,
blknum: BlockNumber, blknum: BlockNumber,
rec: NeonWalRecord, rec: NeonWalRecord,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
self.handle_rel_extend(modification, rel, blknum, ctx) self.handle_rel_extend(modification, rel, blknum, ctx)
.await?; .await?;
@@ -1514,7 +1514,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rel: RelTag, rel: RelTag,
nblocks: BlockNumber, nblocks: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
modification.put_rel_truncation(rel, nblocks, ctx).await?; modification.put_rel_truncation(rel, nblocks, ctx).await?;
Ok(()) Ok(())
@@ -1524,7 +1524,7 @@ impl WalIngest {
&mut self, &mut self,
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rel: RelTag, rel: RelTag,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
modification.put_rel_drop(rel, ctx).await?; modification.put_rel_drop(rel, ctx).await?;
Ok(()) Ok(())
@@ -1535,7 +1535,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>, modification: &mut DatadirModification<'_>,
rel: RelTag, rel: RelTag,
blknum: BlockNumber, blknum: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<(), PageReconstructError> { ) -> Result<(), PageReconstructError> {
let new_nblocks = blknum + 1; let new_nblocks = blknum + 1;
// Check if the relation exists. We implicitly create relations on first // Check if the relation exists. We implicitly create relations on first
@@ -1597,7 +1597,7 @@ impl WalIngest {
segno: u32, segno: u32,
blknum: BlockNumber, blknum: BlockNumber,
img: Bytes, img: Bytes,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> Result<()> { ) -> Result<()> {
self.handle_slru_extend(modification, kind, segno, blknum, ctx) self.handle_slru_extend(modification, kind, segno, blknum, ctx)
.await?; .await?;
@@ -1611,7 +1611,7 @@ impl WalIngest {
kind: SlruKind, kind: SlruKind,
segno: u32, segno: u32,
blknum: BlockNumber, blknum: BlockNumber,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// we don't use a cache for this like we do for relations. SLRUs are explicitly // we don't use a cache for this like we do for relations. SLRUs are explicitly
// extended with ZEROPAGE records, not with commit records, so it happens // extended with ZEROPAGE records, not with commit records, so it happens
@@ -1660,7 +1660,7 @@ impl WalIngest {
async fn get_relsize( async fn get_relsize(
modification: &DatadirModification<'_>, modification: &DatadirModification<'_>,
rel: RelTag, rel: RelTag,
ctx: &RequestContext, ctx: &mut RequestContext,
) -> anyhow::Result<BlockNumber> { ) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification let nblocks = if !modification
.tline .tline
@@ -1701,7 +1701,7 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> { async fn init_walingest_test(tline: &Timeline, ctx: &mut RequestContext) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10)); let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file