mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 08:30:37 +00:00
revert back to 'span fixes' commit
This commit is contained in:
@@ -6,8 +6,6 @@ use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::FutureExt;
|
||||
use itertools::Itertools;
|
||||
use nix::libc::SCTP_CURRENT_ASSOC;
|
||||
use nix::sys::wait;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::{self, TenantState};
|
||||
use pageserver_api::models::{
|
||||
@@ -24,7 +22,6 @@ use postgres_backend::{
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use smallvec::SmallVec;
|
||||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
use std::str;
|
||||
@@ -34,7 +31,6 @@ use std::time::SystemTime;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -754,20 +750,24 @@ impl PageServerHandler {
|
||||
Ok(Some(Box::new(batched_msg)))
|
||||
}
|
||||
|
||||
/// POst-condition: maybe_carry.is_some()
|
||||
/// Post-condition: `maybe_carry` is Some()
|
||||
#[instrument(skip_all, level = tracing::Level::TRACE)]
|
||||
fn pagestream_do_batch(
|
||||
async fn pagestream_do_batch(
|
||||
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
|
||||
mut this_msg: Box<BatchedFeMessage>,
|
||||
this_msg: Box<BatchedFeMessage>,
|
||||
) -> Option<Box<BatchedFeMessage>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
|
||||
match (maybe_carry.as_deref_mut(), *this_msg) {
|
||||
// nothing batched yet
|
||||
(None, this_msg) => {
|
||||
// new batch
|
||||
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
|
||||
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
|
||||
None
|
||||
}
|
||||
// nothing batched yet and this message is unbatchable
|
||||
(None, this_msg) => {
|
||||
Some(Box::new(this_msg)) // TODO: avoid this re-boxing
|
||||
}
|
||||
// something batched already, let's see if we can add this message to the batch
|
||||
(
|
||||
Some(BatchedFeMessage::GetPage {
|
||||
@@ -783,7 +783,7 @@ impl PageServerHandler {
|
||||
pages: this_pages,
|
||||
effective_request_lsn: this_lsn,
|
||||
},
|
||||
) if (|| {
|
||||
) if async {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
|
||||
@@ -805,15 +805,19 @@ impl PageServerHandler {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})() =>
|
||||
}
|
||||
.await =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
None
|
||||
}
|
||||
// something batched already but this message is unbatchable
|
||||
(Some(_), this_msg_derefed) => {
|
||||
Some(Box::new(this_msg_derefed)) // TODO: avoid this re-boxing
|
||||
(Some(_), this_msg) => {
|
||||
// by default, don't continue batching
|
||||
let this_msg = Box::new(this_msg); // TODO: avoid this box
|
||||
let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()");
|
||||
Some(carry)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1048,97 +1052,72 @@ impl PageServerHandler {
|
||||
.instrument(tracing::info_span!("read_protocol")),
|
||||
);
|
||||
|
||||
let current_batch: Arc<std::sync::Mutex<Option<Box<BatchedFeMessage>>>> =
|
||||
Default::default();
|
||||
let notify_data_ready = Arc::new(Notify::new());
|
||||
let notify_data_emptied = Arc::new(Notify::new());
|
||||
let mut batcher = tokio::spawn({
|
||||
let current_batch = current_batch.clone();
|
||||
let notify_data_ready = notify_data_ready.clone();
|
||||
let notify_data_emptied = notify_data_emptied.clone();
|
||||
let cancel = self.cancel.child_token();
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
debug!("exiting");
|
||||
}
|
||||
loop {
|
||||
match requests_rx.recv().await {
|
||||
// NB: no need to select! for cancellation here because upstream checks
|
||||
Some(mut req) => {
|
||||
loop {
|
||||
let wait_emptied = {
|
||||
let mut guard = current_batch.lock().unwrap();
|
||||
match Self::pagestream_do_batch(&mut guard, req) {
|
||||
None => {
|
||||
// we successfully batched the message
|
||||
notify_data_ready.notify_one();
|
||||
break;
|
||||
}
|
||||
Some(unbatched_req) => {
|
||||
let emptied = notify_data_emptied.notified();
|
||||
let mut wait_for_emptied = Box::pin(emptied); // FIXME: noalloc
|
||||
if wait_for_emptied.as_mut().enable() {
|
||||
panic!("impossible, we're holding the lock");
|
||||
}
|
||||
drop(guard);
|
||||
req = unbatched_req; // assign back to retry in next iteration
|
||||
wait_for_emptied
|
||||
}
|
||||
}
|
||||
};
|
||||
tokio::select! {
|
||||
() = wait_emptied => {
|
||||
debug!("notified that batch was emptied");
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
debug!("cancellation requested");
|
||||
break;
|
||||
}
|
||||
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
|
||||
let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
|
||||
tokio::spawn(
|
||||
{
|
||||
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
debug!("exiting");
|
||||
}
|
||||
let mut batch: Option<Box<BatchedFeMessage>> = None;
|
||||
loop {
|
||||
let maybe_flush_msg = tokio::select! {
|
||||
req = requests_rx.recv() => {
|
||||
if let Some(req) = req {
|
||||
Self::pagestream_do_batch(&mut batch, req).await
|
||||
} else {
|
||||
debug!("request processing pipeline upstream dead");
|
||||
std::mem::take(&mut batch)
|
||||
}
|
||||
},
|
||||
() = ready_for_next_batch.notified() => {
|
||||
debug!("downstream ready, flushing batch early if any available");
|
||||
std::mem::take(&mut batch)
|
||||
}
|
||||
};
|
||||
let flush_msg = match maybe_flush_msg {
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
Some(flush_msg) => flush_msg,
|
||||
};
|
||||
match batched_tx.send(flush_msg).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
debug!("downstream is gone");
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
debug!("request processing pipeline upstream dead");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(tracing::info_span!("batching"))
|
||||
});
|
||||
.instrument(tracing::info_span!("batching")),
|
||||
);
|
||||
|
||||
loop {
|
||||
let batched = loop {
|
||||
let notified = {
|
||||
let mut guard = current_batch.lock().unwrap();
|
||||
if let Some(batched) = guard.take() {
|
||||
notify_data_emptied.notify_one();
|
||||
drop(guard);
|
||||
break batched;
|
||||
} else {
|
||||
// cold path: nothing was batched
|
||||
let notified = notify_data_ready.notified();
|
||||
let mut notified = Box::pin(notified); // FIXME: no alloc please
|
||||
if notified.as_mut().enable() {
|
||||
panic!("impossible, we're holding the lock");
|
||||
let batch = match batched_rx.try_recv() {
|
||||
Ok(batch) => batch,
|
||||
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
|
||||
ready_for_next_batch.notify_one();
|
||||
match batched_rx.recv().await {
|
||||
Some(batch) => batch,
|
||||
None => {
|
||||
debug!(
|
||||
"batched_rx observed as disconnected while waiting for next batch"
|
||||
);
|
||||
break;
|
||||
}
|
||||
drop(guard);
|
||||
notified
|
||||
}
|
||||
};
|
||||
tokio::select! {
|
||||
batcher_res = &mut batcher => {
|
||||
let _: () = batcher_res.context("batcher panicked")?;
|
||||
break;
|
||||
}
|
||||
_ = notified => {
|
||||
// we're notified that there's something to process
|
||||
// => next iteration will find it
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
|
||||
debug!("batched_rx observed as disconnected as part of try_recv()");
|
||||
break;
|
||||
}
|
||||
};
|
||||
debug!("processing batch");
|
||||
self.pagesteam_handle_batched_message(pgb, *batched, &ctx)
|
||||
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user