span fixes

This commit is contained in:
Christian Schwarz
2024-11-21 20:21:55 +01:00
parent 408bc8fc71
commit 73046fdf5b

View File

@@ -532,10 +532,12 @@ impl From<WaitLsnError> for QueryError {
enum BatchedFeMessage {
Exists {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
GetPage {
@@ -546,10 +548,12 @@ enum BatchedFeMessage {
},
DbSize {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
RespondError {
@@ -601,11 +605,12 @@ impl PageServerHandler {
async fn pagestream_read_message<IO>(
pgb: &mut PostgresBackendReader<IO>,
ref tenant_id: TenantId,
ref timeline_id: TimelineId,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_handles: &mut TimelineHandles,
cancel: &CancellationToken,
ctx: &RequestContext,
parent_span: Span,
) -> Result<Option<Box<BatchedFeMessage>>, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
@@ -644,22 +649,38 @@ impl PageServerHandler {
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let batched_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::Exists { span, shard, req }
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::Nblocks { span, shard, req }
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::DbSize { span, shard, req }
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::GetSlruSegment { span, shard, req }
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
@@ -667,11 +688,7 @@ impl PageServerHandler {
blkno,
}) => {
// shard_id is filled in by the handler
let span = tracing::info_span!(
"handle_get_page_at_lsn_request_batched",
%tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
);
let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %request_lsn);
macro_rules! respond_error {
($error:expr) => {{
@@ -685,8 +702,8 @@ impl PageServerHandler {
let key = rel_block_to_key(rel, blkno);
let shard = match timeline_handles
.get(*tenant_id, *timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone()) // sets `shard_id` field
.await
{
Ok(tl) => tl,
@@ -808,8 +825,6 @@ impl PageServerHandler {
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
ctx: &RequestContext,
@@ -820,22 +835,22 @@ impl PageServerHandler {
// invoke handler function
let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) =
match batch {
BatchedFeMessage::Exists { span, req } => {
BatchedFeMessage::Exists { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
self.handle_get_rel_exists_request(&shard, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::Nblocks { span, req } => {
BatchedFeMessage::Nblocks { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
self.handle_get_nblocks_request(&shard, &req, &ctx)
.instrument(span.clone())
.await,
],
@@ -868,29 +883,24 @@ impl PageServerHandler {
span,
)
}
BatchedFeMessage::DbSize { span, req } => {
BatchedFeMessage::DbSize { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
self.handle_db_size_request(&shard, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::GetSlruSegment { span, req } => {
BatchedFeMessage::GetSlruSegment { span, shard, req } => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx,
)
.instrument(span.clone())
.await,
self.handle_get_slru_segment_request(&shard, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
@@ -1000,11 +1010,15 @@ impl PageServerHandler {
.expect("implementation error: timeline_handles should not be locked");
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let read_message_task = tokio::spawn(
{
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
@@ -1014,6 +1028,7 @@ impl PageServerHandler {
&mut timeline_handles,
&cancel,
&ctx,
request_span.clone(),
)
.await?;
let msg = match msg {
@@ -1034,7 +1049,7 @@ impl PageServerHandler {
anyhow::Ok((pgb_reader, timeline_handles))
}
}
.instrument(tracing::info_span!("reading")),
.instrument(tracing::info_span!("read_protocol")),
);
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
@@ -1043,6 +1058,9 @@ impl PageServerHandler {
{
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut batch: Option<Box<BatchedFeMessage>> = None;
loop {
let maybe_flush_msg = tokio::select! {
@@ -1099,7 +1117,7 @@ impl PageServerHandler {
}
};
debug!("processing batch");
self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx)
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
.await?;
}
@@ -1256,17 +1274,10 @@ impl PageServerHandler {
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_rel_exists_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline: &Timeline,
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self
.timeline_handles
.as_mut()
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
@@ -1293,18 +1304,10 @@ impl PageServerHandler {
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_nblocks_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self
.timeline_handles
.as_mut()
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
@@ -1331,18 +1334,10 @@ impl PageServerHandler {
#[instrument(skip_all, fields(shard_id))]
async fn handle_db_size_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self
.timeline_handles
.as_mut()
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
@@ -1397,18 +1392,10 @@ impl PageServerHandler {
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_slru_segment_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline: &Timeline,
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self
.timeline_handles
.as_mut()
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);