add RequestContext plumbing for layer access stats
In preparation for #3496, plumb RequestContext through to the data access methods of `PersistentLayer`. This is PR https://github.com/neondatabase/neon/pull/3504
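
For orientation, here is roughly what the pagectl call site looks like after this change, reassembled from the first two hunks below; the import list is trimmed to what the snippet needs, so treat it as a sketch rather than the exact file contents:

use pageserver::{
    context::{DownloadBehavior, RequestContext},
    page_cache,
    task_mgr::TaskKind,
    tenant::dump_layerfile_from_path,
    virtual_file,
};
use std::path::Path;

fn print_layerfile(path: &Path) -> anyhow::Result<()> {
    // Basic initialization of things that don't change after startup
    virtual_file::init(10);
    page_cache::init(100);
    // The caller states who it is (TaskKind::DebugTool) and how on-demand
    // downloads should be handled (DownloadBehavior::Error); that context is
    // then threaded through every layer access so reads can later be
    // attributed per task kind for the layer access stats work.
    let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
    dump_layerfile_from_path(path, true, &ctx)
}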
commit f1aece1ba0
parent 590695e845
committed by GitHub
@@ -12,7 +12,9 @@ use anyhow::Context;
 use clap::{value_parser, Arg, Command};

 use pageserver::{
+    context::{DownloadBehavior, RequestContext},
     page_cache,
+    task_mgr::TaskKind,
     tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
     virtual_file,
 };
@@ -75,7 +77,8 @@ fn print_layerfile(path: &Path) -> anyhow::Result<()> {
     // Basic initialization of things that don't change after startup
     virtual_file::init(10);
     page_cache::init(100);
-    dump_layerfile_from_path(path, true)
+    let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
+    dump_layerfile_from_path(path, true, &ctx)
 }

 fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
@@ -255,6 +255,8 @@ pub enum TaskKind {
     // A request that comes in via the pageserver HTTP API.
     MgmtRequest,

+    DebugTool,
+
     #[cfg(test)]
     UnitTest,
 }
@@ -2643,7 +2643,11 @@ impl Drop for Tenant {
     }
 }
 /// Dump contents of a layer file to stdout.
-pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
+pub fn dump_layerfile_from_path(
+    path: &Path,
+    verbose: bool,
+    ctx: &RequestContext,
+) -> anyhow::Result<()> {
     use std::os::unix::fs::FileExt;

     // All layer files start with a two-byte "magic" value, to identify the kind of
@@ -2653,8 +2657,8 @@ pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()
     file.read_exact_at(&mut header_buf, 0)?;

     match u16::from_be_bytes(header_buf) {
-        crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose)?,
-        crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose)?,
+        crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
+        crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
         magic => bail!("unrecognized magic identifier: {:?}", magic),
     }

@@ -46,6 +46,7 @@
 mod historic_layer_coverage;
 mod layer_coverage;

+use crate::context::RequestContext;
 use crate::keyspace::KeyPartitioning;
 use crate::metrics::NUM_ONDISK_LAYERS;
 use crate::repository::Key;
@@ -654,22 +655,22 @@ where

     /// debugging function to print out the contents of the layer map
     #[allow(unused)]
-    pub fn dump(&self, verbose: bool) -> Result<()> {
+    pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
         println!("Begin dump LayerMap");

         println!("open_layer:");
         if let Some(open_layer) = &self.open_layer {
-            open_layer.dump(verbose)?;
+            open_layer.dump(verbose, ctx)?;
         }

         println!("frozen_layers:");
         for frozen_layer in self.frozen_layers.iter() {
-            frozen_layer.dump(verbose)?;
+            frozen_layer.dump(verbose, ctx)?;
         }

         println!("historic_layers:");
         for layer in self.iter_historic_layers() {
-            layer.dump(verbose)?;
+            layer.dump(verbose, ctx)?;
         }
         println!("End dump LayerMap");
         Ok(())
@@ -6,6 +6,7 @@ mod image_layer;
 mod inmemory_layer;
 mod remote_layer;

+use crate::context::RequestContext;
 use crate::repository::{Key, Value};
 use crate::walrecord::NeonWalRecord;
 use anyhow::Result;
@@ -117,13 +118,14 @@ pub trait Layer: Send + Sync {
         key: Key,
         lsn_range: Range<Lsn>,
         reconstruct_data: &mut ValueReconstructState,
+        ctx: &RequestContext,
     ) -> Result<ValueReconstructResult>;

     /// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
     fn short_id(&self) -> String;

     /// Dump summary of the contents of the layer to stdout
-    fn dump(&self, verbose: bool) -> Result<()>;
+    fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
 }

 /// Returned by [`Layer::iter`]
@@ -161,11 +163,11 @@ pub trait PersistentLayer: Layer {
     fn local_path(&self) -> Option<PathBuf>;

     /// Iterate through all keys and values stored in the layer
-    fn iter(&self) -> Result<LayerIter<'_>>;
+    fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>>;

     /// Iterate through all keys stored in the layer. Returns key, lsn and value size
     /// It is used only for compaction and so is currently implemented only for DeltaLayer
-    fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
+    fn key_iter(&self, _ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
         panic!("Not implemented")
     }

@@ -231,6 +233,7 @@ impl Layer for LayerDescriptor {
         _key: Key,
         _lsn_range: Range<Lsn>,
         _reconstruct_data: &mut ValueReconstructState,
+        _ctx: &RequestContext,
     ) -> Result<ValueReconstructResult> {
         todo!("This method shouldn't be part of the Layer trait")
     }
@@ -239,7 +242,7 @@ impl Layer for LayerDescriptor {
         self.short_id.clone()
     }

-    fn dump(&self, _verbose: bool) -> Result<()> {
+    fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
         todo!()
     }
 }
@@ -24,6 +24,7 @@
 //! "values" part.
 //!
 use crate::config::PageServerConf;
+use crate::context::RequestContext;
 use crate::page_cache::{PageReadGuard, PAGE_SZ};
 use crate::repository::{Key, Value, KEY_SIZE};
 use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
@@ -214,7 +215,7 @@ impl Layer for DeltaLayer {
         self.filename().file_name()
     }
     /// debugging function to print out the contents of the layer
-    fn dump(&self, verbose: bool) -> Result<()> {
+    fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
         println!(
             "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
             self.tenant_id,
@@ -229,7 +230,7 @@ impl Layer for DeltaLayer {
             return Ok(());
         }

-        let inner = self.load()?;
+        let inner = self.load(ctx)?;

         println!(
             "index_start_blk: {}, root {}",
@@ -293,6 +294,7 @@ impl Layer for DeltaLayer {
         key: Key,
         lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValueReconstructState,
+        ctx: &RequestContext,
     ) -> anyhow::Result<ValueReconstructResult> {
         ensure!(lsn_range.start >= self.lsn_range.start);
         let mut need_image = true;
@@ -301,7 +303,7 @@ impl Layer for DeltaLayer {

         {
             // Open the file and lock the metadata in memory
-            let inner = self.load()?;
+            let inner = self.load(ctx)?;

             // Scan the page versions backwards, starting from `lsn`.
             let file = inner.file.as_ref().unwrap();
@@ -391,16 +393,16 @@ impl PersistentLayer for DeltaLayer {
         Some(self.path())
     }

-    fn iter(&self) -> Result<LayerIter<'_>> {
-        let inner = self.load().context("load delta layer")?;
+    fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>> {
+        let inner = self.load(ctx).context("load delta layer")?;
         Ok(match DeltaValueIter::new(inner) {
             Ok(iter) => Box::new(iter),
             Err(err) => Box::new(std::iter::once(Err(err))),
         })
     }

-    fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
-        let inner = self.load()?;
+    fn key_iter(&self, ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
+        let inner = self.load(ctx)?;
         Ok(Box::new(
             DeltaKeyIter::new(inner).context("Layer index is corrupted")?,
         ))
@@ -459,7 +461,7 @@ impl DeltaLayer {
     /// Open the underlying file and read the metadata into memory, if it's
     /// not loaded already.
     ///
-    fn load(&self) -> Result<RwLockReadGuard<DeltaLayerInner>> {
+    fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<DeltaLayerInner>> {
         loop {
             // Quick exit if already loaded
             let inner = self.inner.read().unwrap();
@@ -20,6 +20,7 @@
 //! mapping from Key to an offset in the "values" part. The
 //! actual page images are stored in the "values" part.
 use crate::config::PageServerConf;
+use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, KEY_SIZE};
 use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
@@ -143,7 +144,7 @@ impl Layer for ImageLayer {
     }

     /// debugging function to print out the contents of the layer
-    fn dump(&self, verbose: bool) -> Result<()> {
+    fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
         println!(
             "----- image layer for ten {} tli {} key {}-{} at {} ----",
             self.tenant_id, self.timeline_id, self.key_range.start, self.key_range.end, self.lsn
@@ -153,7 +154,7 @@ impl Layer for ImageLayer {
             return Ok(());
         }

-        let inner = self.load()?;
+        let inner = self.load(ctx)?;
         let file = inner.file.as_ref().unwrap();
         let tree_reader =
             DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -174,12 +175,13 @@ impl Layer for ImageLayer {
         key: Key,
         lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValueReconstructState,
+        ctx: &RequestContext,
     ) -> anyhow::Result<ValueReconstructResult> {
         assert!(self.key_range.contains(&key));
         assert!(lsn_range.start >= self.lsn);
         assert!(lsn_range.end >= self.lsn);

-        let inner = self.load()?;
+        let inner = self.load(ctx)?;

         let file = inner.file.as_ref().unwrap();
         let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -220,7 +222,7 @@ impl PersistentLayer for ImageLayer {
     fn get_timeline_id(&self) -> TimelineId {
         self.timeline_id
     }
-    fn iter(&self) -> Result<LayerIter<'_>> {
+    fn iter(&self, _ctx: &RequestContext) -> Result<LayerIter<'_>> {
         unimplemented!();
     }

@@ -270,7 +272,7 @@ impl ImageLayer {
     /// Open the underlying file and read the metadata into memory, if it's
     /// not loaded already.
     ///
-    fn load(&self) -> Result<RwLockReadGuard<ImageLayerInner>> {
+    fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<ImageLayerInner>> {
         loop {
             // Quick exit if already loaded
             let inner = self.inner.read().unwrap();
@@ -5,6 +5,7 @@
 //! its position in the file, is kept in memory, though.
 //!
 use crate::config::PageServerConf;
+use crate::context::RequestContext;
 use crate::repository::{Key, Value};
 use crate::tenant::blob_io::{BlobCursor, BlobWriter};
 use crate::tenant::block_io::BlockReader;
@@ -110,7 +111,7 @@ impl Layer for InMemoryLayer {
     }

     /// debugging function to print out the contents of the layer
-    fn dump(&self, verbose: bool) -> Result<()> {
+    fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
         let inner = self.inner.read().unwrap();

         let end_str = inner
@@ -166,6 +167,7 @@ impl Layer for InMemoryLayer {
         key: Key,
         lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValueReconstructState,
+        _ctx: &RequestContext,
     ) -> anyhow::Result<ValueReconstructResult> {
         ensure!(lsn_range.start >= self.start_lsn);
         let mut need_image = true;
@@ -2,6 +2,7 @@
 //! in remote storage.
 //!
 use crate::config::PageServerConf;
+use crate::context::RequestContext;
 use crate::repository::Key;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
 use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
@@ -51,6 +52,7 @@ impl Layer for RemoteLayer {
         _key: Key,
         _lsn_range: Range<Lsn>,
         _reconstruct_state: &mut ValueReconstructState,
+        _ctx: &RequestContext,
     ) -> Result<ValueReconstructResult> {
         bail!(
             "layer {} needs to be downloaded",
@@ -63,7 +65,7 @@ impl Layer for RemoteLayer {
     }

     /// debugging function to print out the contents of the layer
-    fn dump(&self, _verbose: bool) -> Result<()> {
+    fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
         println!(
             "----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
             self.tenantid,
@@ -111,11 +113,11 @@ impl PersistentLayer for RemoteLayer {
         None
     }

-    fn iter(&self) -> Result<LayerIter<'_>> {
+    fn iter(&self, _ctx: &RequestContext) -> Result<LayerIter<'_>> {
         bail!("cannot iterate a remote layer");
     }

-    fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
+    fn key_iter(&self, _ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
         bail!("cannot iterate a remote layer");
     }

@@ -696,7 +696,7 @@ impl Timeline {

         // 3. Compact
         let timer = self.metrics.compact_time_histo.start_timer();
-        self.compact_level0(target_file_size).await?;
+        self.compact_level0(target_file_size, ctx).await?;
         timer.stop_and_record();

         // If `create_image_layers' or `compact_level0` scheduled any
@@ -1767,6 +1767,7 @@ impl Timeline {
                     key,
                     lsn_floor..cont_lsn,
                     reconstruct_state,
+                    ctx,
                 ) {
                     Ok(result) => result,
                     Err(e) => return Err(PageReconstructError::from(e)),
@@ -1792,6 +1793,7 @@ impl Timeline {
                     key,
                     lsn_floor..cont_lsn,
                     reconstruct_state,
+                    ctx,
                 ) {
                     Ok(result) => result,
                     Err(e) => return Err(PageReconstructError::from(e)),
@@ -1825,6 +1827,7 @@ impl Timeline {
                     key,
                     lsn_floor..cont_lsn,
                     reconstruct_state,
+                    ctx,
                 ) {
                     Ok(result) => result,
                     Err(e) => return Err(PageReconstructError::from(e)),
@@ -2463,6 +2466,7 @@ impl Timeline {
     async fn compact_level0_phase1(
         &self,
         target_file_size: u64,
+        ctx: &RequestContext,
     ) -> anyhow::Result<CompactLevel0Phase1Result> {
         let layers = self.layers.read().unwrap();
         let mut level0_deltas = layers.get_level0_deltas()?;
@@ -2522,8 +2526,9 @@ impl Timeline {

         // This iterator walks through all key-value pairs from all the layers
         // we're compacting, in key, LSN order.
-        let all_values_iter =
-            itertools::process_results(deltas_to_compact.iter().map(|l| l.iter()), |iter_iter| {
+        let all_values_iter = itertools::process_results(
+            deltas_to_compact.iter().map(|l| l.iter(ctx)),
+            |iter_iter| {
                 iter_iter.kmerge_by(|a, b| {
                     if let Ok((a_key, a_lsn, _)) = a {
                         if let Ok((b_key, b_lsn, _)) = b {
@@ -2539,11 +2544,12 @@ impl Timeline {
                         true
                     }
                 })
-            })?;
+            },
+        )?;

         // This iterator walks through all keys and is needed to calculate size used by each key
         let mut all_keys_iter = itertools::process_results(
-            deltas_to_compact.iter().map(|l| l.key_iter()),
+            deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
             |iter_iter| {
                 iter_iter.kmerge_by(|a, b| {
                     let (a_key, a_lsn, _) = a;
@@ -2719,11 +2725,15 @@ impl Timeline {
     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     /// as Level 1 files.
     ///
-    async fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
+    async fn compact_level0(
+        &self,
+        target_file_size: u64,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
         let CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
-        } = self.compact_level0_phase1(target_file_size).await?;
+        } = self.compact_level0_phase1(target_file_size, ctx).await?;

         if new_layers.is_empty() && deltas_to_compact.is_empty() {
             // nothing to do