mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 01:42:55 +00:00
some easy mechanical fixes (add let mut and &mut ctx)
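The change is mechanical throughout: wherever a handler built an immutable request context, the binding becomes let mut ctx and call sites pass &mut ctx instead of &ctx; where the function owns the context, the parameter simply becomes mut ctx: RequestContext. A minimal sketch of the before/after shape, using simplified stand-in types (the pageserver's real RequestContext, TaskKind, and DownloadBehavior carry more state than this, and build_timeline_info here is only an illustrative helper, not the real build_timeline_info_common):

    // Simplified stand-ins; the real types live in the neon pageserver crate.
    #[derive(Debug)]
    enum TaskKind {
        MgmtRequest,
    }

    #[derive(Debug)]
    enum DownloadBehavior {
        Error,
    }

    #[derive(Debug)]
    struct RequestContext {
        task_kind: TaskKind,
        download_behavior: DownloadBehavior,
        // Hypothetical per-request counter standing in for whatever mutable
        // state motivates taking the context by &mut.
        downloads_triggered: u64,
    }

    impl RequestContext {
        fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
            RequestContext {
                task_kind,
                download_behavior,
                downloads_triggered: 0,
            }
        }
    }

    // Before the change a helper like this would have taken ctx: &RequestContext.
    fn build_timeline_info(ctx: &mut RequestContext) {
        // Anything that records per-request state can now mutate the context.
        ctx.downloads_triggered += 1;
        println!("{ctx:?}");
    }

    fn main() {
        // Old: let ctx = RequestContext::new(...);      passed as &ctx
        // New: let mut ctx = RequestContext::new(...);  passed as &mut ctx
        let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
        build_timeline_info(&mut ctx);
    }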
@@ -502,7 +502,7 @@ async fn timeline_create_handler(
 
 let new_timeline_id = request_data.new_timeline_id;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
 
 let state = get_state(&request);
 
@@ -527,7 +527,7 @@ async fn timeline_create_handler(
 request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
 request_data.existing_initdb_timeline_id,
 state.broker_client.clone(),
-&ctx,
+&mut ctx,
 )
 .await
 {
@@ -535,7 +535,7 @@ async fn timeline_create_handler(
 // Created. Construct a TimelineInfo for it.
 let timeline_info = build_timeline_info_common(
 &new_timeline,
-&ctx,
+&mut ctx,
 tenant::timeline::GetLogicalSizePriority::User,
 )
 .await
@@ -593,7 +593,7 @@ async fn timeline_list_handler(
 check_permission(&request, Some(tenant_shard_id.tenant_id))?;
 
 let state = get_state(&request);
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 
 let response_data = async {
 let tenant = state
@@ -610,7 +610,7 @@ async fn timeline_list_handler(
 &timeline,
 include_non_incremental_logical_size.unwrap_or(false),
 force_await_initial_logical_size.unwrap_or(false),
-&ctx,
+&mut ctx,
 )
 .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
 .await
@@ -680,7 +680,7 @@ async fn timeline_detail_handler(
 check_permission(&request, Some(tenant_shard_id.tenant_id))?;
 
 // Logical size calculation needs downloading.
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let state = get_state(&request);
 
 let timeline_info = async {
@@ -696,7 +696,7 @@ async fn timeline_detail_handler(
 &timeline,
 include_non_incremental_logical_size.unwrap_or(false),
 force_await_initial_logical_size.unwrap_or(false),
-&ctx,
+&mut ctx,
 )
 .await
 .context("get local timeline info")
@@ -735,13 +735,13 @@ async fn get_lsn_by_timestamp_handler(
 .map_err(ApiError::BadRequest)?;
 let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 
 let timeline =
 active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
 .await?;
 let result = timeline
-.find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
+.find_lsn_for_timestamp(timestamp_pg, &cancel, &mut ctx)
 .await?;
 #[derive(serde::Serialize, Debug)]
 struct Result {
@@ -786,11 +786,11 @@ async fn get_timestamp_of_lsn_handler(
 .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
 .map_err(ApiError::BadRequest)?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let timeline =
 active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
 .await?;
-let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
+let result = timeline.get_timestamp_for_lsn(lsn, &mut ctx).await?;
 
 match result {
 Some(time) => {
@@ -816,7 +816,7 @@ async fn tenant_attach_handler(
 None => TenantConfOpt::default(),
 };
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 
 info!("Handling tenant attach {tenant_id}");
 
@@ -830,7 +830,7 @@ async fn tenant_attach_handler(
 
 let tenant = state
 .tenant_manager
-.upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &ctx)
+.upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &mut ctx)
 .await?;
 
 let Some(tenant) = tenant else {
@@ -921,11 +921,11 @@ async fn tenant_reset_handler(
 
 let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 let state = get_state(&request);
 state
 .tenant_manager
-.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx)
+.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &mut ctx)
 .await
 .map_err(ApiError::InternalServerError)?;
 
@@ -939,7 +939,7 @@ async fn tenant_load_handler(
 let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
 check_permission(&request, Some(tenant_id))?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 
 let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
 
@@ -956,7 +956,7 @@ async fn tenant_load_handler(
 state.broker_client.clone(),
 state.remote_storage.clone(),
 state.deletion_queue_client.clone(),
-&ctx,
+&mut ctx,
 )
 .instrument(info_span!("load", %tenant_id))
 .await?;
@@ -1120,7 +1120,7 @@ async fn tenant_size_handler(
 )));
 }
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let tenant = state
 .tenant_manager
 .get_attached_tenant_shard(tenant_shard_id)?;
@@ -1132,7 +1132,7 @@ async fn tenant_size_handler(
 retention_period,
 LogicalSizeCalculationCause::TenantSizeHandler,
 &cancel,
-&ctx,
+&mut ctx,
 )
 .await
 .map_err(|e| match e {
@@ -1193,7 +1193,7 @@ async fn tenant_shard_split_handler(
 
 let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
 let state = get_state(&request);
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 
 let tenant = state
 .tenant_manager
@@ -1206,7 +1206,7 @@ async fn tenant_shard_split_handler(
 tenant,
 ShardCount::new(req.new_shard_count),
 req.new_stripe_size,
-&ctx,
+&mut ctx,
 )
 .await
 .map_err(ApiError::InternalServerError)?;
@@ -1386,7 +1386,7 @@ async fn tenant_create_handler(
 
 let generation = get_request_generation(state, request_data.generation)?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 
 let location_conf =
 LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
@@ -1398,7 +1398,7 @@ async fn tenant_create_handler(
 location_conf,
 None,
 SpawnMode::Create,
-&ctx,
+&mut ctx,
 )
 .await?;
 
@@ -1498,7 +1498,7 @@ async fn put_tenant_location_config_handler(
 let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false);
 check_permission(&request, Some(tenant_shard_id.tenant_id))?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 let state = get_state(&request);
 let conf = state.conf;
 
@@ -1537,7 +1537,7 @@ async fn put_tenant_location_config_handler(
 
 let tenant = state
 .tenant_manager
-.upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx)
+.upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &mut ctx)
 .await?;
 let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size());
 let attached = tenant.is_some();
@@ -1722,7 +1722,7 @@ async fn lsn_lease_handler(
 let lsn: Lsn = parse_query_param(&request, "lsn")?
 .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 
 let state = get_state(&request);
 
@@ -1730,7 +1730,7 @@ async fn lsn_lease_handler(
 active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
 .await?;
 let result = timeline
-.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
+.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &mut ctx)
 .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
 
 json_response(StatusCode::OK, result)
@@ -1747,8 +1747,8 @@ async fn timeline_gc_handler(
 
 let gc_req: TimelineGcRequest = json_request(&mut request).await?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &mut ctx).await?;
 
 json_response(StatusCode::OK, gc_result)
 }
@@ -1775,10 +1775,10 @@ async fn timeline_compact_handler(
 parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
 
 async {
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
 timeline
-.compact(&cancel, flags, &ctx)
+.compact(&cancel, flags, &mut ctx)
 .await
 .map_err(|e| ApiError::InternalServerError(e.into()))?;
 if wait_until_uploaded {
@@ -1812,7 +1812,7 @@ async fn timeline_checkpoint_handler(
 parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
 
 async {
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
 timeline
 .freeze_and_flush()
@@ -1825,7 +1825,7 @@ async fn timeline_checkpoint_handler(
 }
 })?;
 timeline
-.compact(&cancel, flags, &ctx)
+.compact(&cancel, flags, &mut ctx)
 .await
 .map_err(|e|
 match e {
@@ -1918,8 +1918,8 @@ async fn timeline_detach_ancestor_handler(
 
 tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
 
-let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
-let ctx = &ctx;
+let mut ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
+let ctx = &mut ctx;
 
 let timeline = tenant.get_timeline(timeline_id, true)?;
 
@@ -2003,10 +2003,10 @@ async fn getpage_at_lsn_handler(
 .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
 
 async {
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
 
-let page = timeline.get(key.0, lsn, &ctx).await?;
+let page = timeline.get(key.0, lsn, &mut ctx).await?;
 
 Result::<_, ApiError>::Ok(
 Response::builder()
@@ -2032,11 +2032,11 @@ async fn timeline_collect_keyspace(
 let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
 
 async {
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
 let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
 let (dense_ks, sparse_ks) = timeline
-.collect_keyspace(at_lsn, &ctx)
+.collect_keyspace(at_lsn, &mut ctx)
 .await
 .map_err(|e| ApiError::InternalServerError(e.into()))?;
 
@@ -2425,8 +2425,8 @@ async fn list_aux_files(
 active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
 .await?;
 
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-let files = timeline.list_aux_files(body.lsn, &ctx).await?;
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let files = timeline.list_aux_files(body.lsn, &mut ctx).await?;
 json_response(StatusCode::OK, files)
 }
 
@@ -2467,15 +2467,15 @@ async fn ingest_aux_files(
 let mut modification = timeline.begin_modification(
 Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
 );
-let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+let mut ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 for (fname, content) in body.aux_files {
 modification
-.put_file(&fname, content.as_bytes(), &ctx)
+.put_file(&fname, content.as_bytes(), &mut ctx)
 .await
 .map_err(ApiError::InternalServerError)?;
 }
 modification
-.commit(&ctx)
+.commit(&mut ctx)
 .await
 .map_err(ApiError::InternalServerError)?;
 
@@ -1237,7 +1237,7 @@ impl SmgrQueryTimePerTimeline {
 &'a self,
 op: SmgrQueryType,
 ctx: &'c RequestContext,
-) -> impl Drop + '_ {
+) -> impl Drop + 'a {
 let metric = &self.metrics[op as usize];
 let start = Instant::now();
 match ctx.micros_spent_throttled.open() {
@@ -554,7 +554,7 @@ impl PageServerHandler {
 tenant_id: TenantId,
 timeline_id: TimelineId,
 protocol_version: PagestreamProtocolVersion,
-ctx: RequestContext,
+mut ctx: RequestContext,
 ) -> Result<(), QueryError>
 where
 IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -624,7 +624,7 @@ impl PageServerHandler {
 fail::fail_point!("ps::handle-pagerequest-message::exists");
 let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
 (
-self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
+self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &mut ctx)
 .instrument(span.clone())
 .await,
 span,
@@ -634,7 +634,7 @@ impl PageServerHandler {
 fail::fail_point!("ps::handle-pagerequest-message::nblocks");
 let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
 (
-self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
+self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &mut ctx)
 .instrument(span.clone())
 .await,
 span,
@@ -645,7 +645,7 @@ impl PageServerHandler {
 // shard_id is filled in by the handler
 let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
 (
-self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
+self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &mut ctx)
 .instrument(span.clone())
 .await,
 span,
@@ -655,7 +655,7 @@ impl PageServerHandler {
 fail::fail_point!("ps::handle-pagerequest-message::dbsize");
 let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
 (
-self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
+self.handle_db_size_request(tenant_id, timeline_id, &req, &mut ctx)
 .instrument(span.clone())
 .await,
 span,
@@ -665,7 +665,7 @@ impl PageServerHandler {
 fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
 let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
 (
-self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
+self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &mut ctx)
 .instrument(span.clone())
 .await,
 span,
@@ -728,7 +728,7 @@ impl PageServerHandler {
 base_lsn: Lsn,
 _end_lsn: Lsn,
 pg_version: u32,
-ctx: RequestContext,
+mut ctx: RequestContext,
 ) -> Result<(), QueryError>
 where
 IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -741,7 +741,7 @@ impl PageServerHandler {
 .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
 .await?;
 let timeline = tenant
-.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+.create_empty_timeline(timeline_id, base_lsn, pg_version, &mut ctx)
 .await?;
 
 // TODO mark timeline as not ready until it reaches end_lsn.
@@ -766,7 +766,7 @@ impl PageServerHandler {
 &mut copyin_reader,
 base_lsn,
 self.broker_client.clone(),
-&ctx,
+&mut ctx,
 )
 .await?;
 
@@ -791,7 +791,7 @@ impl PageServerHandler {
 timeline_id: TimelineId,
 start_lsn: Lsn,
 end_lsn: Lsn,
-ctx: RequestContext,
+mut ctx: RequestContext,
 ) -> Result<(), QueryError>
 where
 IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -814,7 +814,7 @@ impl PageServerHandler {
 pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
 self.flush_cancellable(pgb, &timeline.cancel).await?;
 let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
-import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
+import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &mut ctx).await?;
 info!("wal import complete");
 
 // Read the end of the tar archive.
@@ -1534,7 +1534,7 @@ where
 
 fail::fail_point!("ps::connection-start::process-query");
 
-let ctx = self.connection_ctx.attached_child();
+let mut ctx = self.connection_ctx.attached_child();
 debug!("process query {query_string:?}");
 let parts = query_string.split_whitespace().collect::<Vec<_>>();
 if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
@@ -1624,7 +1624,7 @@ where
 }
 };
 
-let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
+let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&mut ctx);
 let res = async {
 self.handle_basebackup_request(
 pgb,
@@ -1634,7 +1634,7 @@ where
 None,
 false,
 gzip,
-&ctx,
+&mut ctx,
 )
 .await?;
 pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1732,7 +1732,7 @@ where
 prev_lsn,
 true,
 false,
-&ctx,
+&mut ctx,
 )
 .await?;
 pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1860,7 +1860,7 @@ where
 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
 
 match self
-.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
+.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &mut ctx)
 .await
 {
 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
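The timeline_detach_ancestor_handler hunk above is the one slightly different case: it keeps the existing shadowing of the owned context and only switches the shadow to a mutable reborrow (let ctx = &mut ctx;), so the rest of the function can keep passing ctx as-is. A small sketch of that shape, again with a hypothetical placeholder context type rather than the real one:

    struct RequestContext {
        // Hypothetical counter standing in for whatever per-request state needs &mut.
        pages_reconstructed: u64,
    }

    fn detach_step(ctx: &mut RequestContext) {
        ctx.pages_reconstructed += 1;
    }

    fn handler() {
        let mut ctx = RequestContext { pages_reconstructed: 0 };
        // Shadow the owned value with a mutable reborrow; later calls can pass
        // ctx directly and the compiler reborrows &mut *ctx at each call site.
        let ctx = &mut ctx;

        detach_step(ctx);
        detach_step(ctx);

        assert_eq!(ctx.pages_reconstructed, 2);
    }

    fn main() {
        handler();
    }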