This commit is contained in:
Christian Schwarz
2024-11-21 21:34:58 +01:00
parent 56de07154e
commit 7680aa12a8

View File

@@ -6,6 +6,7 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use nix::libc::SCTP_CURRENT_ASSOC;
use once_cell::sync::OnceCell;
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
@@ -32,6 +33,7 @@ use std::time::SystemTime;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -751,83 +753,72 @@ impl PageServerHandler {
Ok(Some(Box::new(batched_msg)))
}
/// Post-condition: `maybe_carry` is Some()
/// POst-condition: maybe_carry.is_some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
async fn pagestream_do_batch(
fn pagestream_do_batch(
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
mut this_msg: Box<BatchedFeMessage>,
) -> smallvec::SmallVec<[Box<BatchedFeMessage>; 2]> {
) -> Option<Box<BatchedFeMessage>> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let mut ret = SmallVec::new();
loop {
match (maybe_carry.as_deref_mut(), *this_msg) {
// new batch
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
break;
match (maybe_carry.as_deref_mut(), *this_msg) {
// new batch
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
None
}
// nothing batched yet and this message is unbatchable
(None, this_msg) => {
Some(Box::new(this_msg)); // TODO: avoid this re-boxing
}
// something batched already, let's see if we can add this message to the batch
(
Some(BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: ref mut accum_pages,
effective_request_lsn: accum_lsn,
}),
// would be nice to have box pattern here
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if (|| {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
// nothing batched yet and this message is unbatchable
(None, this_msg) => {
ret.push(Box::new(this_msg)); // TODO: avoid this re-boxing
break;
}
// something batched already, let's see if we can add this message to the batch
(
Some(BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: ref mut accum_pages,
effective_request_lsn: accum_lsn,
}),
// would be nice to have box pattern here
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
}
true
}
.await =>
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// ok to batch
accum_pages.extend(this_pages);
break;
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// something batched already but this message is unbatchable
(Some(_), this_msg_derefed) => {
let carry = maybe_carry.take().expect("this match arm checks it's Some()");
ret.push(carry);
this_msg = Box::new(this_msg_derefed); // TODO: avoid this re-boxing
// next iteration will decide what to do
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
}
true
})() =>
{
// ok to batch
accum_pages.extend(this_pages);
None
}
// something batched already but this message is unbatchable
(Some(_), this_msg_derefed) => {
Some(Box::new(*this_msg_derefed)) // TODO: avoid this re-boxing
}
}
assert!(!ret.spilled());
ret
}
#[instrument(level = tracing::Level::DEBUG, skip_all)]
@@ -1060,68 +1051,84 @@ impl PageServerHandler {
.instrument(tracing::info_span!("read_protocol")),
);
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(
{
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut batch: Option<Box<BatchedFeMessage>> = None;
loop {
let flush_msgs: smallvec::SmallVec<[_; 2]> = tokio::select! {
req = requests_rx.recv() => {
if let Some(req) = req {
Self::pagestream_do_batch(&mut batch, req).await
} else {
debug!("request processing pipeline upstream dead");
std::mem::take(&mut batch).into_iter().collect()
let current_batch: Arc<std::sync::Mutex<Option<Box<BatchedFeMessage>>>> =
Default::default();
let notify_data_ready = Arc::new(Notify::new());
let notify_data_emptied = Arc::new(Notify::new());
let batcher = tokio::spawn({
let current_batch = current_batch.clone();
let notify_data_ready = notify_data_ready.clone();
let notify_data_emptied = notify_data_emptied.clone();
async move {
scopeguard::defer! {
debug!("exiting");
}
loop {
match requests_rx.recv().await {
Some(req) => {
let mut guard = current_batch.lock().unwrap();
match Self::pagestream_do_batch(&mut guard, req) {
Some(req) => {
let emptied = notify_data_emptied.notified();
let emptied = std::pin::pin!(emptied);
if emptied.enable() {
panic!("impossible, we're holding the lock");
}
drop(guard);
emptied.await;
let guard = current_batch.lock().unwrap();
let prev = guard.replace(req);
assert!(
prev.is_none(),
"we were notified that data emptied but there was data"
);
}
},
() = ready_for_next_batch.notified() => {
debug!("downstream ready, flushing batch early if any available");
std::mem::take(&mut batch).into_iter().collect()
}
};
for msg in flush_msgs {
match batched_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("downstream is gone");
break;
None => {
// we successfully batched the message
notify_data_ready.notify_one();
}
}
}
None => {
debug!("request processing pipeline upstream dead");
return;
}
}
}
}
.instrument(tracing::info_span!("batching")),
);
.instrument(tracing::info_span!("batching"))
});
loop {
let batch = match batched_rx.try_recv() {
Ok(batch) => batch,
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
ready_for_next_batch.notify_one();
match batched_rx.recv().await {
Some(batch) => batch,
None => {
debug!(
"batched_rx observed as disconnected while waiting for next batch"
);
break;
}
let batched = {
// hot path: take whatever got batched since last await
let guard = current_batch.lock().unwrap();
if let Some(batched) = guard.take() {
notify_data_emptied.notify_one();
drop(guard);
batched
} else {
// cold path: nothing was batched
let notified = notify_data_ready.notified();
let notified = std::pin::pin!(notified);
if notified.enable() {
panic!("impossible, we're holding the lock");
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
debug!("batched_rx observed as disconnected as part of try_recv()");
break;
drop(guard);
tokio::select! {
batcher_res = batcher => {
let _: () = batcher_res.context("batcher panicked")?;
}
_ = notified => {
// we're notified that there's something to process
}
};
let mut guard = current_batch.lock().unwrap();
guard.take().unwrap()
}
};
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
self.pagesteam_handle_batched_message(pgb, *batched, &ctx)
.await?;
}