mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
8 Commits
cloneable/
...
problame/v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7c2379cc8 | ||
|
|
f7327ad2ae | ||
|
|
fb63bd1425 | ||
|
|
87e045563c | ||
|
|
b688d1e80c | ||
|
|
a8664024e8 | ||
|
|
26da69605d | ||
|
|
5831573953 |
@@ -89,7 +89,12 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
use std::sync::Arc;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{metrics::StorageIoSizeMetrics, task_mgr::TaskKind, tenant::Timeline};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Debug)]
|
||||
@@ -99,6 +104,32 @@ pub struct RequestContext {
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
pub(crate) scope: std::sync::Mutex<Scope>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum Scope {
|
||||
Global {
|
||||
io_size_metrics: Arc<crate::metrics::StorageIoSizeMetrics>,
|
||||
},
|
||||
Timeline {
|
||||
io_size_metrics: Arc<crate::metrics::StorageIoSizeMetrics>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Scope {
|
||||
pub(crate) fn new_global() -> Self {
|
||||
static GLOBAL_IO_SIZE_METRICS: Lazy<Arc<crate::metrics::StorageIoSizeMetrics>> =
|
||||
Lazy::new(|| Arc::new(crate::metrics::StorageIoSizeMetrics::new("*", "*", "*")));
|
||||
Scope::Global {
|
||||
io_size_metrics: Arc::clone(&GLOBAL_IO_SIZE_METRICS),
|
||||
}
|
||||
}
|
||||
pub(crate) fn new_timeline(timeline: &Timeline) -> Self {
|
||||
Scope::Timeline {
|
||||
io_size_metrics: Arc::clone(&timeline.metrics.storage_io_size),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -157,6 +188,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: std::sync::Mutex::new(Scope::new_global()),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -171,10 +203,16 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
scope: std::sync::Mutex::new(original.scope.lock().unwrap().clone()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn task_kind(mut self, k: TaskKind) -> Self {
|
||||
self.inner.task_kind = k;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure the DownloadBehavior of the context: whether to
|
||||
/// download missing layers, and/or warn on the download.
|
||||
pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
|
||||
@@ -281,7 +319,10 @@ impl RequestContext {
|
||||
}
|
||||
|
||||
fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
|
||||
Self::new(task_kind, download_behavior)
|
||||
RequestContextBuilder::extend(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn task_kind(&self) -> TaskKind {
|
||||
@@ -303,4 +344,31 @@ impl RequestContext {
|
||||
pub(crate) fn read_path_debug(&self) -> bool {
|
||||
self.read_path_debug
|
||||
}
|
||||
|
||||
pub(crate) fn io_size_metrics(&self) -> Arc<StorageIoSizeMetrics> {
|
||||
let guard = self.scope.lock().unwrap();
|
||||
match &*guard {
|
||||
Scope::Global { io_size_metrics } => {
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
panic!("all VirtualFile instances are timeline-scoped");
|
||||
} else {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
"all VirtualFile instances are timeline-scoped",
|
||||
);
|
||||
});
|
||||
Arc::clone(io_size_metrics)
|
||||
}
|
||||
}
|
||||
Scope::Timeline { io_size_metrics } => Arc::clone(io_size_metrics),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1200,11 +1200,24 @@ impl StorageIoTime {
|
||||
|
||||
pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(StorageIoTime::new);
|
||||
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(usize)]
|
||||
enum StorageIoSizeOperation {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
impl StorageIoSizeOperation {
|
||||
const VARIANTS: &'static [&'static str] = &["read", "write"];
|
||||
|
||||
fn as_str(&self) -> &'static str {
|
||||
Self::VARIANTS[*self as usize]
|
||||
}
|
||||
}
|
||||
|
||||
// Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
|
||||
pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
&["operation", "tenant_id", "shard_id", "timeline_id"]
|
||||
@@ -1212,6 +1225,34 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct StorageIoSizeMetrics {
|
||||
pub read: UIntGauge,
|
||||
pub write: UIntGauge,
|
||||
}
|
||||
|
||||
impl StorageIoSizeMetrics {
|
||||
pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
|
||||
let read = STORAGE_IO_SIZE
|
||||
.get_metric_with_label_values(&[
|
||||
StorageIoSizeOperation::Read.as_str(),
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
])
|
||||
.unwrap();
|
||||
let write = STORAGE_IO_SIZE
|
||||
.get_metric_with_label_values(&[
|
||||
StorageIoSizeOperation::Write.as_str(),
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
])
|
||||
.unwrap();
|
||||
Self { read, write }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) mod virtual_file_descriptor_cache {
|
||||
use super::*;
|
||||
@@ -2794,6 +2835,7 @@ pub(crate) struct TimelineMetrics {
|
||||
/// Number of valid LSN leases.
|
||||
pub valid_lsn_lease_count_gauge: UIntGauge,
|
||||
pub wal_records_received: IntCounter,
|
||||
pub storage_io_size: Arc<StorageIoSizeMetrics>,
|
||||
shutdown: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
@@ -2929,6 +2971,12 @@ impl TimelineMetrics {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let storage_io_size = Arc::new(StorageIoSizeMetrics::new(
|
||||
&tenant_id,
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
));
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
shard_id,
|
||||
@@ -2958,6 +3006,7 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
storage_io_size,
|
||||
valid_lsn_lease_count_gauge,
|
||||
wal_records_received,
|
||||
shutdown: std::sync::atomic::AtomicBool::default(),
|
||||
@@ -3148,7 +3197,7 @@ impl TimelineMetrics {
|
||||
]);
|
||||
}
|
||||
|
||||
for op in STORAGE_IO_SIZE_OPERATIONS {
|
||||
for op in StorageIoSizeOperation::VARIANTS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::context::{self};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::tenant::storage_layer::ImageLayerName;
|
||||
use crate::{
|
||||
@@ -323,7 +324,7 @@ pub struct Timeline {
|
||||
ancestor_timeline: Option<Arc<Timeline>>,
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
pub(super) metrics: TimelineMetrics,
|
||||
pub(crate) metrics: TimelineMetrics,
|
||||
|
||||
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
|
||||
// in `crate::page_service` writes these metrics.
|
||||
@@ -1299,6 +1300,14 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let prev_scope = std::mem::replace(
|
||||
&mut *ctx.scope.lock().unwrap(),
|
||||
context::Scope::new_timeline(self),
|
||||
);
|
||||
scopeguard::defer! {
|
||||
*ctx.scope.lock().unwrap() = prev_scope;
|
||||
}
|
||||
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(keyspace.clone(), lsn))
|
||||
} else {
|
||||
@@ -2820,8 +2829,17 @@ impl Timeline {
|
||||
"layer flush task",
|
||||
async move {
|
||||
let _guard = guard;
|
||||
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
||||
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
|
||||
let ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
||||
|
||||
let prev_scope = std::mem::replace(
|
||||
&mut *ctx.scope.lock().unwrap(),
|
||||
context::Scope::new_timeline(&self_clone),
|
||||
);
|
||||
scopeguard::defer! {
|
||||
*ctx.scope.lock().unwrap() = prev_scope;
|
||||
}
|
||||
|
||||
self_clone.flush_loop(layer_flush_start_rx, &ctx).await;
|
||||
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
|
||||
assert!(matches!(*flush_loop_state, FlushLoopState::Running{..}));
|
||||
*flush_loop_state = FlushLoopState::Exited;
|
||||
@@ -4519,7 +4537,14 @@ impl Timeline {
|
||||
let self_clone = Arc::clone(self);
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
let ctx = ctx.attached_child();
|
||||
let timeline_scope = context::Scope::new_timeline(self);
|
||||
let work = async move {
|
||||
let ctx = ctx;
|
||||
let prev_scope = std::mem::replace(&mut *ctx.scope.lock().unwrap(), timeline_scope);
|
||||
scopeguard::defer! {
|
||||
*ctx.scope.lock().unwrap() = prev_scope;
|
||||
}
|
||||
|
||||
let Some((desc, path)) = frozen_layer
|
||||
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||
.await?
|
||||
@@ -6680,6 +6705,14 @@ impl TimelineWriter<'_> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let prev_scope = std::mem::replace(
|
||||
&mut *ctx.scope.lock().unwrap(),
|
||||
context::Scope::new_timeline(self),
|
||||
);
|
||||
scopeguard::defer! {
|
||||
*ctx.scope.lock().unwrap() = prev_scope;
|
||||
};
|
||||
|
||||
// In debug builds, assert that we don't write any keys that don't belong to this shard.
|
||||
// We don't assert this in release builds, since key ownership policies may change over
|
||||
// time. Stray keys will be removed during compaction.
|
||||
|
||||
@@ -11,11 +11,11 @@
|
||||
//! This is similar to PostgreSQL's virtual file descriptor facility in
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::assert_u64_eq_usize::UsizeIsU64;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_TIME_METRIC};
|
||||
use crate::page_cache::{PageWriteGuard, PAGE_SZ};
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
|
||||
@@ -23,7 +23,6 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
|
||||
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -120,7 +119,7 @@ impl VirtualFile {
|
||||
pub async fn open_with_options<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile {
|
||||
@@ -132,7 +131,7 @@ impl VirtualFile {
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let file = match get_io_mode() {
|
||||
IoMode::Buffered => {
|
||||
@@ -303,13 +302,6 @@ pub struct VirtualFileInner {
|
||||
/// storing it here.
|
||||
pub path: Utf8PathBuf,
|
||||
open_options: OpenOptions,
|
||||
|
||||
// These are strings becase we only use them for metrics, and those expect strings.
|
||||
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
|
||||
// strings.
|
||||
tenant_id: String,
|
||||
shard_id: String,
|
||||
timeline_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
@@ -591,36 +583,16 @@ impl VirtualFileInner {
|
||||
pub async fn open_with_options<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
let path_ref = path.as_ref();
|
||||
let path_str = path_ref.to_string();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let (tenant_id, shard_id, timeline_id) =
|
||||
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
|
||||
let tenant_shard_part = parts[parts.len() - 4];
|
||||
let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
|
||||
Ok(tenant_shard_id) => (
|
||||
tenant_shard_id.tenant_id.to_string(),
|
||||
format!("{}", tenant_shard_id.shard_slug()),
|
||||
),
|
||||
Err(_) => {
|
||||
// Malformed path: this ID is just for observability, so tolerate it
|
||||
// and pass through
|
||||
(tenant_shard_part.to_string(), "*".to_string())
|
||||
}
|
||||
};
|
||||
(tenant_id, shard_id, parts[parts.len() - 2].to_string())
|
||||
} else {
|
||||
("*".to_string(), "*".to_string(), "*".to_string())
|
||||
};
|
||||
let path = path.as_ref();
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
|
||||
|
||||
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path_ref.as_std_path()).await?
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
@@ -636,11 +608,8 @@ impl VirtualFileInner {
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path_ref.to_path_buf(),
|
||||
path: path.to_owned(),
|
||||
open_options: reopen_options,
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
// TODO: Under pressure, it's likely the slot will get re-used and
|
||||
@@ -943,7 +912,7 @@ impl VirtualFileInner {
|
||||
&self,
|
||||
buf: tokio_epoll_uring::Slice<Buf>,
|
||||
offset: u64,
|
||||
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
|
||||
ctx: &RequestContext,
|
||||
) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
@@ -961,14 +930,7 @@ impl VirtualFileInner {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
"read",
|
||||
&self.tenant_id,
|
||||
&self.shard_id,
|
||||
&self.timeline_id,
|
||||
])
|
||||
.add(size as i64);
|
||||
ctx.io_size_metrics().read.add(size.into_u64());
|
||||
}
|
||||
(buf, res)
|
||||
})
|
||||
@@ -979,9 +941,9 @@ impl VirtualFileInner {
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
|
||||
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
|
||||
let result = result.maybe_fatal_err("write_at");
|
||||
(slice, result)
|
||||
}
|
||||
@@ -990,7 +952,7 @@ impl VirtualFileInner {
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
@@ -1000,14 +962,7 @@ impl VirtualFileInner {
|
||||
let ((_file_guard, buf), result) =
|
||||
io_engine::get().write_at(file_guard, offset, buf).await;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
"write",
|
||||
&self.tenant_id,
|
||||
&self.shard_id,
|
||||
&self.timeline_id,
|
||||
])
|
||||
.add(size as i64);
|
||||
ctx.io_size_metrics().write.add(size.into_u64());
|
||||
}
|
||||
(buf, result)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user