mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
undo the handle stuff
This commit is contained in:
@@ -121,10 +121,6 @@ pub(crate) enum Scope {
|
||||
Timeline {
|
||||
timeline: Arc<Timeline>,
|
||||
},
|
||||
TimelineHandle {
|
||||
timeline_handle:
|
||||
crate::tenant::timeline::handle::Handle<crate::page_service::TenantManagerTypes>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Scope {
|
||||
@@ -145,20 +141,13 @@ impl Scope {
|
||||
timeline: Arc::clone(timeline),
|
||||
}
|
||||
}
|
||||
pub(crate) fn new_timeline_handle(
|
||||
timeline_handle: crate::tenant::timeline::handle::Handle<
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
) -> Self {
|
||||
Scope::TimelineHandle { timeline_handle }
|
||||
}
|
||||
|
||||
|
||||
pub(crate) fn io_size_metrics(&self) -> &crate::metrics::StorageIoSizeMetrics {
|
||||
match self {
|
||||
Scope::Global { io_size_metrics } => io_size_metrics,
|
||||
Scope::Tenant { tenant } => &tenant.virtual_file_io_metrics,
|
||||
Scope::Timeline { timeline } => &timeline.metrics.storage_io_size,
|
||||
Scope::TimelineHandle { timeline_handle } => &timeline_handle.metrics.storage_io_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1086,16 +1086,6 @@ impl PageServerHandler {
|
||||
batch
|
||||
};
|
||||
|
||||
macro_rules! upgrade_handle_and_set_context {
|
||||
($shard:expr) => {{
|
||||
let shard = $shard.upgrade()?;
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.scope(context::Scope::new_timeline_handle(shard.clone()))
|
||||
.build();
|
||||
(shard, ctx)
|
||||
}};
|
||||
}
|
||||
|
||||
// invoke handler function
|
||||
let (mut handler_results, span): (
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
|
||||
@@ -1108,10 +1098,9 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_get_rel_exists_request(&*shard, &req, &ctx)
|
||||
.handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1126,10 +1115,9 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_get_nblocks_request(&*shard, &req, &ctx)
|
||||
.handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1144,18 +1132,17 @@ impl PageServerHandler {
|
||||
pages,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
{
|
||||
let npages = pages.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&*shard,
|
||||
&*shard.upgrade()?,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
io_concurrency,
|
||||
&ctx,
|
||||
ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
@@ -1172,10 +1159,9 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_db_size_request(&*shard, &req, &ctx)
|
||||
.handle_db_size_request(&*shard.upgrade()?, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1190,10 +1176,9 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_get_slru_segment_request(&*shard, &req, &ctx)
|
||||
.handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1208,13 +1193,12 @@ impl PageServerHandler {
|
||||
requests,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::test");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
{
|
||||
let npages = requests.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_test_request_batch(&*shard, requests, ctx)
|
||||
.handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
@@ -1647,7 +1631,7 @@ impl PageServerHandler {
|
||||
//
|
||||
|
||||
let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
|
||||
let mut ctx = ctx.attached_child();
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
let _cancel_batcher = cancel_batcher.drop_guard();
|
||||
loop {
|
||||
@@ -1674,7 +1658,7 @@ impl PageServerHandler {
|
||||
io_concurrency.clone(),
|
||||
&cancel,
|
||||
protocol_version,
|
||||
&mut ctx,
|
||||
&ctx,
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Reference in New Issue
Block a user