From 4b2a91cb5a412b3bb90c5d4a5e8ae21ac60538ea Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 22 Jan 2025 19:03:16 +0100 Subject: [PATCH] sketch propagation through request context --- pageserver/src/context.rs | 235 ++++++--------------------------- pageserver/src/page_service.rs | 35 +++-- 2 files changed, 66 insertions(+), 204 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 8f2177fe5b..33b4c98cd4 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -92,203 +92,56 @@ use crate::task_mgr::TaskKind; // The main structure of this module, see module-level comment. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct RequestContext { - task_kind: TaskKind, - download_behavior: DownloadBehavior, - access_stats_behavior: AccessStatsBehavior, - page_content_kind: PageContentKind, + latency_recording: Option, } -/// The kind of access to the page cache. -#[derive(Clone, Copy, PartialEq, Eq, Debug, enum_map::Enum, strum_macros::IntoStaticStr)] -pub enum PageContentKind { - Unknown, - DeltaLayerSummary, - DeltaLayerBtreeNode, - DeltaLayerValue, - ImageLayerSummary, - ImageLayerBtreeNode, - ImageLayerValue, - InMemoryLayer, -} - -/// Desired behavior if the operation requires an on-demand download -/// to proceed. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub enum DownloadBehavior { - /// Download the layer file. It can take a while. - Download, - - /// Download the layer file, but print a warning to the log. This should be used - /// in code where the layer file is expected to already exist locally. - Warn, - - /// Return a PageReconstructError::NeedsDownload error - Error, -} - -/// Whether this request should update access times used in LRU eviction -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub(crate) enum AccessStatsBehavior { - /// Update access times: this request's access to data should be taken - /// as a hint that the accessed layer is likely to be accessed again - Update, - - /// Do not update access times: this request is accessing the layer - /// but does not want to indicate that the layer should be retained in cache, - /// perhaps because the requestor is a compaction routine that will soon cover - /// this layer with another. - Skip, -} - -pub struct RequestContextBuilder { - inner: RequestContext, -} - -impl RequestContextBuilder { - /// A new builder with default settings - pub fn new(task_kind: TaskKind) -> Self { - Self { - inner: RequestContext { - task_kind, - download_behavior: DownloadBehavior::Download, - access_stats_behavior: AccessStatsBehavior::Update, - page_content_kind: PageContentKind::Unknown, - }, - } - } - - pub fn extend(original: &RequestContext) -> Self { - Self { - // This is like a Copy, but avoid implementing Copy because ordinary users of - // RequestContext should always move or ref it. - inner: RequestContext { - task_kind: original.task_kind, - download_behavior: original.download_behavior, - access_stats_behavior: original.access_stats_behavior, - page_content_kind: original.page_content_kind, - }, - } - } - - /// Configure the DownloadBehavior of the context: whether to - /// download missing layers, and/or warn on the download. - pub fn download_behavior(mut self, b: DownloadBehavior) -> Self { - self.inner.download_behavior = b; - self - } - - /// Configure the AccessStatsBehavior of the context: whether layer - /// accesses should update the access time of the layer. - pub(crate) fn access_stats_behavior(mut self, b: AccessStatsBehavior) -> Self { - self.inner.access_stats_behavior = b; - self - } - - pub(crate) fn page_content_kind(mut self, k: PageContentKind) -> Self { - self.inner.page_content_kind = k; - self - } - - pub fn build(self) -> RequestContext { - self.inner - } +trait Propagatable: Default { + fn propagate(&self, child: &mut Self); } impl RequestContext { - /// Create a new RequestContext that has no parent. - /// - /// The function is called `new` because, once we add children - /// to it using `detached_child` or `attached_child`, the context - /// form a tree (not implemented yet since cancellation will be - /// the first feature that requires a tree). - /// - /// # Future: Cancellation - /// - /// The only reason why a context like this one can be canceled is - /// because someone explicitly canceled it. - /// It has no parent, so it cannot inherit cancellation from there. - pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - RequestContextBuilder::new(task_kind) - .download_behavior(download_behavior) - .build() + fn root() -> Self { + Self::default() } - - /// Create a detached child context for a task that may outlive `self`. - /// - /// Use this when spawning new background activity that should complete - /// even if the current request is canceled. - /// - /// # Future: Cancellation - /// - /// Cancellation of `self` will not propagate to the child context returned - /// by this method. - /// - /// # Future: Structured Concurrency - /// - /// We could add the Future as a parameter to this function, spawn it as a task, - /// and pass to the new task the child context as an argument. - /// That would be an ergonomic improvement. - /// - /// We could make new calls to this function fail if `self` is already canceled. - pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - self.child_impl(task_kind, download_behavior) - } - - /// Create a child of context `self` for a task that shall not outlive `self`. - /// - /// Use this when fanning-out work to other async tasks. - /// - /// # Future: Cancellation - /// - /// Cancelling a context will propagate to its attached children. - /// - /// # Future: Structured Concurrency - /// - /// We could add the Future as a parameter to this function, spawn it as a task, - /// and track its `JoinHandle` inside the `RequestContext`. - /// - /// We could then provide another method to allow waiting for all child tasks - /// to finish. - /// - /// We could make new calls to this function fail if `self` is already canceled. - /// Alternatively, we could allow the creation but not spawn the task. - /// The method to wait for child tasks would return an error, indicating - /// that the child task was not started because the context was canceled. - pub fn attached_child(&self) -> Self { - self.child_impl(self.task_kind(), self.download_behavior()) - } - - /// Use this function when you should be creating a child context using - /// [`attached_child`] or [`detached_child`], but your caller doesn't provide - /// a context and you are unwilling to change all callers to provide one. - /// - /// Before we add cancellation, we should get rid of this method. - /// - /// [`attached_child`]: Self::attached_child - /// [`detached_child`]: Self::detached_child - pub fn todo_child(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - Self::new(task_kind, download_behavior) - } - - fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - Self::new(task_kind, download_behavior) - } - - pub fn task_kind(&self) -> TaskKind { - self.task_kind - } - - pub fn download_behavior(&self) -> DownloadBehavior { - self.download_behavior - } - - pub(crate) fn access_stats_behavior(&self) -> AccessStatsBehavior { - self.access_stats_behavior - } - - pub(crate) fn page_content_kind(&self) -> PageContentKind { - self.page_content_kind + fn child(&self) -> RequestContext { + let mut child = RequestContext::default(); + let Self { + latency_recording, + } = self; + if let Some(latency_recording) = latency_recording { + child.latency_recording = Some(latency_recording.child()); + } + } +} + +mod latency_recording { + struct LatencyRecording { + inner: Arc>, + } + + impl LatencyRecording { + fn new() -> Self { + Self { + current: Mutex::new(HashMap::new()), + } + } + + fn on_request_recv(&self, now: Instant) -> { + let mut inner = self.inner.lock().unwrap(); + inner.current.insert(now, now); + } + + } + + impl Propagatable for LatencyRecording { + fn propagate(&self, other: &Self) { + let mut inner = self.inner.lock().unwrap(); + let other_inner = other.inner.lock().unwrap(); + for (k, v) in other_inner.current.iter() { + inner.current.insert(*k, *v); + } + } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e4fbf88364..dbdd137fc2 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -728,10 +728,11 @@ impl PageServerHandler { msg = pgb.read_message() => { msg } }; - let mut recorder = SmgrOpLatencyRecorder { - start_time: Instant::now(), - ... - }; + let ctx = ctx.enrich( + SmgrOpLatencyRecorder { + start_time: Instant::now(), + parse_request: ParseRequest::unset(), + }); let received_at = Instant::now(); @@ -753,17 +754,15 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message"); - let parse_request_recorder = ParseREquest { - start_time: Instant::now(), - ... - }; + + { + + let parse_request_recorder= ctx.get::().parse_request; // parse request let neon_fe_msg = - PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; + PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version, &parse_request_recorder)?; + } - parse_request_recorder.end_time = Instant::now(); - - recorder.parse_request = parse_request_recorder; // TODO: turn in to async closure once available to avoid repeating received_at async fn record_op_start_and_throttle( @@ -931,7 +930,11 @@ impl PageServerHandler { shard: shard.downgrade(), effective_request_lsn, pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }], - recorder, + downstairs: { + let downstairs = Arc::new(Mutex::new(Downstairs::init())); + *(&mut ctx.get::().downstairs) = Arc::clone(&downstairs); + downstairs + }, } } #[cfg(feature = "testing")] @@ -1606,6 +1609,10 @@ impl PageServerHandler { async move { let _cancel_batcher = cancel_batcher.drop_guard(); loop { + let ctx = ctx.attached_child(); + + let child = ctx.attached_child(); + let maybe_batch = batch_rx.recv().await; let batch = match maybe_batch { Ok(batch) => batch, @@ -1620,6 +1627,8 @@ impl PageServerHandler { return Err(e); } }; + batch.downstairs.wait_for_execution.end_time = Instant::now(); + ctx.latency_recording.enrich(batch.downstairs); self.pagesteam_handle_batched_message( pgb_writer, batch,