diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 4f327eeb68..c6cfe91b9d 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -502,7 +502,7 @@ async fn timeline_create_handler( let new_timeline_id = request_data.new_timeline_id; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); let state = get_state(&request); @@ -527,7 +527,7 @@ async fn timeline_create_handler( request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION), request_data.existing_initdb_timeline_id, state.broker_client.clone(), - &ctx, + &mut ctx, ) .await { @@ -535,7 +535,7 @@ async fn timeline_create_handler( // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common( &new_timeline, - &ctx, + &mut ctx, tenant::timeline::GetLogicalSizePriority::User, ) .await @@ -593,7 +593,7 @@ async fn timeline_list_handler( check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let response_data = async { let tenant = state @@ -610,7 +610,7 @@ async fn timeline_list_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), - &ctx, + &mut ctx, ) .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) .await @@ -680,7 +680,7 @@ async fn timeline_detail_handler( check_permission(&request, Some(tenant_shard_id.tenant_id))?; // Logical size calculation needs downloading. - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let state = get_state(&request); let timeline_info = async { @@ -696,7 +696,7 @@ async fn timeline_detail_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), - &ctx, + &mut ctx, ) .await .context("get local timeline info") @@ -735,13 +735,13 @@ async fn get_lsn_by_timestamp_handler( .map_err(ApiError::BadRequest)?; let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp); - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; let result = timeline - .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx) + .find_lsn_for_timestamp(timestamp_pg, &cancel, &mut ctx) .await?; #[derive(serde::Serialize, Debug)] struct Result { @@ -786,11 +786,11 @@ async fn get_timestamp_of_lsn_handler( .with_context(|| format!("Invalid LSN: {lsn_str:?}")) .map_err(ApiError::BadRequest)?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?; + let result = timeline.get_timestamp_for_lsn(lsn, &mut ctx).await?; match result { Some(time) => { @@ -816,7 +816,7 @@ async fn tenant_attach_handler( None => TenantConfOpt::default(), }; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); info!("Handling tenant attach {tenant_id}"); @@ -830,7 +830,7 @@ async fn tenant_attach_handler( let tenant = state .tenant_manager - .upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &ctx) + .upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &mut ctx) .await?; let Some(tenant) = tenant else { @@ -921,11 +921,11 @@ async fn tenant_reset_handler( let drop_cache: Option = parse_query_param(&request, "drop_cache")?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let state = get_state(&request); state .tenant_manager - .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx) + .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &mut ctx) .await .map_err(ApiError::InternalServerError)?; @@ -939,7 +939,7 @@ async fn tenant_load_handler( let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let maybe_body: Option = json_request_or_empty_body(&mut request).await?; @@ -956,7 +956,7 @@ async fn tenant_load_handler( state.broker_client.clone(), state.remote_storage.clone(), state.deletion_queue_client.clone(), - &ctx, + &mut ctx, ) .instrument(info_span!("load", %tenant_id)) .await?; @@ -1120,7 +1120,7 @@ async fn tenant_size_handler( ))); } - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let tenant = state .tenant_manager .get_attached_tenant_shard(tenant_shard_id)?; @@ -1132,7 +1132,7 @@ async fn tenant_size_handler( retention_period, LogicalSizeCalculationCause::TenantSizeHandler, &cancel, - &ctx, + &mut ctx, ) .await .map_err(|e| match e { @@ -1193,7 +1193,7 @@ async fn tenant_shard_split_handler( let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let state = get_state(&request); - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let tenant = state .tenant_manager @@ -1206,7 +1206,7 @@ async fn tenant_shard_split_handler( tenant, ShardCount::new(req.new_shard_count), req.new_stripe_size, - &ctx, + &mut ctx, ) .await .map_err(ApiError::InternalServerError)?; @@ -1386,7 +1386,7 @@ async fn tenant_create_handler( let generation = get_request_generation(state, request_data.generation)?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let location_conf = LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters); @@ -1398,7 +1398,7 @@ async fn tenant_create_handler( location_conf, None, SpawnMode::Create, - &ctx, + &mut ctx, ) .await?; @@ -1498,7 +1498,7 @@ async fn put_tenant_location_config_handler( let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false); check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let state = get_state(&request); let conf = state.conf; @@ -1537,7 +1537,7 @@ async fn put_tenant_location_config_handler( let tenant = state .tenant_manager - .upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx) + .upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &mut ctx) .await?; let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size()); let attached = tenant.is_some(); @@ -1722,7 +1722,7 @@ async fn lsn_lease_handler( let lsn: Lsn = parse_query_param(&request, "lsn")? .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let state = get_state(&request); @@ -1730,7 +1730,7 @@ async fn lsn_lease_handler( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; let result = timeline - .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx) + .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &mut ctx) .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?; json_response(StatusCode::OK, result) @@ -1747,8 +1747,8 @@ async fn timeline_gc_handler( let gc_req: TimelineGcRequest = json_request(&mut request).await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?; + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &mut ctx).await?; json_response(StatusCode::OK, gc_result) } @@ -1775,10 +1775,10 @@ async fn timeline_compact_handler( parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; timeline - .compact(&cancel, flags, &ctx) + .compact(&cancel, flags, &mut ctx) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; if wait_until_uploaded { @@ -1812,7 +1812,7 @@ async fn timeline_checkpoint_handler( parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; timeline .freeze_and_flush() @@ -1825,7 +1825,7 @@ async fn timeline_checkpoint_handler( } })?; timeline - .compact(&cancel, flags, &ctx) + .compact(&cancel, flags, &mut ctx) .await .map_err(|e| match e { @@ -1918,8 +1918,8 @@ async fn timeline_detach_ancestor_handler( tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; - let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download); - let ctx = &ctx; + let mut ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download); + let ctx = &mut ctx; let timeline = tenant.get_timeline(timeline_id, true)?; @@ -2003,10 +2003,10 @@ async fn getpage_at_lsn_handler( .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?; async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; - let page = timeline.get(key.0, lsn, &ctx).await?; + let page = timeline.get(key.0, lsn, &mut ctx).await?; Result::<_, ApiError>::Ok( Response::builder() @@ -2032,11 +2032,11 @@ async fn timeline_collect_keyspace( let at_lsn: Option = parse_query_param(&request, "at_lsn")?; async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); let (dense_ks, sparse_ks) = timeline - .collect_keyspace(at_lsn, &ctx) + .collect_keyspace(at_lsn, &mut ctx) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; @@ -2425,8 +2425,8 @@ async fn list_aux_files( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let files = timeline.list_aux_files(body.lsn, &ctx).await?; + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let files = timeline.list_aux_files(body.lsn, &mut ctx).await?; json_response(StatusCode::OK, files) } @@ -2467,15 +2467,15 @@ async fn ingest_aux_files( let mut modification = timeline.begin_modification( Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */ ); - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); for (fname, content) in body.aux_files { modification - .put_file(&fname, content.as_bytes(), &ctx) + .put_file(&fname, content.as_bytes(), &mut ctx) .await .map_err(ApiError::InternalServerError)?; } modification - .commit(&ctx) + .commit(&mut ctx) .await .map_err(ApiError::InternalServerError)?; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 816d231127..ea7a8d3752 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1237,7 +1237,7 @@ impl SmgrQueryTimePerTimeline { &'a self, op: SmgrQueryType, ctx: &'c RequestContext, - ) -> impl Drop + '_ { + ) -> impl Drop + 'a { let metric = &self.metrics[op as usize]; let start = Instant::now(); match ctx.micros_spent_throttled.open() { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index eaaefbc00c..ccb3ee3f88 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -554,7 +554,7 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, protocol_version: PagestreamProtocolVersion, - ctx: RequestContext, + mut ctx: RequestContext, ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, @@ -624,7 +624,7 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message::exists"); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); ( - self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) + self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &mut ctx) .instrument(span.clone()) .await, span, @@ -634,7 +634,7 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message::nblocks"); let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); ( - self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) + self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &mut ctx) .instrument(span.clone()) .await, span, @@ -645,7 +645,7 @@ impl PageServerHandler { // shard_id is filled in by the handler let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn); ( - self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) + self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &mut ctx) .instrument(span.clone()) .await, span, @@ -655,7 +655,7 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message::dbsize"); let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); ( - self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) + self.handle_db_size_request(tenant_id, timeline_id, &req, &mut ctx) .instrument(span.clone()) .await, span, @@ -665,7 +665,7 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); ( - self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx) + self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &mut ctx) .instrument(span.clone()) .await, span, @@ -728,7 +728,7 @@ impl PageServerHandler { base_lsn: Lsn, _end_lsn: Lsn, pg_version: u32, - ctx: RequestContext, + mut ctx: RequestContext, ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, @@ -741,7 +741,7 @@ impl PageServerHandler { .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT) .await?; let timeline = tenant - .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) + .create_empty_timeline(timeline_id, base_lsn, pg_version, &mut ctx) .await?; // TODO mark timeline as not ready until it reaches end_lsn. @@ -766,7 +766,7 @@ impl PageServerHandler { &mut copyin_reader, base_lsn, self.broker_client.clone(), - &ctx, + &mut ctx, ) .await?; @@ -791,7 +791,7 @@ impl PageServerHandler { timeline_id: TimelineId, start_lsn: Lsn, end_lsn: Lsn, - ctx: RequestContext, + mut ctx: RequestContext, ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, @@ -814,7 +814,7 @@ impl PageServerHandler { pgb.write_message_noflush(&BeMessage::CopyInResponse)?; self.flush_cancellable(pgb, &timeline.cancel).await?; let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel))); - import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?; + import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &mut ctx).await?; info!("wal import complete"); // Read the end of the tar archive. @@ -1534,7 +1534,7 @@ where fail::fail_point!("ps::connection-start::process-query"); - let ctx = self.connection_ctx.attached_child(); + let mut ctx = self.connection_ctx.attached_child(); debug!("process query {query_string:?}"); let parts = query_string.split_whitespace().collect::>(); if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) { @@ -1624,7 +1624,7 @@ where } }; - let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx); + let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&mut ctx); let res = async { self.handle_basebackup_request( pgb, @@ -1634,7 +1634,7 @@ where None, false, gzip, - &ctx, + &mut ctx, ) .await?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; @@ -1732,7 +1732,7 @@ where prev_lsn, true, false, - &ctx, + &mut ctx, ) .await?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; @@ -1860,7 +1860,7 @@ where .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?; match self - .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx) + .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &mut ctx) .await { Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,