Compare commits

...

24 Commits

Author SHA1 Message Date
Christian Schwarz
f63410b0d1 WIP: make PageServerConf crate-private 2023-02-02 14:42:59 +01:00
Christian Schwarz
00dfb2702a replace Option<LayerAccessStatsReset>::None with LayerAccessStatsReset::NoReset 2023-02-02 14:32:53 +01:00
Christian Schwarz
d99ace3601 thanks to HistoryBuf, we can now derive Default for LayerAccessStatsInner 2023-02-02 14:22:37 +01:00
Christian Schwarz
6548b831ab use most recent main commit of history_buffer
The previous was an intermediate commit from a PR.
At the version, we didn't have subsequent intermediary commit
0a21a56ad7
and that one is kinda important for what we want to do here.
2023-02-02 14:22:37 +01:00
Christian Schwarz
c9474483e5 use u64 to represent millis since epoch 2023-02-02 14:22:37 +01:00
Christian Schwarz
72d35a3b32 use heapless::HistoryBuffer instead of VecDeque 2023-02-02 14:22:37 +01:00
Christian Schwarz
6b5253abb2 strip new_ prefix from LayerAccessStats constructors 2023-02-02 14:22:37 +01:00
Christian Schwarz
2e676fba96 LayerResidenceStats: docs + distinct constructors for created=false,true 2023-02-02 14:22:37 +01:00
Christian Schwarz
0850d28059 cleanup API + add flag to residence status to distinguish loading from creating 2023-02-02 14:22:37 +01:00
Christian Schwarz
8ce21eb7e3 carry stats across eviction & on-demand download 2023-02-02 14:22:37 +01:00
Christian Schwarz
63ad7c4461 review suggestion: use X.workspace in Cargo.toml 2023-02-01 14:59:41 +01:00
Christian Schwarz
ffa96d55c8 review suggestions: use Mutex, avoid reallocs in inner.last_access, custom Default impl 2023-02-01 14:40:55 +01:00
Christian Schwarz
0ed5f2858b maintain access stats for historic layers
This patch adds basic access statistics for historic layers
and exposes them in the management API's `LayerMapInfo`.

We record the accesses in the `{Delta,Image}Layer::load()` function
because it's the common path of
* page_service (`Timline::get_reconstruct_data()`)
* Compaction (`PersistentLayer::iter()` and `PersistentLayer::key_iter()`)

There is a killswitch in pageserver.toml to disable statistics gathering
to support before/after benchmarks. With the killswitch enabled, the
overhead of this feature is the function call and the atomic load.

Preceding PRs:
- #3504 add RequestContext plumbing for layer access stats

This is PR https://github.com/neondatabase/neon/pull/3496
2023-01-31 18:48:26 +01:00
Christian Schwarz
60f0385794 add RequestContext plumbing for layer access stats
In preparation for #3496 , this patch plumbs through RequestContext to
the data access methods of PersistentLayer.

Preceding patch: #3502

This is PR https://github.com/neondatabase/neon/pull/3496
2023-01-31 18:14:25 +01:00
Christian Schwarz
2925555b4d add parse_query_param(); use Cow<> where possible; move param parsing code to utils::http::request
This is PR https://github.com/neondatabase/neon/pull/3502
2023-01-31 17:30:41 +01:00
Christian Schwarz
e708c4e22c replace btreemap with vec
This is PR https://github.com/neondatabase/neon/pull/3496
2023-01-31 15:33:23 +01:00
Kirill Bulatov
f4039c65a9 (De)Serialize Lsn and Key in LayerInfo in human-readable format 2023-01-31 15:13:43 +02:00
Kirill Bulatov
eab1f375e6 Code review: Hold another lock during eviction 2023-01-31 14:32:57 +02:00
Kirill Bulatov
b654687b3f Code review: Unify layer deletion code 2023-01-31 14:17:55 +02:00
Kirill Bulatov
891a3df163 Produce better error messages 2023-01-31 14:17:55 +02:00
Kirill Bulatov
c2a1e01208 Code review: Move Key into pageserver model and use it in HTTP responses 2023-01-31 14:17:50 +02:00
Kirill Bulatov
d446befb51 Implement layer eviction and download 2023-01-30 15:31:13 +02:00
Kirill Bulatov
aae4382613 Implement the timeline info using regular API 2023-01-30 13:41:58 +02:00
Kirill Bulatov
f3096d85e0 Print in-memory layers info 2023-01-30 13:41:58 +02:00
39 changed files with 1159 additions and 353 deletions

101
Cargo.lock generated
View File

@@ -141,6 +141,15 @@ dependencies = [
"syn",
]
[[package]]
name = "atomic-polyfill"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d299f547288d6db8d5c3a2916f7b2f66134b15b8c1ac1c4357dd3b8752af7bb2"
dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -986,6 +995,12 @@ dependencies = [
"itertools",
]
[[package]]
name = "critical-section"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@@ -1223,6 +1238,47 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "enum-map"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50c25992259941eb7e57b936157961b217a4fc8597829ddef0596d6c3cd86e1a"
dependencies = [
"enum-map-derive",
]
[[package]]
name = "enum-map-derive"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a4da76b3b6116d758c7ba93f7ec6a35d2e2cf24feda76c6e38a375f4d5c59f2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "enumset"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19be8061a06ab6f3a6cf21106c873578bf01bd42ad15e0311a9c76161cb1c753"
dependencies = [
"enumset_derive",
]
[[package]]
name = "enumset_derive"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03e7b551eba279bf0fa88b83a46330168c1560a52a94f5126f892f0b364ab3e0"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "env_logger"
version = "0.10.0"
@@ -1510,6 +1566,15 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hash32"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@@ -1525,6 +1590,18 @@ dependencies = [
"ahash",
]
[[package]]
name = "heapless"
version = "0.8.0"
source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version",
"spin 0.9.4",
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.4.0"
@@ -2320,9 +2397,12 @@ dependencies = [
"crc32c",
"criterion",
"crossbeam-utils",
"enum-map",
"enumset",
"fail",
"futures",
"git-version",
"heapless",
"hex",
"hex-literal",
"humantime",
@@ -2352,6 +2432,8 @@ dependencies = [
"serde_with",
"signal-hook",
"storage_broker",
"strum",
"strum_macros",
"svg_fmt",
"tempfile",
"tenant_size_model",
@@ -2376,6 +2458,7 @@ dependencies = [
"byteorder",
"bytes",
"const_format",
"enum-map",
"postgres_ffi",
"serde",
"serde_with",
@@ -2996,7 +3079,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@@ -3543,6 +3626,21 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -4337,6 +4435,7 @@ dependencies = [
"tokio-rustls",
"tracing",
"tracing-subscriber",
"url",
"workspace_hack",
]

View File

@@ -37,6 +37,8 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
fs2 = "0.4.3"
futures = "0.3"
@@ -44,6 +46,7 @@ futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
hex = "0.4"
hex-literal = "0.3"
hmac = "0.12.1"

View File

@@ -13,5 +13,6 @@ bytes.workspace = true
byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
workspace_hack.workspace = true

View File

@@ -1,6 +1,12 @@
use std::num::{NonZeroU64, NonZeroUsize};
use std::{
collections::HashMap,
fmt,
num::{NonZeroU64, NonZeroUsize},
ops::Range,
str::FromStr,
};
use byteorder::{BigEndian, ReadBytesExt};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, BE};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
@@ -9,7 +15,6 @@ use utils::{
};
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
@@ -227,6 +232,94 @@ pub struct TimelineInfo {
pub state: TimelineState,
}
#[derive(Debug, Clone, Serialize)]
pub struct LayerMapInfo {
pub in_memory_layers: Vec<InMemoryLayerInfo>,
pub historic_layers: Vec<HistoricLayerInfo>,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
#[repr(usize)]
pub enum LayerAccessKind {
GetValueReconstructData,
Iter,
KeyIter,
Dump,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStatFullDetails {
pub when_millis_since_epoch: u64,
pub task_kind: &'static str,
pub access_kind: LayerAccessKind,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum LayerResidenceStatus {
Resident {
timestamp_millis_since_epoch: u64,
created: bool,
},
Evicted {
timestamp_millis_since_epoch: u64,
},
}
#[derive(Debug, Clone, Serialize)]
pub struct LayerAccessStats {
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
pub task_kind_access_flag: Vec<&'static str>,
pub first: Option<LayerAccessStatFullDetails>,
pub most_recent: Vec<LayerAccessStatFullDetails>,
pub most_recent_residence_changes: Vec<LayerResidenceStatus>,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde_as]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
},
Frozen {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
},
}
#[derive(Debug, Clone, Serialize)]
#[serde_as]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
Delta {
layer_file_name: String,
#[serde_as(as = "DisplayFromStr")]
key_start: Key,
#[serde_as(as = "DisplayFromStr")]
key_end: Key,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
},
Image {
layer_file_name: String,
#[serde_as(as = "DisplayFromStr")]
key_start: Key,
#[serde_as(as = "DisplayFromStr")]
key_end: Key,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
pub max_concurrent_downloads: NonZeroUsize,
@@ -430,7 +523,7 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
_ => anyhow::bail!("unknown smgr message tag: {msg_tag:?}"),
}
}
}
@@ -470,6 +563,175 @@ impl PagestreamBeMessage {
}
}
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
let end = key_range.end;
if end.field1 != start.field1
|| end.field2 != start.field2
|| end.field3 != start.field3
|| end.field4 != start.field4
{
return u32::MAX;
}
let start = (start.field5 as u64) << 32 | start.field6 as u64;
let end = (end.field5 as u64) << 32 | end.field6 as u64;
let diff = end - start;
if diff > u32::MAX as u64 {
u32::MAX
} else {
diff as u32
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::from_hex(s)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> anyhow::Result<Self> {
if s.len() != 36 {
anyhow::bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
#[cfg(test)]
mod tests {
use bytes::Buf;

View File

@@ -37,6 +37,7 @@ metrics.workspace = true
pq_proto.workspace = true
workspace_hack.workspace = true
url.workspace = true
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,4 +1,5 @@
use std::str::FromStr;
use core::fmt;
use std::{borrow::Cow, str::FromStr};
use super::error::ApiError;
use anyhow::anyhow;
@@ -29,6 +30,50 @@ pub fn parse_request_param<T: FromStr>(
}
}
fn get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Option<Cow<'a, str>>, ApiError> {
let query = match request.uri().query() {
Some(q) => q,
None => return Ok(None),
};
let mut values = url::form_urlencoded::parse(query.as_bytes())
.filter_map(|(k, v)| if k == param_name { Some(v) } else { None })
// we call .next() twice below. If it's None the first time, .fuse() ensures it's None afterwards
.fuse();
let value1 = values.next();
if values.next().is_some() {
return Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
)));
}
Ok(value1)
}
pub fn must_get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Cow<'a, str>, ApiError> {
get_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}
pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<Option<T>, ApiError> {
get_query_param(request, param_name)?
.map(|v| {
v.parse().map_err(|e| {
ApiError::BadRequest(anyhow!("cannot parse query param {param_name}: {e}"))
})
})
.transpose()
}
pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),

View File

@@ -67,6 +67,11 @@ utils.workspace = true
workspace_hack.workspace = true
reqwest.workspace = true
rpds.workspace = true
enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
heapless.workspace = true
[dev-dependencies]
criterion.workspace = true

View File

@@ -1,9 +1,10 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::Layer;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, LayerDescriptor};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use pageserver_api::models::Key;
use rand::prelude::{SeedableRng, StdRng};
use rand::seq::SliceRandom;
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};

View File

@@ -12,10 +12,10 @@ use std::sync::{Arc, Barrier};
use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf,
repository::Key,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, WalRedoError},
};
use pageserver_api::models::Key;
use utils::{id::TenantId, lsn::Lsn};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

View File

@@ -22,7 +22,7 @@
//! TODO Consider shipping this as a grafana panel plugin:
//! https://grafana.com/tutorials/build-a-panel-plugin/
use anyhow::Result;
use pageserver::repository::Key;
use pageserver_api::models::Key;
use std::cmp::Ordering;
use std::io::{self, BufRead};
use std::path::PathBuf;

View File

@@ -1,12 +1,13 @@
//! Main entry point for the Page Server executable.
use std::env::{var, VarError};
use std::sync::Arc;
use std::sync::{atomic, Arc};
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use pageserver::tenant::storage_layer;
use remote_storage::GenericRemoteStorage;
use tracing::*;

View File

@@ -12,7 +12,9 @@ use anyhow::Context;
use clap::{value_parser, Arg, Command};
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
@@ -75,7 +77,8 @@ fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
dump_layerfile_from_path(path, true)
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {

View File

@@ -13,7 +13,7 @@ static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
pub(crate) async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).

View File

@@ -106,7 +106,7 @@ pub mod defaults {
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageServerConf {
pub(crate) struct PageServerConf {
// Identifier of that particular pageserver so e g safekeepers
// can safely distinguish different pageservers
pub id: NodeId,
@@ -160,6 +160,7 @@ pub struct PageServerConf {
pub test_remote_failures: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
pub layer_access_stats_disable: bool,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -226,6 +227,7 @@ struct PageServerConfigBuilder {
test_remote_failures: BuilderValue<u64>,
ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
layer_access_stats_disable: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
@@ -273,6 +275,7 @@ impl Default for PageServerConfigBuilder {
test_remote_failures: Set(0),
ondemand_download_behavior_treat_error_as_warn: Set(false),
layer_access_stats_disable: Set(false),
}
}
}
@@ -377,6 +380,10 @@ impl PageServerConfigBuilder {
BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
}
pub fn layer_access_stats_disable(&mut self, val: bool) {
self.layer_access_stats_disable = BuilderValue::Set(val);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
Ok(PageServerConf {
listen_pg_addr: self
@@ -441,6 +448,9 @@ impl PageServerConfigBuilder {
.ok_or(anyhow!(
"missing ondemand_download_behavior_treat_error_as_warn"
))?,
layer_access_stats_disable: self
.layer_access_stats_disable
.ok_or(anyhow!("missing layer_acccess_stats_disable"))?,
})
}
}
@@ -620,6 +630,7 @@ impl PageServerConf {
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
"layer_access_stats_disable" => builder.layer_access_stats_disable(parse_toml_bool(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -745,6 +756,7 @@ impl PageServerConf {
synthetic_size_calculation_interval: Duration::from_secs(60),
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
layer_access_stats_disable: false,
}
}
}
@@ -934,6 +946,7 @@ log_format = 'json'
)?,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
layer_access_stats_disable: false,
},
"Correct defaults should be used when no config values are provided"
);
@@ -982,6 +995,7 @@ log_format = 'json'
synthetic_size_calculation_interval: Duration::from_secs(333),
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
layer_access_stats_disable: false,
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -1,4 +1,4 @@
pub mod routes;
pub use routes::make_router;
pub(crate) use routes::make_router;
pub use pageserver_api::models;

View File

@@ -7,6 +7,7 @@ use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
@@ -17,6 +18,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::TenantMapInsertError;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
@@ -237,8 +239,8 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let include_non_incremental_logical_size =
query_param_present(&request, "include-non-incremental-logical-size");
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -251,13 +253,14 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_info =
build_timeline_info(&timeline, include_non_incremental_logical_size, &ctx)
.await
.context(
"Failed to convert tenant timeline {timeline_id} into the local one: {e:?}",
)
.map_err(ApiError::InternalServerError)?;
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
response_data.push(timeline_info);
}
@@ -269,36 +272,11 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, response_data)
}
/// Checks if a query param is present in the request's URL
fn query_param_present(request: &Request<Body>, param: &str) -> bool {
request
.uri()
.query()
.map(|v| url::form_urlencoded::parse(v.as_bytes()).any(|(p, _)| p == param))
.unwrap_or(false)
}
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
request.uri().query().map_or(
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|v| {
url::form_urlencoded::parse(v.as_bytes())
.find(|(k, _)| k == param_name)
.map_or(
Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in query parameters"
))),
|(_, v)| Ok(v.into_owned()),
)
},
)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size =
query_param_present(&request, "include-non-incremental-logical-size");
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
check_permission(&request, Some(tenant_id))?;
// Logical size calculation needs downloading.
@@ -313,11 +291,14 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
.get_timeline(timeline_id, false)
.map_err(ApiError::NotFound)?;
let timeline_info =
build_timeline_info(&timeline, include_non_incremental_logical_size, &ctx)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
Ok::<_, ApiError>(timeline_info)
}
@@ -332,8 +313,8 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
let timestamp_raw = must_get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(&timestamp_raw)
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
@@ -505,13 +486,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let inputs_only = if query_param_present(&request, "inputs_only") {
get_query_param(&request, "inputs_only")?
.parse()
.map_err(|_| ApiError::BadRequest(anyhow!("failed to parse inputs_only")))?
} else {
false
};
let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true)
@@ -524,7 +499,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
.await
.map_err(ApiError::InternalServerError)?;
let size = if !inputs_only {
let size = if !inputs_only.unwrap_or(false) {
Some(inputs.calculate().map_err(ApiError::InternalServerError)?)
} else {
None
@@ -556,6 +531,82 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
)
}
async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let reset: LayerAccessStatsReset =
parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let layer_map_info = timeline.layer_map_info(reset);
json_response(StatusCode::OK, layer_map_info)
}
async fn layer_download_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let layer_file_name = get_request_param(&request, "layer_file_name")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let downloaded = timeline
.download_layer(layer_file_name)
.await
.map_err(ApiError::InternalServerError)?;
match downloaded {
Some(true) => json_response(StatusCode::OK, ()),
Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
None => json_response(
StatusCode::BAD_REQUEST,
format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
),
}
}
async fn evict_timeline_layer_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let layer_file_name = get_request_param(&request, "layer_file_name")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let evicted = timeline
.evict_layer(layer_file_name)
.await
.map_err(ApiError::InternalServerError)?;
match evicted {
Some(true) => json_response(StatusCode::OK, ()),
Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
None => json_response(
StatusCode::BAD_REQUEST,
format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
),
}
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
@@ -897,7 +948,7 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
)
}
pub fn make_router(
pub(crate) fn make_router(
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
@@ -986,5 +1037,17 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer",
layer_map_info_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
layer_download_handler,
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
evict_timeline_layer_handler,
)
.any(handler_404))
}

View File

@@ -1,4 +1,4 @@
use crate::repository::{key_range_size, singleton_range, Key};
use pageserver_api::models::{key_range_size, Key};
use postgres_ffi::BLCKSZ;
use std::ops::Range;
@@ -102,7 +102,7 @@ impl KeySpaceAccum {
}
pub fn add_key(&mut self, key: Key) {
self.add_range(singleton_range(key))
self.add_range(key..key.next())
}
pub fn add_range(&mut self, range: Range<Key>) {

View File

@@ -47,13 +47,13 @@ use std::{
use anyhow::Context;
use once_cell::sync::OnceCell;
use pageserver_api::models::Key;
use tracing::error;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::repository::Key;
use crate::tenant::writeback_ephemeral_file;
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();

View File

@@ -119,7 +119,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
///
/// Listens for connections, and launches a new handler task for each.
///
pub async fn libpq_listener_main(
pub(crate) async fn libpq_listener_main(
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,

View File

@@ -13,7 +13,10 @@ use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use bytes::{Buf, Bytes};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::{
models::Key,
reltag::{RelTag, SlruKind},
};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};

View File

@@ -1,177 +1,9 @@
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::ops::AddAssign;
use std::time::Duration;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
let end = key_range.end;
if end.field1 != start.field1
|| end.field2 != start.field2
|| end.field3 != start.field3
|| end.field4 != start.field4
{
return u32::MAX;
}
let start = (start.field5 as u64) << 32 | start.field6 as u64;
let end = (end.field5 as u64) << 32 | end.field6 as u64;
let diff = end - start;
if diff > u32::MAX as u64 {
u32::MAX
} else {
diff as u32
}
}
pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Value {

View File

@@ -169,7 +169,14 @@ task_local! {
/// Note that we don't try to limit how many task of a certain kind can be running
/// at the same time.
///
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(
Debug,
// NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
enumset::EnumSetType,
serde::Serialize,
serde::Deserialize,
strum_macros::IntoStaticStr,
)]
pub enum TaskKind {
// Pageserver startup, i.e., `main`
Startup,
@@ -255,6 +262,8 @@ pub enum TaskKind {
// A request that comes in via the pageserver HTTP API.
MgmtRequest,
DebugTool,
#[cfg(test)]
UnitTest,
}

View File

@@ -102,7 +102,7 @@ pub use timeline::{PageReconstructError, Timeline};
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
// re-export for use in storage_sync.rs
pub use crate::tenant::metadata::save_metadata;
pub(crate) use crate::tenant::metadata::save_metadata;
// re-export for use in walreceiver
pub use crate::tenant::timeline::WalReceiverInfo;
@@ -117,7 +117,7 @@ pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
///
pub struct Tenant {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
pub(crate) conf: &'static PageServerConf,
state: watch::Sender<TenantState>,
@@ -571,7 +571,7 @@ impl Tenant {
/// finishes. You can use wait_until_active() to wait for the task to
/// complete.
///
pub fn spawn_attach(
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
remote_storage: GenericRemoteStorage,
@@ -811,7 +811,7 @@ impl Tenant {
}
/// Create a placeholder Tenant object for a broken tenant
pub fn create_broken_tenant(conf: &'static PageServerConf, tenant_id: TenantId) -> Arc<Tenant> {
pub(crate) fn create_broken_tenant(conf: &'static PageServerConf, tenant_id: TenantId) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
Arc::new(Tenant::new(
TenantState::Broken,
@@ -835,7 +835,7 @@ impl Tenant {
/// state.
///
#[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))]
pub fn spawn_load(
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
remote_storage: Option<GenericRemoteStorage>,
@@ -2650,7 +2650,11 @@ impl Drop for Tenant {
}
}
/// Dump contents of a layer file to stdout.
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
pub fn dump_layerfile_from_path(
path: &Path,
verbose: bool,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use std::os::unix::fs::FileExt;
// All layer files start with a two-byte "magic" value, to identify the kind of
@@ -2660,8 +2664,8 @@ pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()
file.read_exact_at(&mut header_buf, 0)?;
match u16::from_be_bytes(header_buf) {
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose)?,
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose)?,
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose, ctx)?,
magic => bail!("unrecognized magic identifier: {:?}", magic),
}
@@ -2686,6 +2690,7 @@ pub mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
use pageserver_api::models::Key;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fs, path::PathBuf};
use utils::logging;
@@ -2693,7 +2698,6 @@ pub mod harness {
use crate::{
config::PageServerConf,
repository::Key,
tenant::Tenant,
walrecord::NeonWalRecord,
walredo::{WalRedoError, WalRedoManager},
@@ -2882,13 +2886,14 @@ pub mod harness {
mod tests {
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value};
use crate::repository::Value;
use crate::tenant::harness::*;
use crate::DEFAULT_PG_VERSION;
use crate::METADATA_FILE_NAME;
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
use pageserver_api::models::Key;
use rand::{thread_rng, Rng};
static TEST_KEY: Lazy<Key> =

View File

@@ -47,7 +47,7 @@ pub struct EphemeralFile {
}
impl EphemeralFile {
pub fn create(
pub(crate) fn create(
conf: &PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,

View File

@@ -46,12 +46,13 @@
mod historic_layer_coverage;
mod layer_coverage;
use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use pageserver_api::models::Key;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -654,22 +655,22 @@ where
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub fn dump(&self, verbose: bool) -> Result<()> {
pub fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!("Begin dump LayerMap");
println!("open_layer:");
if let Some(open_layer) = &self.open_layer {
open_layer.dump(verbose)?;
open_layer.dump(verbose, ctx)?;
}
println!("frozen_layers:");
for frozen_layer in self.frozen_layers.iter() {
frozen_layer.dump(verbose)?;
frozen_layer.dump(verbose, ctx)?;
}
println!("historic_layers:");
for layer in self.iter_historic_layers() {
layer.dump(verbose)?;
layer.dump(verbose, ctx)?;
}
println!("End dump LayerMap");
Ok(())

View File

@@ -229,7 +229,7 @@ impl TimelineMetadata {
}
/// Save timeline metadata to file
pub fn save_metadata(
pub(crate) fn save_metadata(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
@@ -264,7 +264,7 @@ pub fn save_metadata(
Ok(())
}
pub fn load_metadata(
pub(crate) fn load_metadata(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,

View File

@@ -59,7 +59,7 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip(conf, remote_storage))]
pub async fn init_tenant_mgr(
pub(crate) async fn init_tenant_mgr(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<()> {
@@ -147,7 +147,7 @@ pub async fn init_tenant_mgr(
Ok(())
}
pub fn schedule_local_tenant_processing(
pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
remote_storage: Option<GenericRemoteStorage>,
@@ -261,7 +261,7 @@ pub async fn shutdown_all_tenants() {
}
}
pub async fn create_tenant(
pub(crate) async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -285,7 +285,7 @@ pub async fn create_tenant(
}).await
}
pub async fn update_tenant_config(
pub(crate) async fn update_tenant_config(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -331,7 +331,7 @@ pub async fn delete_timeline(
Ok(())
}
pub async fn detach_tenant(
pub(crate) async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<()> {
@@ -347,7 +347,7 @@ pub async fn detach_tenant(
.await
}
pub async fn load_tenant(
pub(crate) async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
remote_storage: Option<GenericRemoteStorage>,
@@ -371,7 +371,7 @@ pub async fn load_tenant(
}).await
}
pub async fn ignore_tenant(
pub(crate) async fn ignore_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<()> {
@@ -414,7 +414,7 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
///
/// Downloading all the tenant data is performed in the background, this merely
/// spawns the background task and returns quickly.
pub async fn attach_tenant(
pub(crate) async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
remote_storage: GenericRemoteStorage,

View File

@@ -205,7 +205,7 @@ pub mod index;
mod upload;
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -293,7 +293,7 @@ impl RemoteTimelineClient {
/// Note: the caller must initialize the upload queue before any uploads can be scheduled,
/// by calling init_upload_queue.
///
pub fn new(
pub(crate) fn new(
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
tenant_id: TenantId,

View File

@@ -31,7 +31,7 @@ async fn fsync_path(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Er
/// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
///
/// Returns the size of the downloaded file.
pub async fn download_layer_file<'a>(
pub(crate) async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: TenantId,
@@ -170,7 +170,7 @@ pub fn is_temp_download_file(path: &Path) -> bool {
}
/// List timelines of given tenant in remote storage
pub async fn list_remote_timelines<'a>(
pub(crate) async fn list_remote_timelines<'a>(
storage: &'a GenericRemoteStorage,
conf: &'static PageServerConf,
tenant_id: TenantId,

View File

@@ -6,13 +6,20 @@ mod image_layer;
mod inmemory_layer;
mod remote_layer;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use crate::{context::RequestContext, repository::Value};
use anyhow::Result;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use heapless::HistoryBuffer;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{HistoricLayerInfo, Key};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use utils::{
id::{TenantId, TimelineId},
@@ -20,7 +27,7 @@ use utils::{
};
pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName, PathOrConf};
pub(crate) use filename::{DeltaFileName, ImageFileName, LayerFileName, PathOrConf};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer;
@@ -80,6 +87,201 @@ pub enum ValueReconstructResult {
Missing,
}
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
first_access: Option<LayerAccessStatFullDetails>,
count_by_access_kind: EnumMap<LayerAccessKind, u64>,
task_kind_flag: EnumSet<TaskKind>,
last_accesses: HistoryBuffer<LayerAccessStatFullDetails, 16>,
last_residence_changes: HistoryBuffer<LayerResidenceStatus, 16>,
}
#[derive(Debug, Clone)]
struct LayerAccessStatFullDetails {
when: SystemTime,
task_kind: TaskKind,
access_kind: LayerAccessKind,
}
#[derive(Debug, Clone)]
pub enum LayerResidenceStatus {
Resident {
timestamp: SystemTime,
/// If `true`, then this resident status marks the birth time of the layer.
created: bool,
},
Evicted {
timestamp: SystemTime,
},
}
#[derive(Clone, Copy, strum_macros::EnumString)]
pub enum LayerAccessStatsReset {
NoReset,
JustTaskKindFlags,
AllStats,
}
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
ts.duration_since(UNIX_EPOCH)
.expect("better to die in this unlikely case than report false stats")
.as_millis()
.try_into()
.expect("64 bits is enough for few more years")
}
impl LayerAccessStatFullDetails {
fn to_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
let Self {
when,
task_kind,
access_kind,
} = self;
pageserver_api::models::LayerAccessStatFullDetails {
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
task_kind: task_kind.into(), // into static str, powered by strum_macros
access_kind: *access_kind,
}
}
}
impl LayerResidenceStatus {
/// Residence status for a layer file that only exists on the remote.
pub fn evicted() -> Self {
LayerResidenceStatus::Evicted {
timestamp: SystemTime::now(),
}
}
/// Residence status for a layer file that exists locally.
/// It may also exist on the remote, we don't care here.
/// NB: use this for existing layer files, e.g., during timeline load.
/// For newly written layer files, use [`created`].
pub fn resident() -> Self {
LayerResidenceStatus::Resident {
timestamp: SystemTime::now(),
created: false,
}
}
/// Residence status for a local layer file that we just wrote.
/// Example: a layer file created by compaction.
/// Private, because callers are supposed to use [`LayerAccessStats::new_for_new_layer_file`].
fn created() -> Self {
LayerResidenceStatus::Resident {
timestamp: SystemTime::now(),
created: true,
}
}
fn to_api_model(&self) -> pageserver_api::models::LayerResidenceStatus {
match self {
LayerResidenceStatus::Resident { timestamp, created } => {
pageserver_api::models::LayerResidenceStatus::Resident {
timestamp_millis_since_epoch: system_time_to_millis_since_epoch(timestamp),
created: *created,
}
}
LayerResidenceStatus::Evicted { timestamp } => {
pageserver_api::models::LayerResidenceStatus::Evicted {
timestamp_millis_since_epoch: system_time_to_millis_since_epoch(timestamp),
}
}
}
}
}
impl LayerAccessStats {
pub(crate) fn for_loading_layer(residence_status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
new.record_residence_change(residence_status);
new
}
pub(crate) fn for_new_layer_file() -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
new.record_residence_change(LayerResidenceStatus::created());
new
}
/// Creates a clone of `self` and records `new_status` in the clone.
/// The `new_status` is not recorded in `self`
pub(crate) fn clone_for_residence_change(
&self,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_change(new_status);
new
}
fn record_residence_change(&self, new_status: LayerResidenceStatus) {
let mut inner = self.0.lock().unwrap();
inner.last_residence_changes.write(new_status);
}
fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
let mut inner = self.0.lock().unwrap();
let this_access = LayerAccessStatFullDetails {
when: SystemTime::now(),
task_kind,
access_kind,
};
inner
.first_access
.get_or_insert_with(|| this_access.clone());
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= task_kind;
inner.last_accesses.write(this_access);
}
fn to_api_model(
&self,
reset: LayerAccessStatsReset,
) -> pageserver_api::models::LayerAccessStats {
let mut inner = self.0.lock().unwrap();
let LayerAccessStatsInner {
first_access,
count_by_access_kind,
task_kind_flag,
last_accesses,
last_residence_changes,
} = &*inner;
let ret = pageserver_api::models::LayerAccessStats {
access_count_by_access_kind: count_by_access_kind
.iter()
.map(|(kind, count)| (kind, *count))
.collect(),
task_kind_access_flag: task_kind_flag
.iter()
.map(|task_kind| task_kind.into()) // into static str, powered by strum_macros
.collect(),
first: first_access.as_ref().map(|a| a.to_api_model()),
most_recent: last_accesses.iter().map(|a| a.to_api_model()).collect(),
most_recent_residence_changes: last_residence_changes
.iter()
.map(|s| s.to_api_model())
.collect(),
};
match reset {
LayerAccessStatsReset::NoReset => (),
LayerAccessStatsReset::JustTaskKindFlags => {
inner.task_kind_flag.clear();
}
LayerAccessStatsReset::AllStats => {
*inner = LayerAccessStatsInner::default();
}
}
ret
}
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`].
pub trait Layer: Send + Sync {
@@ -117,13 +319,14 @@ pub trait Layer: Send + Sync {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
/// Dump summary of the contents of the layer to stdout
fn dump(&self, verbose: bool) -> Result<()>;
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
}
/// Returned by [`Layer::iter`]
@@ -161,11 +364,11 @@ pub trait PersistentLayer: Layer {
fn local_path(&self) -> Option<PathBuf>;
/// Iterate through all keys and values stored in the layer
fn iter(&self) -> Result<LayerIter<'_>>;
fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>>;
/// Iterate through all keys stored in the layer. Returns key, lsn and value size
/// It is used only for compaction and so is currently implemented only for DeltaLayer
fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
fn key_iter(&self, _ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
panic!("Not implemented")
}
@@ -185,6 +388,10 @@ pub trait PersistentLayer: Layer {
/// Should not change over the lifetime of the layer object because
/// current_physical_size is computed as the som of this value.
fn file_size(&self) -> Option<u64>;
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
@@ -231,6 +438,7 @@ impl Layer for LayerDescriptor {
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
@@ -239,7 +447,7 @@ impl Layer for LayerDescriptor {
self.short_id.clone()
}
fn dump(&self, _verbose: bool) -> Result<()> {
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
}

View File

@@ -24,8 +24,9 @@
//! "values" part.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::repository::Value;
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
@@ -36,6 +37,7 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use pageserver_api::models::{HistoricLayerInfo, Key, LayerAccessKind, KEY_SIZE};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
@@ -53,7 +55,10 @@ use utils::{
lsn::Lsn,
};
use super::{DeltaFileName, Layer, LayerFileName, LayerIter, LayerKeyIter, PathOrConf};
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, LayerResidenceStatus, PathOrConf,
};
///
/// Header stored in the beginning of the file
@@ -183,6 +188,8 @@ pub struct DeltaLayer {
pub file_size: u64,
access_stats: LayerAccessStats,
inner: RwLock<DeltaLayerInner>,
}
@@ -214,7 +221,7 @@ impl Layer for DeltaLayer {
self.filename().file_name()
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool) -> Result<()> {
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
self.tenant_id,
@@ -229,7 +236,7 @@ impl Layer for DeltaLayer {
return Ok(());
}
let inner = self.load()?;
let inner = self.load(LayerAccessKind::Dump, ctx)?;
println!(
"index_start_blk: {}, root {}",
@@ -293,6 +300,7 @@ impl Layer for DeltaLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.lsn_range.start);
let mut need_image = true;
@@ -301,7 +309,7 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -391,16 +399,18 @@ impl PersistentLayer for DeltaLayer {
Some(self.path())
}
fn iter(&self) -> Result<LayerIter<'_>> {
let inner = self.load().context("load delta layer")?;
fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.context("load delta layer")?;
Ok(match DeltaValueIter::new(inner) {
Ok(iter) => Box::new(iter),
Err(err) => Box::new(std::iter::once(Err(err))),
})
}
fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
let inner = self.load()?;
fn key_iter(&self, ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
let inner = self.load(LayerAccessKind::KeyIter, ctx)?;
Ok(Box::new(
DeltaKeyIter::new(inner).context("Layer index is corrupted")?,
))
@@ -415,6 +425,28 @@ impl PersistentLayer for DeltaLayer {
fn file_size(&self) -> Option<u64> {
Some(self.file_size)
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let key_range = self.get_key_range();
let lsn_range = self.get_lsn_range();
let access_stats = self.access_stats.to_api_model(reset);
HistoricLayerInfo::Delta {
layer_file_name,
key_start: key_range.start,
key_end: key_range.end,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: false,
access_stats,
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl DeltaLayer {
@@ -459,7 +491,13 @@ impl DeltaLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self) -> Result<RwLockReadGuard<DeltaLayerInner>> {
fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();
@@ -534,12 +572,13 @@ impl DeltaLayer {
}
/// Create a DeltaLayer struct representing an existing file on disk.
pub fn new(
pub(crate) fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
filename: &DeltaFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -548,6 +587,7 @@ impl DeltaLayer {
key_range: filename.key_range.clone(),
lsn_range: filename.lsn_range.clone(),
file_size,
access_stats,
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -577,6 +617,7 @@ impl DeltaLayer {
key_range: summary.key_range,
lsn_range: summary.lsn_range,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::resident()),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -747,6 +788,7 @@ impl DeltaLayerWriterInner {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
@@ -808,7 +850,7 @@ impl DeltaLayerWriter {
///
/// Start building a new delta layer.
///
pub fn new(
pub(crate) fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,

View File

@@ -2,13 +2,13 @@
//! Helper functions for dealing with filenames of the image and delta layer files.
//!
use crate::config::PageServerConf;
use crate::repository::Key;
use std::cmp::Ordering;
use std::fmt;
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use pageserver_api::models::Key;
use utils::lsn::Lsn;
// Note: Timeline::load_layer_map() relies on this sort order
@@ -279,7 +279,7 @@ impl<'de> serde::de::Visitor<'de> for LayerFileNameVisitor {
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
pub enum PathOrConf {
pub(crate) enum PathOrConf {
Path(PathBuf),
Conf(&'static PageServerConf),
}

View File

@@ -20,8 +20,8 @@
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
@@ -33,6 +33,7 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
use pageserver_api::models::{HistoricLayerInfo, Key, LayerAccessKind, KEY_SIZE};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
@@ -51,7 +52,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName, PathOrConf};
use super::{Layer, LayerIter};
use super::{Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerResidenceStatus};
///
/// Header stored in the beginning of the file
@@ -110,6 +111,8 @@ pub struct ImageLayer {
// This entry contains an image of all pages as of this LSN
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: RwLock<ImageLayerInner>,
}
@@ -143,7 +146,7 @@ impl Layer for ImageLayer {
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool) -> Result<()> {
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} ----",
self.tenant_id, self.timeline_id, self.key_range.start, self.key_range.end, self.lsn
@@ -153,7 +156,7 @@ impl Layer for ImageLayer {
return Ok(());
}
let inner = self.load()?;
let inner = self.load(LayerAccessKind::Dump, ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -174,12 +177,13 @@ impl Layer for ImageLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load()?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -220,7 +224,7 @@ impl PersistentLayer for ImageLayer {
fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
}
fn iter(&self) -> Result<LayerIter<'_>> {
fn iter(&self, _ctx: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!();
}
@@ -233,6 +237,25 @@ impl PersistentLayer for ImageLayer {
fn file_size(&self) -> Option<u64> {
Some(self.file_size)
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let key_range = self.get_key_range();
let lsn_range = self.get_lsn_range();
HistoricLayerInfo::Image {
layer_file_name,
key_start: key_range.start,
key_end: key_range.end,
lsn_start: lsn_range.start,
remote: false,
access_stats: self.access_stats.to_api_model(reset),
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl ImageLayer {
@@ -270,7 +293,13 @@ impl ImageLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self) -> Result<RwLockReadGuard<ImageLayerInner>> {
fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
) -> Result<RwLockReadGuard<ImageLayerInner>> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();
@@ -344,12 +373,13 @@ impl ImageLayer {
}
/// Create an ImageLayer struct representing an existing file on disk
pub fn new(
pub(crate) fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
filename: &ImageFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> ImageLayer {
ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -358,6 +388,7 @@ impl ImageLayer {
key_range: filename.key_range.clone(),
lsn: filename.lsn,
file_size,
access_stats,
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,
@@ -385,6 +416,7 @@ impl ImageLayer {
key_range: summary.key_range,
lsn: summary.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::resident()),
inner: RwLock::new(ImageLayerInner {
file: None,
loaded: false,
@@ -544,6 +576,7 @@ impl ImageLayerWriterInner {
key_range: self.key_range.clone(),
lsn: self.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::for_new_layer_file(),
inner: RwLock::new(ImageLayerInner {
loaded: false,
file: None,
@@ -606,7 +639,7 @@ impl ImageLayerWriter {
///
/// Start building a new image layer.
///
pub fn new(
pub(crate) fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,

View File

@@ -5,13 +5,15 @@
//! its position in the file, is kept in memory, though.
//!
use crate::config::PageServerConf;
use crate::repository::{Key, Value};
use crate::context::RequestContext;
use crate::repository::Value;
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{ensure, Result};
use pageserver_api::models::{InMemoryLayerInfo, Key};
use std::cell::RefCell;
use std::collections::HashMap;
use tracing::*;
@@ -79,6 +81,16 @@ impl InMemoryLayer {
pub fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
}
pub fn info(&self) -> InMemoryLayerInfo {
let lsn_start = self.start_lsn;
let lsn_end = self.inner.read().unwrap().end_lsn;
match lsn_end {
Some(lsn_end) => InMemoryLayerInfo::Frozen { lsn_start, lsn_end },
None => InMemoryLayerInfo::Open { lsn_start },
}
}
}
impl Layer for InMemoryLayer {
@@ -108,9 +120,8 @@ impl Layer for InMemoryLayer {
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
format!("inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool) -> Result<()> {
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().unwrap();
let end_str = inner
@@ -166,6 +177,7 @@ impl Layer for InMemoryLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
@@ -222,7 +234,7 @@ impl InMemoryLayer {
///
/// Create a new, empty, in-memory layer
///
pub fn create(
pub(crate) fn create(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,

View File

@@ -2,10 +2,11 @@
//! in remote storage.
//!
use crate::config::PageServerConf;
use crate::repository::Key;
use crate::context::RequestContext;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::{HistoricLayerInfo, Key};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
@@ -17,7 +18,10 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
use super::image_layer::ImageLayer;
use super::{DeltaLayer, LayerIter, LayerKeyIter, PersistentLayer};
use super::{
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer,
};
#[derive(Debug)]
pub struct RemoteLayer {
@@ -34,6 +38,8 @@ pub struct RemoteLayer {
is_incremental: bool,
access_stats: LayerAccessStats,
pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,
}
@@ -51,6 +57,7 @@ impl Layer for RemoteLayer {
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
@@ -63,7 +70,7 @@ impl Layer for RemoteLayer {
}
/// debugging function to print out the contents of the layer
fn dump(&self, _verbose: bool) -> Result<()> {
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
self.tenantid,
@@ -111,11 +118,11 @@ impl PersistentLayer for RemoteLayer {
None
}
fn iter(&self) -> Result<LayerIter<'_>> {
fn iter(&self, _ctx: &RequestContext) -> Result<LayerIter<'_>> {
bail!("cannot iterate a remote layer");
}
fn key_iter(&self) -> Result<LayerKeyIter<'_>> {
fn key_iter(&self, _ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
bail!("cannot iterate a remote layer");
}
@@ -134,6 +141,37 @@ impl PersistentLayer for RemoteLayer {
fn file_size(&self) -> Option<u64> {
self.layer_metadata.file_size()
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let key_range = self.get_key_range();
let lsn_range = self.get_lsn_range();
if self.is_delta {
HistoricLayerInfo::Delta {
layer_file_name,
key_start: key_range.start,
key_end: key_range.end,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: true,
access_stats: self.access_stats.to_api_model(reset),
}
} else {
HistoricLayerInfo::Image {
layer_file_name,
key_start: key_range.start,
key_end: key_range.end,
lsn_start: lsn_range.start,
remote: true,
access_stats: self.access_stats.to_api_model(reset),
}
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl RemoteLayer {
@@ -142,6 +180,7 @@ impl RemoteLayer {
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
tenantid,
@@ -153,6 +192,7 @@ impl RemoteLayer {
file_name: fname.to_owned().into(),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
access_stats,
}
}
@@ -161,6 +201,7 @@ impl RemoteLayer {
timelineid: TimelineId,
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
tenantid,
@@ -172,11 +213,12 @@ impl RemoteLayer {
file_name: fname.to_owned().into(),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
access_stats,
}
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
pub(crate) fn create_downloaded_layer(
&self,
conf: &'static PageServerConf,
file_size: u64,
@@ -192,6 +234,8 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::resident()),
))
} else {
let fname = ImageFileName {
@@ -204,6 +248,8 @@ impl RemoteLayer {
self.tenantid,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::resident()),
))
}
}

View File

@@ -10,7 +10,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, TimelineState,
DownloadRemoteLayersTaskState, Key, LayerMapInfo, TimelineState,
};
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -29,8 +29,8 @@ use crate::broker_client::is_broker_client_initialized;
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
RemoteLayer,
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, LayerResidenceStatus, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -60,7 +60,7 @@ use utils::{
use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::repository::Value;
use crate::task_mgr::TaskKind;
use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
@@ -68,9 +68,10 @@ use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{DeltaLayer, ImageLayer, Layer};
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FlushLoopState {
@@ -90,7 +91,7 @@ pub struct Timeline {
pub pg_version: u32,
pub layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -682,7 +683,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size).await?;
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();
// If `create_image_layers' or `compact_level0` scheduled any
@@ -818,6 +819,91 @@ impl Timeline {
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let mut in_memory_layers = Vec::new();
let layer_map = self.layers.read().unwrap();
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
}
for frozen_layer in &layer_map.frozen_layers {
in_memory_layers.push(frozen_layer.info());
}
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
historic_layers.push(historic_layer.info(reset));
}
LayerMapInfo {
in_memory_layers,
historic_layers,
}
}
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
let Some(remote_layer) = layer.downcast_remote_layer() else {return Ok(Some(false)) };
self.download_remote_layer(remote_layer).await?;
Ok(Some(true))
}
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_client) = &self.remote_client else { return Ok(Some(false)) };
if local_layer.is_remote_layer() {
return Ok(Some(false));
}
// ensure the current layer is uploaded for sure
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
let layer_metadata = LayerFileMetadata::new(
local_layer
.file_size()
.expect("Local layer should have a file size"),
);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::evicted()),
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::evicted()),
),
#[cfg(test)]
LayerFileName::Test(_) => unreachable!(),
});
let mut _gc_lock = self.layer_removal_cs.lock().await;
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
self.delete_historic_layer(local_layer, &mut updates)?;
updates.insert_historic(new_remote_layer);
updates.flush();
drop(layers);
Ok(Some(true))
}
}
// Private functions
@@ -1079,6 +1165,7 @@ impl Timeline {
self.tenant_id,
&imgfilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::resident()),
);
trace!("found layer {}", layer.path().display());
@@ -1110,6 +1197,7 @@ impl Timeline {
self.tenant_id,
&deltafilename,
file_size,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::resident()),
);
trace!("found layer {}", layer.path().display());
@@ -1247,6 +1335,7 @@ impl Timeline {
self.timeline_id,
imgfilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::evicted()),
);
let remote_layer = Arc::new(remote_layer);
@@ -1271,6 +1360,7 @@ impl Timeline {
self.timeline_id,
deltafilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(LayerResidenceStatus::evicted()),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
@@ -1607,6 +1697,41 @@ impl Timeline {
Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
}
}
None
}
/// Removes the layer from local FS (if present) and from memory.
/// Remote storage is not affected by this operation.
fn delete_historic_layer(
&self,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
let layer_size = layer.file_size();
layer.delete()?;
if let Some(layer_size) = layer_size {
self.metrics.resident_physical_size_gauge.sub(layer_size);
}
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer);
Ok(())
}
}
type TraversalId = String;
@@ -1753,6 +1878,7 @@ impl Timeline {
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
@@ -1778,6 +1904,7 @@ impl Timeline {
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
@@ -1811,6 +1938,7 @@ impl Timeline {
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
@@ -2449,6 +2577,7 @@ impl Timeline {
async fn compact_level0_phase1(
&self,
target_file_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<CompactLevel0Phase1Result> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
@@ -2508,8 +2637,9 @@ impl Timeline {
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter =
itertools::process_results(deltas_to_compact.iter().map(|l| l.iter()), |iter_iter| {
let all_values_iter = itertools::process_results(
deltas_to_compact.iter().map(|l| l.iter(ctx)),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
@@ -2525,11 +2655,12 @@ impl Timeline {
true
}
})
})?;
},
)?;
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter()),
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
@@ -2705,11 +2836,15 @@ impl Timeline {
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
///
async fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
async fn compact_level0(
&self,
target_file_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = self.compact_level0_phase1(target_file_size).await?;
} = self.compact_level0_phase1(target_file_size, ctx).await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
@@ -2756,14 +2891,8 @@ impl Timeline {
// delete the old ones
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
if let Some(path) = l.local_path() {
self.metrics
.resident_physical_size_gauge
.sub(path.metadata()?.len());
}
layer_names_to_delete.push(l.filename());
l.delete()?;
updates.remove_historic(l);
self.delete_historic_layer(l, &mut updates)?;
}
updates.flush();
drop(layers);
@@ -3064,20 +3193,8 @@ impl Timeline {
// while iterating it. BTreeMap::retain() would be another option)
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
if let Some(path) = doomed_layer.local_path() {
self.metrics
.resident_physical_size_gauge
.sub(path.metadata()?.len());
}
layer_names_to_delete.push(doomed_layer.filename());
doomed_layer.delete()?; // FIXME: schedule succeeded deletions before returning?
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(doomed_layer);
self.delete_historic_layer(doomed_layer, &mut updates)?;
result.layers_removed += 1;
}

View File

@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::models::Key;
use serde::Serialize;
use std::collections::VecDeque;
use std::fs::OpenOptions;
@@ -46,7 +47,6 @@ use crate::metrics::{
WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
@@ -222,7 +222,7 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
pub(crate) fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_id,
@@ -1153,9 +1153,9 @@ fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
#[cfg(test)]
mod tests {
use super::{PostgresRedoManager, WalRedoManager};
use crate::repository::Key;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
use bytes::Bytes;
use pageserver_api::models::Key;
use std::str::FromStr;
use utils::{id::TenantId, lsn::Lsn};

View File

@@ -293,7 +293,7 @@ class NeonPageserverHttpClient(requests.Session):
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=true"
)
self.verbose_error(res)
res_json = res.json()

View File

@@ -1232,9 +1232,9 @@ class PageserverHttpClient(requests.Session):
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "yes"
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "yes"
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params
@@ -1276,9 +1276,9 @@ class PageserverHttpClient(requests.Session):
) -> Dict[Any, Any]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "yes"
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "yes"
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",