mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
prorotype an alternative idea where Timeline methods populate the scope field
This commit is contained in:
@@ -94,7 +94,7 @@ use std::sync::Arc;
|
||||
use once_cell::sync::Lazy;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{task_mgr::TaskKind, tenant::Timeline};
|
||||
use crate::{metrics::StorageIoSizeMetrics, task_mgr::TaskKind, tenant::Timeline};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Debug)]
|
||||
@@ -104,36 +104,30 @@ pub struct RequestContext {
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
scope: Scope,
|
||||
pub(crate) scope: std::sync::Mutex<Scope>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum Scope {
|
||||
Global {
|
||||
io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
|
||||
io_size_metrics: Arc<crate::metrics::StorageIoSizeMetrics>,
|
||||
},
|
||||
Timeline {
|
||||
timeline: Arc<Timeline>,
|
||||
io_size_metrics: Arc<crate::metrics::StorageIoSizeMetrics>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Scope {
|
||||
pub(crate) fn new_global() -> Self {
|
||||
static GLOBAL_IO_SIZE_METRICS: Lazy<crate::metrics::StorageIoSizeMetrics> =
|
||||
Lazy::new(|| crate::metrics::StorageIoSizeMetrics::new("*", "*", "*"));
|
||||
static GLOBAL_IO_SIZE_METRICS: Lazy<Arc<crate::metrics::StorageIoSizeMetrics>> =
|
||||
Lazy::new(|| Arc::new(crate::metrics::StorageIoSizeMetrics::new("*", "*", "*")));
|
||||
Scope::Global {
|
||||
io_size_metrics: &&GLOBAL_IO_SIZE_METRICS,
|
||||
io_size_metrics: Arc::clone(&GLOBAL_IO_SIZE_METRICS),
|
||||
}
|
||||
}
|
||||
pub(crate) fn new_timeline(timeline: &Arc<Timeline>) -> Self {
|
||||
pub(crate) fn new_timeline(timeline: &Timeline) -> Self {
|
||||
Scope::Timeline {
|
||||
timeline: Arc::clone(timeline),
|
||||
}
|
||||
}
|
||||
pub(crate) fn io_size_metrics(&self) -> &crate::metrics::StorageIoSizeMetrics {
|
||||
match self {
|
||||
Scope::Global { io_size_metrics } => io_size_metrics,
|
||||
Scope::Timeline { timeline } => &timeline.metrics.storage_io_size,
|
||||
io_size_metrics: Arc::clone(&timeline.metrics.storage_io_size),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,7 +188,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: Scope::new_global(),
|
||||
scope: std::sync::Mutex::new(Scope::new_global()),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -209,11 +203,16 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
scope: original.scope.clone(),
|
||||
scope: std::sync::Mutex::new(original.scope.lock().unwrap().clone()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn task_kind(mut self, k: TaskKind) -> Self {
|
||||
self.inner.task_kind = k;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure the DownloadBehavior of the context: whether to
|
||||
/// download missing layers, and/or warn on the download.
|
||||
pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
|
||||
@@ -238,11 +237,6 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn scope(mut self, s: Scope) -> Self {
|
||||
self.inner.scope = s;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
@@ -325,7 +319,10 @@ impl RequestContext {
|
||||
}
|
||||
|
||||
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
Self::new(task_kind, download_behavior)
|
||||
RequestContextBuilder::extend(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn task_kind(&self) -> TaskKind {
|
||||
@@ -348,31 +345,30 @@ impl RequestContext {
|
||||
self.read_path_debug
|
||||
}
|
||||
|
||||
pub(crate) fn scope(&self) -> &Scope {
|
||||
&self.scope
|
||||
}
|
||||
|
||||
pub(crate) fn assert_is_timeline_scoped(&self, what: &str) {
|
||||
if let Scope::Timeline { .. } = self.scope() {
|
||||
return;
|
||||
}
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
panic!("RequestContext must be timeline-scoped what={what}");
|
||||
} else {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
what,
|
||||
"RequestContext must be timeline-scoped",
|
||||
);
|
||||
});
|
||||
pub(crate) fn io_size_metrics(&self) -> Arc<StorageIoSizeMetrics> {
|
||||
let guard = self.scope.lock().unwrap();
|
||||
match &*guard {
|
||||
Scope::Global { io_size_metrics } => {
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
panic!("all VirtualFile instances are timeline-scoped");
|
||||
} else {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
"all VirtualFile instances are timeline-scoped",
|
||||
);
|
||||
});
|
||||
Arc::clone(io_size_metrics)
|
||||
}
|
||||
}
|
||||
Scope::Timeline { io_size_metrics } => Arc::clone(io_size_metrics),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use pq_proto::framed::ConnectionError;
|
||||
|
||||
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
|
||||
use strum_macros::{IntoStaticStr, VariantNames};
|
||||
use utils::id::{TimelineId};
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
@@ -2835,7 +2835,7 @@ pub(crate) struct TimelineMetrics {
|
||||
/// Number of valid LSN leases.
|
||||
pub valid_lsn_lease_count_gauge: UIntGauge,
|
||||
pub wal_records_received: IntCounter,
|
||||
pub storage_io_size: StorageIoSizeMetrics,
|
||||
pub storage_io_size: Arc<StorageIoSizeMetrics>,
|
||||
shutdown: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
@@ -2971,7 +2971,11 @@ impl TimelineMetrics {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let storage_io_size = StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id);
|
||||
let storage_io_size = Arc::new(StorageIoSizeMetrics::new(
|
||||
&tenant_id,
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
));
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
|
||||
@@ -132,7 +132,6 @@ impl EphemeralFile {
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<u64> {
|
||||
ctx.assert_is_timeline_scoped("EphemeralFile::write_raw");
|
||||
let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
|
||||
if let Some(control) = control {
|
||||
control.release().await;
|
||||
|
||||
@@ -72,6 +72,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::context::{self};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::tenant::storage_layer::ImageLayerName;
|
||||
use crate::{
|
||||
@@ -1299,7 +1300,13 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
ctx.assert_is_timeline_scoped("Timeline::get_vectored_impl");
|
||||
let prev_scope = std::mem::replace(
|
||||
&mut *ctx.scope.lock().unwrap(),
|
||||
context::Scope::new_timeline(self),
|
||||
);
|
||||
scopeguard::defer! {
|
||||
*ctx.scope.lock().unwrap() = prev_scope;
|
||||
}
|
||||
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(keyspace.clone(), lsn))
|
||||
|
||||
@@ -930,7 +930,7 @@ impl VirtualFileInner {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
|
||||
if let Ok(size) = res {
|
||||
ctx.scope().io_size_metrics().read.add(size.into_u64());
|
||||
ctx.io_size_metrics().read.add(size.into_u64());
|
||||
}
|
||||
(buf, res)
|
||||
})
|
||||
@@ -962,7 +962,7 @@ impl VirtualFileInner {
|
||||
let ((_file_guard, buf), result) =
|
||||
io_engine::get().write_at(file_guard, offset, buf).await;
|
||||
if let Ok(size) = result {
|
||||
ctx.scope().io_size_metrics().write.add(size.into_u64());
|
||||
ctx.io_size_metrics().write.add(size.into_u64());
|
||||
}
|
||||
(buf, result)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user