Compare commits

...

8 Commits

4 changed files with 173 additions and 68 deletions

View File

@@ -89,7 +89,12 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use crate::task_mgr::TaskKind;
use std::sync::Arc;
use once_cell::sync::Lazy;
use tracing::warn;
use crate::{metrics::StorageIoSizeMetrics, task_mgr::TaskKind, tenant::Timeline};
// The main structure of this module, see module-level comment.
#[derive(Debug)]
@@ -99,6 +104,32 @@ pub struct RequestContext {
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
read_path_debug: bool,
pub(crate) scope: std::sync::Mutex<Scope>,
}
#[derive(Clone, Debug)]
pub(crate) enum Scope {
Global {
io_size_metrics: Arc<crate::metrics::StorageIoSizeMetrics>,
},
Timeline {
io_size_metrics: Arc<crate::metrics::StorageIoSizeMetrics>,
},
}
impl Scope {
pub(crate) fn new_global() -> Self {
static GLOBAL_IO_SIZE_METRICS: Lazy<Arc<crate::metrics::StorageIoSizeMetrics>> =
Lazy::new(|| Arc::new(crate::metrics::StorageIoSizeMetrics::new("*", "*", "*")));
Scope::Global {
io_size_metrics: Arc::clone(&GLOBAL_IO_SIZE_METRICS),
}
}
pub(crate) fn new_timeline(timeline: &Timeline) -> Self {
Scope::Timeline {
io_size_metrics: Arc::clone(&timeline.metrics.storage_io_size),
}
}
}
/// The kind of access to the page cache.
@@ -157,6 +188,7 @@ impl RequestContextBuilder {
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
read_path_debug: false,
scope: std::sync::Mutex::new(Scope::new_global()),
},
}
}
@@ -171,10 +203,16 @@ impl RequestContextBuilder {
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
read_path_debug: original.read_path_debug,
scope: std::sync::Mutex::new(original.scope.lock().unwrap().clone()),
},
}
}
pub fn task_kind(mut self, k: TaskKind) -> Self {
self.inner.task_kind = k;
self
}
/// Configure the DownloadBehavior of the context: whether to
/// download missing layers, and/or warn on the download.
pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
@@ -281,7 +319,10 @@ impl RequestContext {
}
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
Self::new(task_kind, download_behavior)
RequestContextBuilder::extend(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.build()
}
pub fn task_kind(&self) -> TaskKind {
@@ -303,4 +344,31 @@ impl RequestContext {
pub(crate) fn read_path_debug(&self) -> bool {
self.read_path_debug
}
pub(crate) fn io_size_metrics(&self) -> Arc<StorageIoSizeMetrics> {
let guard = self.scope.lock().unwrap();
match &*guard {
Scope::Global { io_size_metrics } => {
if cfg!(debug_assertions) || cfg!(feature = "testing") {
panic!("all VirtualFile instances are timeline-scoped");
} else {
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::time::Duration;
use utils::rate_limit::RateLimit;
static LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
let mut guard = LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
warn!(
%rate_limit_stats,
"all VirtualFile instances are timeline-scoped",
);
});
Arc::clone(io_size_metrics)
}
}
Scope::Timeline { io_size_metrics } => Arc::clone(io_size_metrics),
}
}
}

View File

@@ -1200,11 +1200,24 @@ impl StorageIoTime {
pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(StorageIoTime::new);
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
#[derive(Clone, Copy)]
#[repr(usize)]
enum StorageIoSizeOperation {
Read,
Write,
}
impl StorageIoSizeOperation {
const VARIANTS: &'static [&'static str] = &["read", "write"];
fn as_str(&self) -> &'static str {
Self::VARIANTS[*self as usize]
}
}
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_io_operations_bytes_total",
"Total amount of bytes read/written in IO operations",
&["operation", "tenant_id", "shard_id", "timeline_id"]
@@ -1212,6 +1225,34 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
#[derive(Clone, Debug)]
pub(crate) struct StorageIoSizeMetrics {
pub read: UIntGauge,
pub write: UIntGauge,
}
impl StorageIoSizeMetrics {
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
let read = STORAGE_IO_SIZE
.get_metric_with_label_values(&[
StorageIoSizeOperation::Read.as_str(),
tenant_id,
shard_id,
timeline_id,
])
.unwrap();
let write = STORAGE_IO_SIZE
.get_metric_with_label_values(&[
StorageIoSizeOperation::Write.as_str(),
tenant_id,
shard_id,
timeline_id,
])
.unwrap();
Self { read, write }
}
}
#[cfg(not(test))]
pub(crate) mod virtual_file_descriptor_cache {
use super::*;
@@ -2794,6 +2835,7 @@ pub(crate) struct TimelineMetrics {
/// Number of valid LSN leases.
pub valid_lsn_lease_count_gauge: UIntGauge,
pub wal_records_received: IntCounter,
pub storage_io_size: Arc<StorageIoSizeMetrics>,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -2929,6 +2971,12 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let storage_io_size = Arc::new(StorageIoSizeMetrics::new(
&tenant_id,
&shard_id,
&timeline_id,
));
TimelineMetrics {
tenant_id,
shard_id,
@@ -2958,6 +3006,7 @@ impl TimelineMetrics {
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
storage_io_size,
valid_lsn_lease_count_gauge,
wal_records_received,
shutdown: std::sync::atomic::AtomicBool::default(),
@@ -3148,7 +3197,7 @@ impl TimelineMetrics {
]);
}
for op in STORAGE_IO_SIZE_OPERATIONS {
for op in StorageIoSizeOperation::VARIANTS {
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
}

View File

@@ -72,6 +72,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::{self};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::tenant::storage_layer::ImageLayerName;
use crate::{
@@ -323,7 +324,7 @@ pub struct Timeline {
ancestor_timeline: Option<Arc<Timeline>>,
ancestor_lsn: Lsn,
pub(super) metrics: TimelineMetrics,
pub(crate) metrics: TimelineMetrics,
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
// in `crate::page_service` writes these metrics.
@@ -1299,6 +1300,14 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let prev_scope = std::mem::replace(
&mut *ctx.scope.lock().unwrap(),
context::Scope::new_timeline(self),
);
scopeguard::defer! {
*ctx.scope.lock().unwrap() = prev_scope;
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(keyspace.clone(), lsn))
} else {
@@ -2820,8 +2829,17 @@ impl Timeline {
"layer flush task",
async move {
let _guard = guard;
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
let ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
let prev_scope = std::mem::replace(
&mut *ctx.scope.lock().unwrap(),
context::Scope::new_timeline(&self_clone),
);
scopeguard::defer! {
*ctx.scope.lock().unwrap() = prev_scope;
}
self_clone.flush_loop(layer_flush_start_rx, &ctx).await;
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
assert!(matches!(*flush_loop_state, FlushLoopState::Running{..}));
*flush_loop_state = FlushLoopState::Exited;
@@ -4519,7 +4537,14 @@ impl Timeline {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let timeline_scope = context::Scope::new_timeline(self);
let work = async move {
let ctx = ctx;
let prev_scope = std::mem::replace(&mut *ctx.scope.lock().unwrap(), timeline_scope);
scopeguard::defer! {
*ctx.scope.lock().unwrap() = prev_scope;
}
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.await?
@@ -6680,6 +6705,14 @@ impl TimelineWriter<'_> {
return Ok(());
}
let prev_scope = std::mem::replace(
&mut *ctx.scope.lock().unwrap(),
context::Scope::new_timeline(self),
);
scopeguard::defer! {
*ctx.scope.lock().unwrap() = prev_scope;
};
// In debug builds, assert that we don't write any keys that don't belong to this shard.
// We don't assert this in release builds, since key ownership policies may change over
// time. Stray keys will be removed during compaction.

View File

@@ -11,11 +11,11 @@
//! This is similar to PostgreSQL's virtual file descriptor facility in
//! src/backend/storage/file/fd.c
//!
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::RequestContext;
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::metrics::{StorageIoOperation, STORAGE_IO_TIME_METRIC};
use crate::page_cache::{PageWriteGuard, PAGE_SZ};
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
@@ -23,7 +23,6 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
#[cfg(target_os = "linux")]
@@ -120,7 +119,7 @@ impl VirtualFile {
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile {
@@ -132,7 +131,7 @@ impl VirtualFile {
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
@@ -303,13 +302,6 @@ pub struct VirtualFileInner {
/// storing it here.
pub path: Utf8PathBuf,
open_options: OpenOptions,
// These are strings becase we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
// strings.
tenant_id: String,
shard_id: String,
timeline_id: String,
}
#[derive(Debug, PartialEq, Clone, Copy)]
@@ -591,36 +583,16 @@ impl VirtualFileInner {
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
_ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let (tenant_id, shard_id, timeline_id) =
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
let tenant_shard_part = parts[parts.len() - 4];
let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
Ok(tenant_shard_id) => (
tenant_shard_id.tenant_id.to_string(),
format!("{}", tenant_shard_id.shard_slug()),
),
Err(_) => {
// Malformed path: this ID is just for observability, so tolerate it
// and pass through
(tenant_shard_part.to_string(), "*".to_string())
}
};
(tenant_id, shard_id, parts[parts.len() - 2].to_string())
} else {
("*".to_string(), "*".to_string(), "*".to_string())
};
let path = path.as_ref();
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
// where our caller doesn't get to use the returned VirtualFile before its
// slot gets re-used by someone else.
let file = observe_duration!(StorageIoOperation::Open, {
open_options.open(path_ref.as_std_path()).await?
open_options.open(path.as_std_path()).await?
});
// Strip all options other than read and write.
@@ -636,11 +608,8 @@ impl VirtualFileInner {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
path: path.to_owned(),
open_options: reopen_options,
tenant_id,
shard_id,
timeline_id,
};
// TODO: Under pressure, it's likely the slot will get re-used and
@@ -943,7 +912,7 @@ impl VirtualFileInner {
&self,
buf: tokio_epoll_uring::Slice<Buf>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
ctx: &RequestContext,
) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
where
Buf: tokio_epoll_uring::IoBufMut + Send,
@@ -961,14 +930,7 @@ impl VirtualFileInner {
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
if let Ok(size) = res {
STORAGE_IO_SIZE
.with_label_values(&[
"read",
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
])
.add(size as i64);
ctx.io_size_metrics().read.add(size.into_u64());
}
(buf, res)
})
@@ -979,9 +941,9 @@ impl VirtualFileInner {
&self,
buf: FullSlice<B>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
let result = result.maybe_fatal_err("write_at");
(slice, result)
}
@@ -990,7 +952,7 @@ impl VirtualFileInner {
&self,
buf: FullSlice<B>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -1000,14 +962,7 @@ impl VirtualFileInner {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&[
"write",
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
])
.add(size as i64);
ctx.io_size_metrics().write.add(size.into_u64());
}
(buf, result)
})