improvements

This commit is contained in:
Christian Schwarz
2024-11-21 21:57:53 +01:00
parent 7680aa12a8
commit 240e48df59

View File

@@ -7,6 +7,7 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use nix::libc::SCTP_CURRENT_ASSOC;
use nix::sys::wait;
use once_cell::sync::OnceCell;
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
@@ -762,15 +763,11 @@ impl PageServerHandler {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
match (maybe_carry.as_deref_mut(), *this_msg) {
// new batch
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
// nothing batched yet
(None, this_msg) => {
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
None
}
// nothing batched yet and this message is unbatchable
(None, this_msg) => {
Some(Box::new(this_msg)); // TODO: avoid this re-boxing
}
// something batched already, let's see if we can add this message to the batch
(
Some(BatchedFeMessage::GetPage {
@@ -816,7 +813,7 @@ impl PageServerHandler {
}
// something batched already but this message is unbatchable
(Some(_), this_msg_derefed) => {
Some(Box::new(*this_msg_derefed)) // TODO: avoid this re-boxing
Some(Box::new(this_msg_derefed)) // TODO: avoid this re-boxing
}
}
}
@@ -1055,37 +1052,48 @@ impl PageServerHandler {
Default::default();
let notify_data_ready = Arc::new(Notify::new());
let notify_data_emptied = Arc::new(Notify::new());
let batcher = tokio::spawn({
let mut batcher = tokio::spawn({
let current_batch = current_batch.clone();
let notify_data_ready = notify_data_ready.clone();
let notify_data_emptied = notify_data_emptied.clone();
let cancel = self.cancel.child_token();
async move {
scopeguard::defer! {
debug!("exiting");
}
loop {
match requests_rx.recv().await {
Some(req) => {
let mut guard = current_batch.lock().unwrap();
match Self::pagestream_do_batch(&mut guard, req) {
Some(req) => {
let emptied = notify_data_emptied.notified();
let emptied = std::pin::pin!(emptied);
if emptied.enable() {
panic!("impossible, we're holding the lock");
// NB: no need to select! for cancellation here because upstream checks
Some(mut req) => {
loop {
let wait_emptied = {
let mut guard = current_batch.lock().unwrap();
match Self::pagestream_do_batch(&mut guard, req) {
None => {
// we successfully batched the message
notify_data_ready.notify_one();
break;
}
Some(unbatched_req) => {
let emptied = notify_data_emptied.notified();
let mut wait_for_emptied = Box::pin(emptied); // FIXME: noalloc
if wait_for_emptied.as_mut().enable() {
panic!("impossible, we're holding the lock");
}
drop(guard);
req = unbatched_req; // assign back to retry in next iteration
wait_for_emptied
}
}
};
tokio::select! {
() = wait_emptied => {
debug!("notified that batch was emptied");
}
_ = cancel.cancelled() => {
debug!("cancellation requested");
break;
}
drop(guard);
emptied.await;
let guard = current_batch.lock().unwrap();
let prev = guard.replace(req);
assert!(
prev.is_none(),
"we were notified that data emptied but there was data"
);
}
None => {
// we successfully batched the message
notify_data_ready.notify_one();
}
}
}
@@ -1100,32 +1108,34 @@ impl PageServerHandler {
});
loop {
let batched = {
// hot path: take whatever got batched since last await
let guard = current_batch.lock().unwrap();
if let Some(batched) = guard.take() {
notify_data_emptied.notify_one();
drop(guard);
batched
} else {
// cold path: nothing was batched
let notified = notify_data_ready.notified();
let notified = std::pin::pin!(notified);
if notified.enable() {
panic!("impossible, we're holding the lock");
}
drop(guard);
tokio::select! {
batcher_res = batcher => {
let _: () = batcher_res.context("batcher panicked")?;
}
_ = notified => {
// we're notified that there's something to process
}
};
let batched = loop {
let notified = {
let mut guard = current_batch.lock().unwrap();
guard.take().unwrap()
}
if let Some(batched) = guard.take() {
notify_data_emptied.notify_one();
drop(guard);
break batched;
} else {
// cold path: nothing was batched
let notified = notify_data_ready.notified();
let mut notified = Box::pin(notified); // FIXME: no alloc please
if notified.as_mut().enable() {
panic!("impossible, we're holding the lock");
}
drop(guard);
notified
}
};
tokio::select! {
batcher_res = &mut batcher => {
let _: () = batcher_res.context("batcher panicked")?;
break;
}
_ = notified => {
// we're notified that there's something to process
// => next iteration will find it
}
};
};
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb, *batched, &ctx)