mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
fruitless debugging
This commit is contained in:
@@ -22,6 +22,7 @@ use postgres_backend::{
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use smallvec::SmallVec;
|
||||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
use std::str;
|
||||
@@ -754,72 +755,79 @@ impl PageServerHandler {
|
||||
#[instrument(skip_all, level = tracing::Level::TRACE)]
|
||||
async fn pagestream_do_batch(
|
||||
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
|
||||
this_msg: Box<BatchedFeMessage>,
|
||||
) -> Option<Box<BatchedFeMessage>> {
|
||||
mut this_msg: Box<BatchedFeMessage>,
|
||||
) -> smallvec::SmallVec<[Box<BatchedFeMessage>; 2]> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
|
||||
match (maybe_carry.as_deref_mut(), *this_msg) {
|
||||
// new batch
|
||||
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
|
||||
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
|
||||
None
|
||||
}
|
||||
// nothing batched yet and this message is unbatchable
|
||||
(None, this_msg) => {
|
||||
Some(Box::new(this_msg)) // TODO: avoid this re-boxing
|
||||
}
|
||||
// something batched already, let's see if we can add this message to the batch
|
||||
(
|
||||
Some(BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: ref mut accum_pages,
|
||||
effective_request_lsn: accum_lsn,
|
||||
}),
|
||||
// would be nice to have box pattern here
|
||||
BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
effective_request_lsn: this_lsn,
|
||||
},
|
||||
) if async {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
|
||||
return false;
|
||||
let mut ret = SmallVec::new();
|
||||
|
||||
loop {
|
||||
match (maybe_carry.as_deref_mut(), *this_msg) {
|
||||
// new batch
|
||||
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
|
||||
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
|
||||
break;
|
||||
}
|
||||
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
|
||||
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
|
||||
// nothing batched yet and this message is unbatchable
|
||||
(None, this_msg) => {
|
||||
ret.push(Box::new(this_msg)); // TODO: avoid this re-boxing
|
||||
break;
|
||||
}
|
||||
// something batched already, let's see if we can add this message to the batch
|
||||
(
|
||||
Some(BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: ref mut accum_pages,
|
||||
effective_request_lsn: accum_lsn,
|
||||
}),
|
||||
// would be nice to have box pattern here
|
||||
BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
effective_request_lsn: this_lsn,
|
||||
},
|
||||
) if async {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
|
||||
return false;
|
||||
}
|
||||
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
|
||||
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
|
||||
{
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
// the vectored get currently only supports a single LSN, so, bounce as soon
|
||||
// as the effective request_lsn changes
|
||||
if *accum_lsn != this_lsn {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
.await =>
|
||||
{
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
break;
|
||||
}
|
||||
// the vectored get currently only supports a single LSN, so, bounce as soon
|
||||
// as the effective request_lsn changes
|
||||
if *accum_lsn != this_lsn {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
|
||||
return false;
|
||||
// something batched already but this message is unbatchable
|
||||
(Some(_), this_msg_derefed) => {
|
||||
let carry = maybe_carry.take().expect("this match arm checks it's Some()");
|
||||
ret.push(carry);
|
||||
this_msg = Box::new(this_msg_derefed); // TODO: avoid this re-boxing
|
||||
// next iteration will decide what to do
|
||||
}
|
||||
true
|
||||
}
|
||||
.await =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
None
|
||||
}
|
||||
// something batched already but this message is unbatchable
|
||||
(Some(_), this_msg) => {
|
||||
// by default, don't continue batching
|
||||
let this_msg = Box::new(this_msg); // TODO: avoid this box
|
||||
let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()");
|
||||
Some(carry)
|
||||
}
|
||||
}
|
||||
assert!(!ret.spilled());
|
||||
ret
|
||||
}
|
||||
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
@@ -1063,31 +1071,27 @@ impl PageServerHandler {
|
||||
}
|
||||
let mut batch: Option<Box<BatchedFeMessage>> = None;
|
||||
loop {
|
||||
let maybe_flush_msg = tokio::select! {
|
||||
let flush_msgs: smallvec::SmallVec<[_; 2]> = tokio::select! {
|
||||
req = requests_rx.recv() => {
|
||||
if let Some(req) = req {
|
||||
Self::pagestream_do_batch(&mut batch, req).await
|
||||
} else {
|
||||
debug!("request processing pipeline upstream dead");
|
||||
std::mem::take(&mut batch)
|
||||
std::mem::take(&mut batch).into_iter().collect()
|
||||
}
|
||||
},
|
||||
() = ready_for_next_batch.notified() => {
|
||||
debug!("downstream ready, flushing batch early if any available");
|
||||
std::mem::take(&mut batch)
|
||||
std::mem::take(&mut batch).into_iter().collect()
|
||||
}
|
||||
};
|
||||
let flush_msg = match maybe_flush_msg {
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
Some(flush_msg) => flush_msg,
|
||||
};
|
||||
match batched_tx.send(flush_msg).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
debug!("downstream is gone");
|
||||
break;
|
||||
for msg in flush_msgs {
|
||||
match batched_tx.send(msg).await {
|
||||
Ok(()) => {}
|
||||
Err(tokio::sync::mpsc::error::SendError(_)) => {
|
||||
debug!("downstream is gone");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user