mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
pageserver: add PageTrace machinery
This commit is contained in:
@@ -61,7 +61,7 @@ use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::mgr::TenantManager;
|
||||
use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
|
||||
use crate::tenant::timeline::{self, WaitLsnError};
|
||||
use crate::tenant::timeline::{self, PageTraceEvent, WaitLsnError};
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
@@ -1702,6 +1702,18 @@ impl PageServerHandler {
|
||||
.query_metrics
|
||||
.observe_getpage_batch_start(requests.len());
|
||||
|
||||
if let Some(page_trace) = timeline.page_trace.load().as_ref() {
|
||||
let time = SystemTime::now();
|
||||
for BatchedGetPageRequest { req, timer: _ } in &requests {
|
||||
let key = rel_block_to_key(req.rel, req.blkno).to_compact();
|
||||
page_trace.send(PageTraceEvent {
|
||||
key,
|
||||
effective_lsn,
|
||||
time,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
let results = timeline
|
||||
.get_rel_page_at_lsn_batched(
|
||||
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
|
||||
|
||||
@@ -26,8 +26,8 @@ use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
config::tenant_conf_defaults::DEFAULT_COMPACTION_THRESHOLD,
|
||||
key::{
|
||||
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
|
||||
NON_INHERITED_SPARSE_RANGE,
|
||||
CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
|
||||
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
|
||||
},
|
||||
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
|
||||
models::{
|
||||
@@ -434,6 +434,49 @@ pub struct Timeline {
|
||||
|
||||
/// Cf. [`crate::tenant::CreateTimelineIdempotency`].
|
||||
pub(crate) create_idempotency: crate::tenant::CreateTimelineIdempotency,
|
||||
|
||||
pub(crate) page_trace: ArcSwap<Option<PageTrace>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub(crate) struct PageTraceEvent {
|
||||
pub(crate) key: CompactKey,
|
||||
pub(crate) effective_lsn: Lsn,
|
||||
pub(crate) time: SystemTime,
|
||||
}
|
||||
|
||||
/// When one of these is instantiated for a tenant, it will be used to record fine-grained
|
||||
/// history of getpage@lsn requests.
|
||||
pub(crate) struct PageTrace {
|
||||
size_limit: u64,
|
||||
size: AtomicU64,
|
||||
trace_tx: tokio::sync::mpsc::UnboundedSender<PageTraceEvent>,
|
||||
}
|
||||
|
||||
impl PageTrace {
|
||||
pub(crate) fn new(
|
||||
size_limit: u64,
|
||||
) -> (Self, tokio::sync::mpsc::UnboundedReceiver<PageTraceEvent>) {
|
||||
let (trace_tx, trace_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let page_trace = Self {
|
||||
size_limit,
|
||||
size: AtomicU64::new(0),
|
||||
trace_tx,
|
||||
};
|
||||
|
||||
(page_trace, trace_rx)
|
||||
}
|
||||
|
||||
pub(crate) fn send(&self, event: PageTraceEvent) {
|
||||
if self.size.load(std::sync::atomic::Ordering::Relaxed) < self.size_limit {
|
||||
self.size.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
if let Err(_e) = self.trace_tx.send(event) {
|
||||
// Ignore errors: if the receiver is gone, we'll just write up to our size limit
|
||||
// and then stop.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
|
||||
@@ -2380,6 +2423,8 @@ impl Timeline {
|
||||
attach_wal_lag_cooldown,
|
||||
|
||||
create_idempotency,
|
||||
|
||||
page_trace: Default::default(),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
|
||||
Reference in New Issue
Block a user