mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
some more WIP on impl
This commit is contained in:
@@ -89,10 +89,14 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::id::TimelineId;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{metrics::StorageIoSizeMetrics, task_mgr::TaskKind};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::{
|
||||
task_mgr::TaskKind,
|
||||
tenant::{Tenant, Timeline},
|
||||
};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Debug)]
|
||||
@@ -108,58 +112,45 @@ pub struct RequestContext {
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum Scope {
|
||||
Global {
|
||||
io_size_metrics: crate::metrics::StorageIoSizeMetrics,
|
||||
io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
|
||||
},
|
||||
TenantShard {
|
||||
tenant_shard_id: TenantShardId,
|
||||
io_size_metrics: crate::metrics::StorageIoSizeMetrics,
|
||||
Tenant {
|
||||
tenant: Arc<Tenant>,
|
||||
},
|
||||
Timeline {
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
io_size_metrics: crate::metrics::StorageIoSizeMetrics,
|
||||
timeline: Arc<Timeline>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Scope {
|
||||
pub(crate) fn global() -> Self {
|
||||
pub(crate) fn new_global() -> Self {
|
||||
static GLOBAL_IO_SIZE_METRICS: Lazy<crate::metrics::StorageIoSizeMetrics> =
|
||||
Lazy::new(|| crate::metrics::StorageIoSizeMetrics::new("*", "*", "*"));
|
||||
Scope::Global {
|
||||
io_size_metrics: StorageIoSizeMetrics::new("*", "*", "*"),
|
||||
io_size_metrics: &&GLOBAL_IO_SIZE_METRICS,
|
||||
}
|
||||
}
|
||||
pub(crate) fn tenant_shard(tenant_shard_id: TenantShardId) -> Self {
|
||||
Scope::TenantShard {
|
||||
tenant_shard_id,
|
||||
io_size_metrics: StorageIoSizeMetrics::new(
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
"*",
|
||||
),
|
||||
pub(crate) fn new_tenant(tenant: &Arc<Tenant>) -> Self {
|
||||
Scope::Tenant {
|
||||
tenant: Arc::clone(tenant),
|
||||
}
|
||||
}
|
||||
pub(crate) fn timeline(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
|
||||
pub(crate) fn new_timeline(timeline: &Arc<Timeline>) -> Self {
|
||||
Scope::Timeline {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
io_size_metrics: {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_slug = tenant_shard_id.shard_slug().to_string();
|
||||
let timeline_id = timeline_id.to_string();
|
||||
StorageIoSizeMetrics::new(&tenant_id, &shard_slug, &timeline_id)
|
||||
},
|
||||
timeline: Arc::clone(timeline),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn io_size_metrics(&self) -> &crate::metrics::StorageIoSizeMetrics {
|
||||
match self {
|
||||
Scope::Global { io_size_metrics } => io_size_metrics,
|
||||
Scope::TenantShard {
|
||||
io_size_metrics, ..
|
||||
} => io_size_metrics,
|
||||
Scope::Timeline {
|
||||
io_size_metrics, ..
|
||||
} => io_size_metrics,
|
||||
Scope::Tenant { tenant } => &tenant.virtual_file_io_metrics,
|
||||
Scope::Timeline { timeline } => &timeline.metrics.storage_io_size,
|
||||
}
|
||||
}
|
||||
pub(crate) fn is_timeline(&self) -> bool {
|
||||
matches!(self, Scope::Timeline { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -218,7 +209,7 @@ impl RequestContextBuilder {
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
scope: Scope::global(),
|
||||
scope: Scope::new_global(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context;
|
||||
use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
@@ -2616,8 +2617,9 @@ async fn getpage_at_lsn_handler_inner(
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
// Enable read path debugging
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true)
|
||||
.scope(context::Scope::new_timeline(&timeline)).build();
|
||||
|
||||
// Use last_record_lsn if no lsn is provided
|
||||
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
|
||||
@@ -29,7 +29,7 @@ use pq_proto::framed::ConnectionError;
|
||||
|
||||
use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
|
||||
use strum_macros::{IntoStaticStr, VariantNames};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::id::{TimelineId};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
@@ -1200,7 +1200,7 @@ impl StorageIoTime {
|
||||
|
||||
pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(StorageIoTime::new);
|
||||
|
||||
#[derive(Clone,Copy)]
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(usize)]
|
||||
enum StorageIoSizeOperation {
|
||||
Read,
|
||||
@@ -1251,6 +1251,25 @@ impl StorageIoSizeMetrics {
|
||||
.unwrap();
|
||||
Self { read, write }
|
||||
}
|
||||
|
||||
pub(crate) fn new_tenant(tenant_shard_id: &TenantShardId) -> Self {
|
||||
Self::new(
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
"*",
|
||||
)
|
||||
}
|
||||
|
||||
fn remove_per_tenant_metrics(tenant_shard_id: &TenantShardId) {
|
||||
for operation in StorageIoSizeOperation::VARIANTS {
|
||||
let _ = STORAGE_IO_SIZE.remove_label_values(&[
|
||||
operation,
|
||||
&tenant_shard_id.tenant_id.to_string(),
|
||||
&tenant_shard_id.shard_slug().to_string(),
|
||||
"*",
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
@@ -3241,6 +3260,8 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
|
||||
|
||||
tenant_throttling::remove_tenant_metrics(tenant_shard_id);
|
||||
|
||||
StorageIoSizeMetrics::remove_per_tenant_metrics(&tenant_shard_id);
|
||||
|
||||
// we leave the BROKEN_TENANTS_SET entry if any
|
||||
}
|
||||
|
||||
|
||||
@@ -387,6 +387,8 @@ pub struct Tenant {
|
||||
|
||||
pub(crate) pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
|
||||
|
||||
pub(crate) virtual_file_io_metrics: crate::metrics::StorageIoSizeMetrics,
|
||||
|
||||
/// An ongoing timeline detach concurrency limiter.
|
||||
///
|
||||
/// As a tenant will likely be restarted as part of timeline detach ancestor it makes no sense
|
||||
@@ -1334,7 +1336,8 @@ impl Tenant {
|
||||
let tenant_clone = Arc::clone(&tenant);
|
||||
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
|
||||
let ctx = RequestContextBuilder::extend(&ctx)
|
||||
.scope(context::Scope::tenant_shard(tenant_shard_id));
|
||||
.scope(context::Scope::new_tenant(&tenant))
|
||||
.build();
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::Attach,
|
||||
@@ -1721,10 +1724,7 @@ impl Tenant {
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.scope(context::Scope::timeline(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
))
|
||||
.scope(context::Scope::new_tenant(self))
|
||||
.build();
|
||||
let (index_part, remote_client, previous_heatmap) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
@@ -1775,7 +1775,7 @@ impl Tenant {
|
||||
import_pgdata,
|
||||
ActivateTimelineArgs::No,
|
||||
guard,
|
||||
&ctx,
|
||||
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -2813,6 +2813,7 @@ impl Tenant {
|
||||
index_part,
|
||||
activate,
|
||||
timeline_create_guard,
|
||||
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
|
||||
));
|
||||
|
||||
// NB: the timeline doesn't exist in self.timelines at this point
|
||||
@@ -2826,7 +2827,7 @@ impl Tenant {
|
||||
index_part: import_pgdata::index_part_format::Root,
|
||||
activate: ActivateTimelineArgs,
|
||||
timeline_create_guard: TimelineCreateGuard,
|
||||
ctx: &RequestContext,
|
||||
ctx: RequestContext,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
info!("starting");
|
||||
@@ -2838,6 +2839,7 @@ impl Tenant {
|
||||
index_part,
|
||||
activate,
|
||||
timeline_create_guard,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
if let Err(err) = &res {
|
||||
@@ -2853,9 +2855,8 @@ impl Tenant {
|
||||
index_part: import_pgdata::index_part_format::Root,
|
||||
activate: ActivateTimelineArgs,
|
||||
timeline_create_guard: TimelineCreateGuard,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let ctx = RequestContext::detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn);
|
||||
|
||||
info!("importing pgdata");
|
||||
import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone())
|
||||
.await
|
||||
@@ -4282,6 +4283,7 @@ impl Tenant {
|
||||
pagestream_throttle_metrics: Arc::new(
|
||||
crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id),
|
||||
),
|
||||
virtual_file_io_metrics: crate::metrics::StorageIoSizeMetrics::new_tenant(&tenant_shard_id),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
gc_block: Default::default(),
|
||||
|
||||
@@ -323,7 +323,7 @@ pub struct Timeline {
|
||||
ancestor_timeline: Option<Arc<Timeline>>,
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
pub(super) metrics: TimelineMetrics,
|
||||
pub(crate) metrics: TimelineMetrics,
|
||||
|
||||
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
|
||||
// in `crate::page_service` writes these metrics.
|
||||
@@ -1131,6 +1131,21 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if !ctx.scope().is_timeline() {
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
panic!("get() called with RequestContext in non-timeline scope");
|
||||
} else {
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
"get() called with RequestContext in non-timeline scope",
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
if !lsn.is_valid() {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ use crate::context::RequestContext;
|
||||
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_TIME_METRIC};
|
||||
use crate::page_cache::{PageWriteGuard, PAGE_SZ};
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
|
||||
@@ -24,7 +23,6 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
|
||||
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -587,16 +585,14 @@ impl VirtualFileInner {
|
||||
open_options: &OpenOptions,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
let path_ref = path.as_ref();
|
||||
let path_str = path_ref.to_string();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let path = path.as_ref();
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
|
||||
|
||||
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path_ref.as_std_path()).await?
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
@@ -612,7 +608,7 @@ impl VirtualFileInner {
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path_ref.to_path_buf(),
|
||||
path: path.to_owned(),
|
||||
open_options: reopen_options,
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user