sketch propagation through request context

This commit is contained in:
Christian Schwarz
2025-01-22 19:03:16 +01:00
parent a6660a2883
commit 4b2a91cb5a
2 changed files with 66 additions and 204 deletions

View File

@@ -92,203 +92,56 @@
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
latency_recording: Option<latency_recording::LatencyRecording>,
}
/// The kind of access to the page cache.
///
/// Each variant names the kind of content a page holds. The derives make the
/// enum usable with `enum_map` and convertible to a `&'static str` via
/// `strum_macros::IntoStaticStr`.
#[derive(Clone, Copy, PartialEq, Eq, Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
pub enum PageContentKind {
// Used by `RequestContextBuilder::new` as the initial value.
Unknown,
DeltaLayerSummary,
DeltaLayerBtreeNode,
DeltaLayerValue,
ImageLayerSummary,
ImageLayerBtreeNode,
ImageLayerValue,
InMemoryLayer,
}
/// What an operation should do when it cannot proceed without an
/// on-demand download of a layer file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloadBehavior {
    /// Fetch the layer file. This can take a while.
    Download,
    /// Fetch the layer file, and also log a warning. Meant for code paths
    /// that expect the layer file to be present locally already.
    Warn,
    /// Do not download; return a PageReconstructError::NeedsDownload error.
    Error,
}
/// Controls whether a request updates the access times that LRU eviction
/// relies on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AccessStatsBehavior {
    /// Record the access: treat this request as a hint that the accessed
    /// layer is likely to be needed again.
    Update,
    /// Leave access times untouched: the request reads the layer but does
    /// not want to suggest it should stay cached, for example because the
    /// requestor is a compaction routine that will soon cover this layer
    /// with another.
    Skip,
}
/// Builder for [`RequestContext`]: wraps a context under construction and
/// hands it out via `build`.
pub struct RequestContextBuilder {
// The context being assembled; each setter overwrites one of its fields.
inner: RequestContext,
}
impl RequestContextBuilder {
/// A new builder with default settings
///
/// Apart from the given `task_kind`, the context starts with
/// `DownloadBehavior::Download`, `AccessStatsBehavior::Update` and
/// `PageContentKind::Unknown`.
pub fn new(task_kind: TaskKind) -> Self {
Self {
// NOTE(review): `RequestContext` as shown in this view also declares a
// `latency_recording` field that this literal does not set — confirm.
inner: RequestContext {
task_kind,
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
},
}
}
/// Start a builder from an existing context, copying `task_kind`,
/// `download_behavior`, `access_stats_behavior` and `page_content_kind`
/// from `original`.
pub fn extend(original: &RequestContext) -> Self {
Self {
// This is like a Copy, but avoid implementing Copy because ordinary users of
// RequestContext should always move or ref it.
// NOTE(review): `RequestContext` as shown in this view also declares a
// `latency_recording` field that is not carried over here — confirm.
inner: RequestContext {
task_kind: original.task_kind,
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
},
}
}
/// Configure the DownloadBehavior of the context: whether to
/// download missing layers, and/or warn on the download.
///
/// Consumes the builder and returns it so that calls can be chained.
pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
self.inner.download_behavior = b;
self
}
/// Configure the AccessStatsBehavior of the context: whether layer
/// accesses should update the access time of the layer.
///
/// Consumes the builder and returns it so that calls can be chained.
pub(crate) fn access_stats_behavior(mut self, b: AccessStatsBehavior) -> Self {
self.inner.access_stats_behavior = b;
self
}
/// Configure the PageContentKind of the context.
///
/// Consumes the builder and returns it so that calls can be chained.
pub(crate) fn page_content_kind(mut self, k: PageContentKind) -> Self {
self.inner.page_content_kind = k;
self
}
/// Finish building: consumes the builder and returns the assembled context.
pub fn build(self) -> RequestContext {
self.inner
}
/// State that can be passed on from one value to a `child` value of the same
/// type. Implementors must also implement `Default`.
trait Propagatable: Default {
/// Propagate from `self` to `child`; `self` is borrowed immutably and
/// `child` mutably.
// NOTE(review): presumably `child` starts out as `Self::default()` — confirm
// against callers.
fn propagate(&self, child: &mut Self);
}
impl RequestContext {
/// Create a new RequestContext that has no parent.
///
/// The function is called `new` because, once we add children
/// to it using `detached_child` or `attached_child`, the contexts
/// form a tree (not implemented yet since cancellation will be
/// the first feature that requires a tree).
///
/// # Future: Cancellation
///
/// The only reason why a context like this one can be canceled is
/// because someone explicitly canceled it.
/// It has no parent, so it cannot inherit cancellation from there.
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
RequestContextBuilder::new(task_kind)
.download_behavior(download_behavior)
.build()
// NOTE(review): in this flattened view the closing brace of `new` is not
// visible before `root` begins; the two definitions appear interleaved.
// `root` returns the `Default` context.
fn root() -> Self {
Self::default()
}
/// Create a detached child context for a task that may outlive `self`.
///
/// Use this when spawning new background activity that should complete
/// even if the current request is canceled.
///
/// The child is built from the given `task_kind` and `download_behavior`
/// by delegating to `child_impl`.
///
/// # Future: Cancellation
///
/// Cancellation of `self` will not propagate to the child context returned
/// by this method.
///
/// # Future: Structured Concurrency
///
/// We could add the Future as a parameter to this function, spawn it as a task,
/// and pass to the new task the child context as an argument.
/// That would be an ergonomic improvement.
///
/// We could make new calls to this function fail if `self` is already canceled.
pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
self.child_impl(task_kind, download_behavior)
}
/// Create a child of context `self` for a task that shall not outlive `self`.
///
/// Use this when fanning-out work to other async tasks.
///
/// The child is created with the task kind and download behavior of `self`.
///
/// # Future: Cancellation
///
/// Cancelling a context will propagate to its attached children.
///
/// # Future: Structured Concurrency
///
/// We could add the Future as a parameter to this function, spawn it as a task,
/// and track its `JoinHandle` inside the `RequestContext`.
///
/// We could then provide another method to allow waiting for all child tasks
/// to finish.
///
/// We could make new calls to this function fail if `self` is already canceled.
/// Alternatively, we could allow the creation but not spawn the task.
/// The method to wait for child tasks would return an error, indicating
/// that the child task was not started because the context was canceled.
pub fn attached_child(&self) -> Self {
self.child_impl(self.task_kind(), self.download_behavior())
}
/// Use this function when you should be creating a child context using
/// [`attached_child`] or [`detached_child`], but your caller doesn't provide
/// a context and you are unwilling to change all callers to provide one.
///
/// Currently this simply forwards to [`Self::new`].
///
/// Before we add cancellation, we should get rid of this method.
///
/// [`attached_child`]: Self::attached_child
/// [`detached_child`]: Self::detached_child
pub fn todo_child(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
Self::new(task_kind, download_behavior)
}
/// Shared implementation behind `attached_child` and `detached_child`.
///
/// Builds a fresh context through `Self::new` from the two arguments only;
/// `self` is not consulted.
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
Self::new(task_kind, download_behavior)
}
/// Returns the task kind stored in this context.
pub fn task_kind(&self) -> TaskKind {
self.task_kind
}
/// Returns the download behavior stored in this context.
pub fn download_behavior(&self) -> DownloadBehavior {
self.download_behavior
}
/// Returns the access-stats behavior stored in this context.
pub(crate) fn access_stats_behavior(&self) -> AccessStatsBehavior {
self.access_stats_behavior
}
pub(crate) fn page_content_kind(&self) -> PageContentKind {
self.page_content_kind
/// Create a child context derived from `self`.
///
/// The child starts out as `RequestContext::default()`. If `self` carries a
/// latency recording, the child receives `latency_recording.child()`;
/// otherwise its `latency_recording` keeps its default value.
fn child(&self) -> RequestContext {
    // Destructure without `..` so that adding a field to `RequestContext`
    // forces a decision here about how that field is passed to children.
    let Self { latency_recording } = self;

    let mut child = RequestContext::default();
    if let Some(parent_recording) = latency_recording {
        child.latency_recording = Some(parent_recording.child());
    }
    // The original fell off the end of the function without returning the
    // context it had just built.
    child
}
}
mod latency_recording {
struct LatencyRecording {
inner: Arc<Mutex<Inner>>,
}
impl LatencyRecording {
fn new() -> Self {
Self {
current: Mutex::new(HashMap::new()),
}
}
fn on_request_recv(&self, now: Instant) -> {
let mut inner = self.inner.lock().unwrap();
inner.current.insert(now, now);
}
}
impl Propagatable for LatencyRecording {
fn propagate(&self, other: &Self) {
let mut inner = self.inner.lock().unwrap();
let other_inner = other.inner.lock().unwrap();
for (k, v) in other_inner.current.iter() {
inner.current.insert(*k, *v);
}
}
}
}

View File

@@ -728,10 +728,11 @@ impl PageServerHandler {
msg = pgb.read_message() => { msg }
};
let mut recorder = SmgrOpLatencyRecorder {
start_time: Instant::now(),
...
};
let ctx = ctx.enrich(
SmgrOpLatencyRecorder {
start_time: Instant::now(),
parse_request: ParseRequest::unset(),
});
let received_at = Instant::now();
@@ -753,17 +754,15 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message");
let parse_request_recorder = ParseREquest {
start_time: Instant::now(),
...
};
{
let parse_request_recorder= ctx.get::<SmgrOpLatencyRecorder>().parse_request;
// parse request
let neon_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version, &parse_request_recorder)?;
}
parse_request_recorder.end_time = Instant::now();
recorder.parse_request = parse_request_recorder;
// TODO: turn in to async closure once available to avoid repeating received_at
async fn record_op_start_and_throttle(
@@ -931,7 +930,11 @@ impl PageServerHandler {
shard: shard.downgrade(),
effective_request_lsn,
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
recorder,
downstairs: {
let downstairs = Arc::new(Mutex::new(Downstairs::init()));
*(&mut ctx.get::<SmgrOpLatencyRecorder>().downstairs) = Arc::clone(&downstairs);
downstairs
},
}
}
#[cfg(feature = "testing")]
@@ -1606,6 +1609,10 @@ impl PageServerHandler {
async move {
let _cancel_batcher = cancel_batcher.drop_guard();
loop {
let ctx = ctx.attached_child();
let child = ctx.attached_child();
let maybe_batch = batch_rx.recv().await;
let batch = match maybe_batch {
Ok(batch) => batch,
@@ -1620,6 +1627,8 @@ impl PageServerHandler {
return Err(e);
}
};
batch.downstairs.wait_for_execution.end_time = Instant::now();
ctx.latency_recording.enrich(batch.downstairs);
self.pagesteam_handle_batched_message(
pgb_writer,
batch,