mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-02 11:40:37 +00:00
Compare commits
10 Commits
read_only_
...
layer_map_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c471c25744 | ||
|
|
e030830397 | ||
|
|
58fa4f0eb7 | ||
|
|
877a2d70e3 | ||
|
|
959f5c6f40 | ||
|
|
678fe0684f | ||
|
|
c9821f13e0 | ||
|
|
121d535068 | ||
|
|
ec3a3aed37 | ||
|
|
87cd2bae77 |
54
.github/helm-values/production.proxy-scram.yaml
vendored
54
.github/helm-values/production.proxy-scram.yaml
vendored
@@ -1,54 +0,0 @@
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://console-release.local/management/api/v2"
|
||||
domain: "*.cloud.neon.tech"
|
||||
sentryEnvironment: "production"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://console-release.local/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "10min"
|
||||
|
||||
podLabels:
|
||||
zenith_service: proxy-scram
|
||||
zenith_env: production
|
||||
zenith_region: us-west-2
|
||||
zenith_region_slug: oregon
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: '*.cloud.neon.tech'
|
||||
httpsPort: 443
|
||||
|
||||
metrics:
|
||||
enabled: true
|
||||
serviceMonitor:
|
||||
enabled: true
|
||||
selector:
|
||||
release: kube-prometheus-stack
|
||||
|
||||
extraManifests:
|
||||
- apiVersion: operator.victoriametrics.com/v1beta1
|
||||
kind: VMServiceScrape
|
||||
metadata:
|
||||
name: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
labels:
|
||||
helm.sh/chart: neon-proxy-{{ .Chart.Version }}
|
||||
app.kubernetes.io/name: neon-proxy
|
||||
app.kubernetes.io/instance: "{{ include \"neon-proxy.fullname\" . }}"
|
||||
app.kubernetes.io/version: "{{ .Chart.AppVersion }}"
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
namespace: "{{ .Release.Namespace }}"
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: "neon-proxy"
|
||||
endpoints:
|
||||
- port: http
|
||||
path: /metrics
|
||||
interval: 10s
|
||||
scrapeTimeout: 10s
|
||||
namespaceSelector:
|
||||
matchNames:
|
||||
- "{{ .Release.Namespace }}"
|
||||
35
.github/workflows/deploy-prod.yml
vendored
35
.github/workflows/deploy-prod.yml
vendored
@@ -204,41 +204,6 @@ jobs:
|
||||
- name: Cleanup ansible folder
|
||||
run: rm -rf ~/.ansible
|
||||
|
||||
deploy-proxy:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
if: inputs.deployProxy && inputs.disclamerAcknowledged
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
environment:
|
||||
name: prod-old
|
||||
env:
|
||||
KUBECONFIG: .kubeconfig
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
ref: ${{ inputs.branch }}
|
||||
|
||||
- name: Store kubeconfig file
|
||||
run: |
|
||||
echo "${{ secrets.PRODUCTION_KUBECONFIG_DATA }}" | base64 --decode > ${KUBECONFIG}
|
||||
chmod 0600 ${KUBECONFIG}
|
||||
|
||||
- name: Add neon helm chart
|
||||
run: helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
|
||||
- name: Re-deploy proxy
|
||||
run: |
|
||||
DOCKER_TAG=${{ inputs.dockerTag }}
|
||||
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --install --atomic -f .github/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
|
||||
|
||||
- name: Cleanup helm folder
|
||||
run: rm -rf ~/.cache
|
||||
|
||||
deploy-storage-broker:
|
||||
name: deploy storage broker on old staging and old prod
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
|
||||
101
Cargo.lock
generated
101
Cargo.lock
generated
@@ -152,6 +152,15 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-polyfill"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d299f547288d6db8d5c3a2916f7b2f66134b15b8c1ac1c4357dd3b8752af7bb2"
|
||||
dependencies = [
|
||||
"critical-section",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
@@ -997,6 +1006,12 @@ dependencies = [
|
||||
"itertools",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "critical-section"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.6"
|
||||
@@ -1234,6 +1249,47 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum-map"
|
||||
version = "2.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50c25992259941eb7e57b936157961b217a4fc8597829ddef0596d6c3cd86e1a"
|
||||
dependencies = [
|
||||
"enum-map-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum-map-derive"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a4da76b3b6116d758c7ba93f7ec6a35d2e2cf24feda76c6e38a375f4d5c59f2"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enumset"
|
||||
version = "1.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19be8061a06ab6f3a6cf21106c873578bf01bd42ad15e0311a9c76161cb1c753"
|
||||
dependencies = [
|
||||
"enumset_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enumset_derive"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03e7b551eba279bf0fa88b83a46330168c1560a52a94f5126f892f0b364ab3e0"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.10.0"
|
||||
@@ -1521,6 +1577,15 @@ version = "1.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
|
||||
|
||||
[[package]]
|
||||
name = "hash32"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
@@ -1548,6 +1613,18 @@ dependencies = [
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heapless"
|
||||
version = "0.8.0"
|
||||
source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963be2de24804acf5e5001#644653bf3b831c6bb4963be2de24804acf5e5001"
|
||||
dependencies = [
|
||||
"atomic-polyfill",
|
||||
"hash32",
|
||||
"rustc_version",
|
||||
"spin 0.9.4",
|
||||
"stable_deref_trait",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.4.1"
|
||||
@@ -2019,6 +2096,7 @@ dependencies = [
|
||||
name = "metrics"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"prometheus",
|
||||
@@ -2343,6 +2421,8 @@ dependencies = [
|
||||
"crc32c",
|
||||
"criterion",
|
||||
"crossbeam-utils",
|
||||
"enum-map",
|
||||
"enumset",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
@@ -2375,6 +2455,8 @@ dependencies = [
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"storage_broker",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"svg_fmt",
|
||||
"tempfile",
|
||||
"tenant_size_model",
|
||||
@@ -2399,6 +2481,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
"postgres_ffi",
|
||||
"serde",
|
||||
"serde_with",
|
||||
@@ -3020,7 +3103,7 @@ dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"spin",
|
||||
"spin 0.5.2",
|
||||
"untrusted",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
@@ -3567,6 +3650,21 @@ version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "spin"
|
||||
version = "0.9.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stable_deref_trait"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
@@ -4346,6 +4444,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"criterion",
|
||||
"git-version",
|
||||
"heapless",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"hyper",
|
||||
|
||||
@@ -38,6 +38,8 @@ comfy-table = "6.1"
|
||||
const_format = "0.2"
|
||||
crc32c = "0.6"
|
||||
crossbeam-utils = "0.8.5"
|
||||
enum-map = "2.4.2"
|
||||
enumset = "1.0.12"
|
||||
fail = "0.5.0"
|
||||
fs2 = "0.4.3"
|
||||
futures = "0.3"
|
||||
@@ -120,6 +122,9 @@ postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", re
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
|
||||
|
||||
## Other git libraries
|
||||
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
|
||||
|
||||
## Local libraries
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
|
||||
@@ -8,5 +8,6 @@ license.workspace = true
|
||||
prometheus.workspace = true
|
||||
libc.workspace = true
|
||||
once_cell.workspace = true
|
||||
chrono.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
34
libs/metrics/src/launch_timestamp.rs
Normal file
34
libs/metrics/src/launch_timestamp.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
//! A timestamp captured at process startup to identify restarts of the process, e.g., in logs and metrics.
|
||||
|
||||
use chrono::Utc;
|
||||
|
||||
use super::register_uint_gauge;
|
||||
use std::fmt::Display;
|
||||
|
||||
pub struct LaunchTimestamp(chrono::DateTime<Utc>);
|
||||
|
||||
impl LaunchTimestamp {
|
||||
pub fn generate() -> Self {
|
||||
LaunchTimestamp(Utc::now())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for LaunchTimestamp {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_launch_timestamp_metric(launch_ts: &'static LaunchTimestamp) {
|
||||
let millis_since_epoch: u64 = launch_ts
|
||||
.0
|
||||
.timestamp_millis()
|
||||
.try_into()
|
||||
.expect("we're after the epoch, this should be positive");
|
||||
let metric = register_uint_gauge!(
|
||||
"libmetrics_launch_timestamp",
|
||||
"Timestamp (millis since epoch) at wich the process launched."
|
||||
)
|
||||
.unwrap();
|
||||
metric.set(millis_since_epoch);
|
||||
}
|
||||
@@ -20,6 +20,7 @@ pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
|
||||
pub use prometheus::{Encoder, TextEncoder};
|
||||
use prometheus::{Registry, Result};
|
||||
|
||||
pub mod launch_timestamp;
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
|
||||
@@ -34,6 +35,14 @@ macro_rules! register_uint_gauge_vec {
|
||||
}};
|
||||
}
|
||||
|
||||
/// Creates a `UIntGauge` named `$NAME` with help text `$HELP` and registers it
/// through `$crate::register`.
///
/// Evaluates to the registration `Result`, carrying the gauge on success.
/// Panics if the gauge itself cannot be constructed.
#[macro_export]
macro_rules! register_uint_gauge {
    ($NAME:expr, $HELP:expr $(,)?) => {{
        let new_gauge = $crate::UIntGauge::new($NAME, $HELP).unwrap();
        // A clone is handed to the registry; the original handle is returned to the caller.
        match $crate::register(Box::new(new_gauge.clone())) {
            Ok(_) => Ok(new_gauge),
            Err(err) => Err(err),
        }
    }};
}
|
||||
|
||||
/// Special internal registry, to collect metrics independently from the default registry.
|
||||
/// Was introduced to fix deadlock with lazy registration of metrics in the default registry.
|
||||
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
|
||||
|
||||
@@ -13,5 +13,6 @@ bytes.workspace = true
|
||||
byteorder.workspace = true
|
||||
utils.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
enum-map.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
use std::num::{NonZeroU64, NonZeroUsize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use utils::{
|
||||
history_buffer::HistoryBufferWithDropCounter,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
@@ -137,7 +142,6 @@ pub struct TenantConfigRequest {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: TenantId,
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
pub compaction_target_size: Option<u64>,
|
||||
@@ -233,6 +237,82 @@ pub struct LayerMapInfo {
|
||||
pub historic_layers: Vec<HistoricLayerInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
|
||||
#[repr(usize)]
|
||||
pub enum LayerAccessKind {
|
||||
GetValueReconstructData,
|
||||
Iter,
|
||||
KeyIter,
|
||||
Dump,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LayerAccessStatFullDetails {
|
||||
pub when_millis_since_epoch: u64,
|
||||
pub task_kind: &'static str,
|
||||
pub access_kind: LayerAccessKind,
|
||||
}
|
||||
|
||||
/// An event that impacts the layer's residence status.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LayerResidenceEvent {
|
||||
/// The time when the event occurred.
|
||||
/// NB: this timestamp is captured while the residence status changes.
|
||||
/// So, it might be behind/ahead of the actual residence change by a short amount of time.
|
||||
///
|
||||
#[serde(rename = "timestamp_millis_since_epoch")]
|
||||
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
|
||||
timestamp: SystemTime,
|
||||
/// The new residence status of the layer.
|
||||
status: LayerResidenceStatus,
|
||||
/// The reason why we had to record this event.
|
||||
reason: LayerResidenceEventReason,
|
||||
}
|
||||
|
||||
/// The reason for recording a given [`LayerResidenceEvent`].
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub enum LayerResidenceEventReason {
|
||||
/// The layer map is being populated, e.g. during timeline load or attach.
|
||||
/// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
|
||||
/// We need to record such events because there is no persistent storage for the events.
|
||||
LayerLoad,
|
||||
/// We just created the layer (e.g., freeze_and_flush or compaction).
|
||||
/// Such layers are always [`LayerResidenceStatus::Resident`].
|
||||
LayerCreate,
|
||||
/// We on-demand downloaded or evicted the given layer.
|
||||
ResidenceChange,
|
||||
}
|
||||
|
||||
/// The residence status of the layer, after the given [`LayerResidenceEvent`].
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub enum LayerResidenceStatus {
|
||||
/// Residence status for a layer file that exists locally.
|
||||
/// It may also exist on the remote, we don't care here.
|
||||
Resident,
|
||||
/// Residence status for a layer file that only exists on the remote.
|
||||
Evicted,
|
||||
}
|
||||
|
||||
impl LayerResidenceEvent {
|
||||
pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
|
||||
Self {
|
||||
status,
|
||||
reason,
|
||||
timestamp: SystemTime::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct LayerAccessStats {
|
||||
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
|
||||
pub task_kind_access_flag: Vec<&'static str>,
|
||||
pub first: Option<LayerAccessStatFullDetails>,
|
||||
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
|
||||
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(tag = "kind")]
|
||||
@@ -262,6 +342,7 @@ pub enum HistoricLayerInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
lsn_end: Lsn,
|
||||
remote: bool,
|
||||
access_stats: LayerAccessStats,
|
||||
},
|
||||
Image {
|
||||
layer_file_name: String,
|
||||
@@ -270,6 +351,7 @@ pub enum HistoricLayerInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
lsn_start: Lsn,
|
||||
remote: bool,
|
||||
access_stats: LayerAccessStats,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ async-trait.workspace = true
|
||||
anyhow.workspace = true
|
||||
bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
heapless.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
161
libs/utils/src/history_buffer.rs
Normal file
161
libs/utils/src/history_buffer.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
//! A heapless buffer for events of sorts.
|
||||
|
||||
use std::ops;
|
||||
|
||||
use heapless::HistoryBuffer;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HistoryBufferWithDropCounter<T, const L: usize> {
|
||||
buffer: HistoryBuffer<T, L>,
|
||||
drop_count: u64,
|
||||
}
|
||||
|
||||
impl<T, const L: usize> HistoryBufferWithDropCounter<T, L> {
|
||||
pub fn write(&mut self, data: T) {
|
||||
let len_before = self.buffer.len();
|
||||
self.buffer.write(data);
|
||||
let len_after = self.buffer.len();
|
||||
self.drop_count += u64::from(len_before == len_after);
|
||||
}
|
||||
pub fn drop_count(&self) -> u64 {
|
||||
self.drop_count
|
||||
}
|
||||
pub fn map<U, F: Fn(&T) -> U>(&self, f: F) -> HistoryBufferWithDropCounter<U, L> {
|
||||
let mut buffer = HistoryBuffer::new();
|
||||
buffer.extend(self.buffer.oldest_ordered().map(f));
|
||||
HistoryBufferWithDropCounter::<U, L> {
|
||||
buffer,
|
||||
drop_count: self.drop_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const L: usize> Default for HistoryBufferWithDropCounter<T, L> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buffer: HistoryBuffer::default(),
|
||||
drop_count: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const L: usize> ops::Deref for HistoryBufferWithDropCounter<T, L> {
|
||||
type Target = HistoryBuffer<T, L>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.buffer
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct SerdeRepr<T> {
|
||||
buffer: Vec<T>,
|
||||
drop_count: u64,
|
||||
}
|
||||
|
||||
impl<'a, T, const L: usize> From<&'a HistoryBufferWithDropCounter<T, L>> for SerdeRepr<T>
|
||||
where
|
||||
T: Clone + serde::Serialize,
|
||||
{
|
||||
fn from(value: &'a HistoryBufferWithDropCounter<T, L>) -> Self {
|
||||
let HistoryBufferWithDropCounter { buffer, drop_count } = value;
|
||||
SerdeRepr {
|
||||
buffer: buffer.iter().cloned().collect(),
|
||||
drop_count: *drop_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const L: usize> serde::Serialize for HistoryBufferWithDropCounter<T, L>
|
||||
where
|
||||
T: Clone + serde::Serialize,
|
||||
{
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
SerdeRepr::from(self).serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use super::HistoryBufferWithDropCounter;

    /// Writing past capacity keeps only the most recent values.
    #[test]
    fn test_basics() {
        let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
        for v in 1..=3 {
            b.write(v);
        }
        assert!(b.contains(&2));
        assert!(b.contains(&3));
        assert!(!b.contains(&1));
    }

    /// The drop counter starts increasing once the capacity is exceeded.
    #[test]
    fn test_drop_count_works() {
        let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
        // (value written, drop count expected right after the write)
        let steps = vec![(1, 0), (2, 0), (3, 1), (4, 2)];
        for (value, expected_drops) in steps {
            b.write(value);
            assert_eq!(b.drop_count(), expected_drops);
        }
    }

    /// A clone carries the same contents and counter, and is independent of the original.
    #[test]
    fn test_clone_works() {
        let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
        for v in 1..=3 {
            b.write(v);
        }
        assert_eq!(b.drop_count(), 1);

        let mut c = b.clone();
        assert_eq!(c.drop_count(), 1);
        assert!(c.contains(&2));
        assert!(c.contains(&3));
        assert!(!c.contains(&1));

        // Writing to the clone must not leak into the original.
        c.write(4);
        assert!(c.contains(&4));
        assert!(!b.contains(&4));
    }

    /// `map` preserves oldest-first order and the drop count at every fill level.
    #[test]
    fn test_map() {
        let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
        // (value written, mapped contents expected afterwards, drop count expected afterwards)
        let steps = vec![
            (1i32, vec![11i32], 0u64),
            (2, vec![11, 12], 0),
            (3, vec![12, 13], 1),
        ];
        for (value, expected_mapped, expected_drops) in steps {
            b.write(value);
            assert_eq!(b.drop_count(), expected_drops);

            let c = b.map(|i| i + 10);
            assert_eq!(
                c.oldest_ordered().cloned().collect::<Vec<_>>(),
                expected_mapped
            );
            assert_eq!(c.drop_count(), expected_drops);
        }
    }
}
|
||||
@@ -1,7 +1,8 @@
|
||||
use crate::auth::{Claims, JwtAuth};
|
||||
use crate::http::error;
|
||||
use anyhow::anyhow;
|
||||
use hyper::header::AUTHORIZATION;
|
||||
use anyhow::{anyhow, Context};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
|
||||
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -13,6 +14,7 @@ use tracing::info;
|
||||
|
||||
use std::future::Future;
|
||||
use std::net::TcpListener;
|
||||
use std::str::FromStr;
|
||||
|
||||
use super::error::ApiError;
|
||||
|
||||
@@ -143,6 +145,38 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_response_header_middleware<B>(
|
||||
header: &str,
|
||||
value: &str,
|
||||
) -> anyhow::Result<Middleware<B, ApiError>>
|
||||
where
|
||||
B: hyper::body::HttpBody + Send + Sync + 'static,
|
||||
{
|
||||
let name =
|
||||
HeaderName::from_str(header).with_context(|| format!("invalid header name: {header}"))?;
|
||||
let value =
|
||||
HeaderValue::from_str(value).with_context(|| format!("invalid header value: {value}"))?;
|
||||
Ok(Middleware::post_with_info(
|
||||
move |mut response, request_info| {
|
||||
let name = name.clone();
|
||||
let value = value.clone();
|
||||
async move {
|
||||
let headers = response.headers_mut();
|
||||
if headers.contains_key(&name) {
|
||||
tracing::warn!(
|
||||
"{} response already contains header {:?}",
|
||||
request_info.uri(),
|
||||
&name,
|
||||
);
|
||||
} else {
|
||||
headers.insert(name, value);
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
pub fn check_permission_with(
|
||||
req: &Request<Body>,
|
||||
check_permission: impl Fn(&Claims) -> Result<(), anyhow::Error>,
|
||||
|
||||
@@ -52,6 +52,8 @@ pub mod signals;
|
||||
|
||||
pub mod fs_ext;
|
||||
|
||||
pub mod history_buffer;
|
||||
|
||||
/// use with fail::cfg("$name", "return(2000)")
|
||||
#[macro_export]
|
||||
macro_rules! failpoint_sleep_millis_async {
|
||||
|
||||
@@ -67,6 +67,10 @@ utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{env, ops::ControlFlow, path::Path, str::FromStr};
|
||||
use anyhow::{anyhow, Context};
|
||||
use clap::{Arg, ArgAction, Command};
|
||||
use fail::FailScenario;
|
||||
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tracing::*;
|
||||
|
||||
@@ -52,6 +53,8 @@ fn version() -> String {
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
|
||||
|
||||
let arg_matches = cli().get_matches();
|
||||
|
||||
if arg_matches.get_flag("enabled-features") {
|
||||
@@ -108,7 +111,7 @@ fn main() -> anyhow::Result<()> {
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(conf).context("Failed to start pageserver")?;
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
Ok(())
|
||||
@@ -203,13 +206,24 @@ fn initialize_config(
|
||||
})
|
||||
}
|
||||
|
||||
fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
fn start_pageserver(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<()> {
|
||||
// Initialize logging
|
||||
logging::init(conf.log_format)?;
|
||||
|
||||
// Print version to the log, and expose it as a prometheus metric too.
|
||||
info!("version: {}", version());
|
||||
// Print version and launch timestamp to the log,
|
||||
// and expose them as prometheus metrics.
|
||||
// A changed version string indicates changed software.
|
||||
// A changed launch timestamp indicates a pageserver restart.
|
||||
info!(
|
||||
"version: {} launch_timestamp: {}",
|
||||
version(),
|
||||
launch_ts.to_string()
|
||||
);
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
set_launch_timestamp_metric(launch_ts);
|
||||
|
||||
// If any failpoints were set from FAILPOINTS environment variable,
|
||||
// print them to the log for debugging purposes
|
||||
@@ -307,7 +321,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
{
|
||||
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
|
||||
|
||||
let router = http::make_router(conf, auth.clone(), remote_storage)?
|
||||
let router = http::make_router(conf, launch_ts, auth.clone(), remote_storage)?
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let service = utils::http::RouterService::new(router).unwrap();
|
||||
@@ -347,6 +361,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
|
||||
pageserver::consumption_metrics::collect_metrics(
|
||||
metric_collection_endpoint,
|
||||
conf.metric_collection_interval,
|
||||
conf.cached_metric_collection_interval,
|
||||
conf.synthetic_size_calculation_interval,
|
||||
conf.id,
|
||||
metrics_ctx,
|
||||
|
||||
@@ -58,6 +58,7 @@ pub mod defaults {
|
||||
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
|
||||
|
||||
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
|
||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||
|
||||
@@ -85,6 +86,7 @@ pub mod defaults {
|
||||
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
|
||||
|
||||
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
|
||||
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
|
||||
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
|
||||
|
||||
# [tenant_config]
|
||||
@@ -154,6 +156,8 @@ pub struct PageServerConf {
|
||||
|
||||
// How often to collect metrics and send them to the metrics endpoint.
|
||||
pub metric_collection_interval: Duration,
|
||||
// How often to send unchanged cached metrics to the metrics endpoint.
|
||||
pub cached_metric_collection_interval: Duration,
|
||||
pub metric_collection_endpoint: Option<Url>,
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
|
||||
@@ -220,6 +224,7 @@ struct PageServerConfigBuilder {
|
||||
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
|
||||
|
||||
metric_collection_interval: BuilderValue<Duration>,
|
||||
cached_metric_collection_interval: BuilderValue<Duration>,
|
||||
metric_collection_endpoint: BuilderValue<Option<Url>>,
|
||||
synthetic_size_calculation_interval: BuilderValue<Duration>,
|
||||
|
||||
@@ -264,6 +269,10 @@ impl Default for PageServerConfigBuilder {
|
||||
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default metric collection interval")),
|
||||
cached_metric_collection_interval: Set(humantime::parse_duration(
|
||||
DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default cached_metric_collection_interval")),
|
||||
synthetic_size_calculation_interval: Set(humantime::parse_duration(
|
||||
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
|
||||
)
|
||||
@@ -353,6 +362,14 @@ impl PageServerConfigBuilder {
|
||||
self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
|
||||
}
|
||||
|
||||
pub fn cached_metric_collection_interval(
|
||||
&mut self,
|
||||
cached_metric_collection_interval: Duration,
|
||||
) {
|
||||
self.cached_metric_collection_interval =
|
||||
BuilderValue::Set(cached_metric_collection_interval)
|
||||
}
|
||||
|
||||
pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
|
||||
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
|
||||
}
|
||||
@@ -427,6 +444,9 @@ impl PageServerConfigBuilder {
|
||||
metric_collection_interval: self
|
||||
.metric_collection_interval
|
||||
.ok_or(anyhow!("missing metric_collection_interval"))?,
|
||||
cached_metric_collection_interval: self
|
||||
.cached_metric_collection_interval
|
||||
.ok_or(anyhow!("missing cached_metric_collection_interval"))?,
|
||||
metric_collection_endpoint: self
|
||||
.metric_collection_endpoint
|
||||
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
|
||||
@@ -612,6 +632,7 @@ impl PageServerConf {
|
||||
ConfigurableSemaphore::new(permits)
|
||||
}),
|
||||
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
|
||||
"cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
|
||||
"metric_collection_endpoint" => {
|
||||
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
|
||||
builder.metric_collection_endpoint(Some(endpoint));
|
||||
@@ -741,6 +762,7 @@ impl PageServerConf {
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
metric_collection_interval: Duration::from_secs(60),
|
||||
cached_metric_collection_interval: Duration::from_secs(60 * 60),
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
synthetic_size_calculation_interval: Duration::from_secs(60),
|
||||
test_remote_failures: 0,
|
||||
@@ -881,6 +903,7 @@ initial_superuser_name = 'zzzz'
|
||||
id = 10
|
||||
|
||||
metric_collection_interval = '222 s'
|
||||
cached_metric_collection_interval = '22200 s'
|
||||
metric_collection_endpoint = 'http://localhost:80/metrics'
|
||||
synthetic_size_calculation_interval = '333 s'
|
||||
log_format = 'json'
|
||||
@@ -928,6 +951,9 @@ log_format = 'json'
|
||||
metric_collection_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
|
||||
)?,
|
||||
cached_metric_collection_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
|
||||
)?,
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
synthetic_size_calculation_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
|
||||
@@ -978,6 +1004,7 @@ log_format = 'json'
|
||||
log_format: LogFormat::Json,
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
metric_collection_interval: Duration::from_secs(222),
|
||||
cached_metric_collection_interval: Duration::from_secs(22200),
|
||||
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
||||
synthetic_size_calculation_interval: Duration::from_secs(333),
|
||||
test_remote_failures: 0,
|
||||
|
||||
@@ -46,12 +46,12 @@ pub struct PageserverConsumptionMetricsKey {
|
||||
pub async fn collect_metrics(
|
||||
metric_collection_endpoint: &Url,
|
||||
metric_collection_interval: Duration,
|
||||
cached_metric_collection_interval: Duration,
|
||||
synthetic_size_calculation_interval: Duration,
|
||||
node_id: NodeId,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut ticker = tokio::time::interval(metric_collection_interval);
|
||||
|
||||
info!("starting collect_metrics");
|
||||
|
||||
// spin up background worker that calculates tenant sizes
|
||||
@@ -75,6 +75,7 @@ pub async fn collect_metrics(
|
||||
// define client here to reuse it for all requests
|
||||
let client = reqwest::Client::new();
|
||||
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
|
||||
let mut prev_iteration_time: Option<std::time::Instant> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -83,7 +84,15 @@ pub async fn collect_metrics(
|
||||
return Ok(());
|
||||
},
|
||||
_ = ticker.tick() => {
|
||||
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
|
||||
|
||||
// send cached metrics every cached_metric_collection_interval
|
||||
let send_cached = prev_iteration_time
|
||||
.map(|x| x.elapsed() >= cached_metric_collection_interval)
|
||||
.unwrap_or(false);
|
||||
|
||||
prev_iteration_time = Some(std::time::Instant::now());
|
||||
|
||||
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -105,6 +114,7 @@ pub async fn collect_metrics_iteration(
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
node_id: NodeId,
|
||||
ctx: &RequestContext,
|
||||
send_cached: bool,
|
||||
) {
|
||||
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
|
||||
trace!(
|
||||
@@ -222,11 +232,14 @@ pub async fn collect_metrics_iteration(
|
||||
));
|
||||
}
|
||||
|
||||
// Filter metrics
|
||||
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
|
||||
Some(val) => val != curr_val,
|
||||
None => true,
|
||||
});
|
||||
// Filter metrics, unless we want to send all metrics, including cached ones.
|
||||
// See: https://github.com/neondatabase/neon/issues/3485
|
||||
if !send_cached {
|
||||
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
|
||||
Some(val) => val != curr_val,
|
||||
None => true,
|
||||
});
|
||||
}
|
||||
|
||||
if current_metrics.is_empty() {
|
||||
trace!("no new metrics to send");
|
||||
|
||||
@@ -664,6 +664,55 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
/v1/tenant/{tenant_id}/config/:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: |
|
||||
Returns tenant's config description: specific config overrides a tenant has
|
||||
and the effective config.
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant config, specific and effective
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/TenantConfig"
|
||||
"400":
|
||||
description: Malformed get tenant config request
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"404":
|
||||
description: Tenant or timeline were not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
components:
|
||||
securitySchemes:
|
||||
JWT:
|
||||
@@ -724,10 +773,33 @@ components:
|
||||
type: integer
|
||||
checkpoint_timeout:
|
||||
type: string
|
||||
compaction_target_size:
|
||||
type: integer
|
||||
compaction_period:
|
||||
type: string
|
||||
compaction_threshold:
|
||||
type: string
|
||||
image_creation_threshold:
|
||||
type: integer
|
||||
walreceiver_connect_timeout:
|
||||
type: string
|
||||
lagging_wal_timeout:
|
||||
type: string
|
||||
max_lsn_wal_lag:
|
||||
type: integer
|
||||
trace_read_requests:
|
||||
type: boolean
|
||||
TenantConfig:
|
||||
type: object
|
||||
properties:
|
||||
tenant_specific_overrides:
|
||||
type: object
|
||||
schema:
|
||||
$ref: "#/components/schemas/TenantConfigInfo"
|
||||
effective_config:
|
||||
type: object
|
||||
schema:
|
||||
$ref: "#/components/schemas/TenantConfigInfo"
|
||||
TimelineInfo:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -18,6 +20,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::mgr::TenantMapInsertError;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use utils::{
|
||||
@@ -494,7 +497,11 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let size = if !inputs_only.unwrap_or(false) {
|
||||
Some(inputs.calculate().map_err(ApiError::InternalServerError)?)
|
||||
Some(
|
||||
tenant
|
||||
.calc_and_update_cached_synthetic_size(&inputs)
|
||||
.map_err(ApiError::InternalServerError)?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -528,10 +535,13 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
|
||||
async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let reset: LayerAccessStatsReset =
|
||||
parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
|
||||
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let layer_map_info = timeline.layer_map_info();
|
||||
let layer_map_info = timeline.layer_map_info(reset);
|
||||
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
@@ -697,12 +707,40 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
)
|
||||
}
|
||||
|
||||
async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
async fn get_tenant_config_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let tenant = mgr::get_tenant(tenant_id, false)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let response = HashMap::from([
|
||||
(
|
||||
"tenant_specific_overrides",
|
||||
serde_json::to_value(tenant.tenant_specific_overrides())
|
||||
.context("serializing tenant specific overrides")
|
||||
.map_err(ApiError::InternalServerError)?,
|
||||
),
|
||||
(
|
||||
"effective_config",
|
||||
serde_json::to_value(tenant.effective_config())
|
||||
.context("serializing effective config")
|
||||
.map_err(ApiError::InternalServerError)?,
|
||||
),
|
||||
]);
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
async fn update_tenant_config_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let request_data: TenantConfigRequest = json_request(&mut request).await?;
|
||||
let tenant_id = request_data.tenant_id;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let mut tenant_conf: TenantConfOpt = Default::default();
|
||||
let mut tenant_conf = TenantConfOpt::default();
|
||||
if let Some(gc_period) = request_data.gc_period {
|
||||
tenant_conf.gc_period = Some(
|
||||
humantime::parse_duration(&gc_period)
|
||||
@@ -737,12 +775,8 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.map_err(ApiError::BadRequest)?,
|
||||
);
|
||||
}
|
||||
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(trace_read_requests) = request_data.trace_read_requests {
|
||||
tenant_conf.trace_read_requests = Some(trace_read_requests);
|
||||
}
|
||||
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
|
||||
tenant_conf.trace_read_requests = request_data.trace_read_requests;
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||
@@ -764,7 +798,7 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
}
|
||||
|
||||
let state = get_state(&request);
|
||||
mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
|
||||
.await
|
||||
// FIXME: `update_tenant_config` can fail because of both user and internal errors.
|
||||
@@ -921,6 +955,7 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
|
||||
@@ -937,6 +972,14 @@ pub fn make_router(
|
||||
}))
|
||||
}
|
||||
|
||||
router = router.middleware(
|
||||
endpoint::add_response_header_middleware(
|
||||
"PAGESERVER_LAUNCH_TIMESTAMP",
|
||||
&launch_ts.to_string(),
|
||||
)
|
||||
.expect("construct launch timestamp header middleware"),
|
||||
);
|
||||
|
||||
macro_rules! testing_api {
|
||||
($handler_desc:literal, $handler:path $(,)?) => {{
|
||||
#[cfg(not(feature = "testing"))]
|
||||
@@ -969,7 +1012,8 @@ pub fn make_router(
|
||||
.post("/v1/tenant", tenant_create_handler)
|
||||
.get("/v1/tenant/:tenant_id", tenant_status)
|
||||
.get("/v1/tenant/:tenant_id/size", tenant_size_handler)
|
||||
.put("/v1/tenant/config", tenant_config_handler)
|
||||
.put("/v1/tenant/config", update_tenant_config_handler)
|
||||
.get("/v1/tenant/:tenant_id/config", get_tenant_config_handler)
|
||||
.get("/v1/tenant/:tenant_id/timeline", timeline_list_handler)
|
||||
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
|
||||
.post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
|
||||
|
||||
@@ -150,6 +150,15 @@ pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("Failed to register pageserver_tenant_states_count metric")
|
||||
});
|
||||
|
||||
pub static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_synthetic_size",
|
||||
"Synthetic size of each tenant",
|
||||
&["tenant_id"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_synthetic_size metric")
|
||||
});
|
||||
|
||||
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
|
||||
// or in testing they estimate how much we would upload if we did.
|
||||
static NUM_PERSISTENT_FILES_CREATED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
@@ -593,6 +602,7 @@ impl Drop for TimelineMetrics {
|
||||
|
||||
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
|
||||
let tid = tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
for state in TENANT_STATE_OPTIONS {
|
||||
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
|
||||
}
|
||||
|
||||
@@ -169,7 +169,14 @@ task_local! {
|
||||
/// Note that we don't try to limit how many task of a certain kind can be running
|
||||
/// at the same time.
|
||||
///
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(
|
||||
Debug,
|
||||
// NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
|
||||
enumset::EnumSetType,
|
||||
serde::Serialize,
|
||||
serde::Deserialize,
|
||||
strum_macros::IntoStaticStr,
|
||||
)]
|
||||
pub enum TaskKind {
|
||||
// Pageserver startup, i.e., `main`
|
||||
Startup,
|
||||
|
||||
@@ -45,13 +45,14 @@ use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir;
|
||||
use crate::is_uninit_mark;
|
||||
use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC};
|
||||
use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
|
||||
use crate::repository::GcResult;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
@@ -1618,8 +1619,16 @@ fn tree_sort_timelines(
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Private functions
|
||||
impl Tenant {
|
||||
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
|
||||
*self.tenant_conf.read().unwrap()
|
||||
}
|
||||
|
||||
pub fn effective_config(&self) -> TenantConf {
|
||||
self.tenant_specific_overrides()
|
||||
.merge(self.conf.default_tenant_conf)
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
@@ -1690,8 +1699,8 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
*self.tenant_conf.write().unwrap() = new_tenant_conf;
|
||||
}
|
||||
|
||||
fn create_timeline_data(
|
||||
@@ -2432,13 +2441,27 @@ impl Tenant {
|
||||
pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result<u64> {
|
||||
let inputs = self.gather_size_inputs(ctx).await?;
|
||||
|
||||
self.calc_and_update_cached_synthetic_size(&inputs)
|
||||
}
|
||||
|
||||
/// Calculate synthetic size, cache it and set metric value
|
||||
pub fn calc_and_update_cached_synthetic_size(
|
||||
&self,
|
||||
inputs: &size::ModelInputs,
|
||||
) -> anyhow::Result<u64> {
|
||||
let size = inputs.calculate()?;
|
||||
|
||||
self.cached_synthetic_tenant_size
|
||||
.store(size, Ordering::Relaxed);
|
||||
|
||||
TENANT_SYNTHETIC_SIZE_METRIC
|
||||
.get_metric_with_label_values(&[&self.tenant_id.to_string()])
|
||||
.unwrap()
|
||||
.set(size);
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
pub fn get_cached_synthetic_size(&self) -> u64 {
|
||||
self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
@@ -51,6 +51,7 @@ pub struct TenantConf {
|
||||
pub checkpoint_distance: u64,
|
||||
// Inmemory layer is also flushed at least once in checkpoint_timeout to
|
||||
// eventually upload WAL after activity is stopped.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Duration,
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
@@ -96,23 +97,61 @@ pub struct TenantConf {
|
||||
/// which parameters are set and which are not.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub checkpoint_timeout: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub compaction_target_size: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub compaction_period: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub compaction_threshold: Option<usize>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub gc_horizon: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub gc_period: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub pitr_interval: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub walreceiver_connect_timeout: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub trace_read_requests: Option<bool>,
|
||||
}
|
||||
|
||||
@@ -225,3 +264,24 @@ impl Default for TenantConf {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn de_serializing_pageserver_config_omits_empty_values() {
|
||||
let small_conf = TenantConfOpt {
|
||||
gc_horizon: Some(42),
|
||||
..TenantConfOpt::default()
|
||||
};
|
||||
|
||||
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
|
||||
assert_eq!(toml_form, "gc_horizon = 42\n");
|
||||
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
|
||||
|
||||
let json_form = serde_json::to_string(&small_conf).unwrap();
|
||||
assert_eq!(json_form, "{\"gc_horizon\":42}");
|
||||
assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use anyhow::Result;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
@@ -61,6 +62,8 @@ use utils::lsn::Lsn;
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::Replacement;
|
||||
|
||||
use self::historic_layer_coverage::LayerKey;
|
||||
|
||||
use super::storage_layer::range_eq;
|
||||
|
||||
///
|
||||
@@ -87,11 +90,18 @@ pub struct LayerMap<L: ?Sized> {
|
||||
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
|
||||
|
||||
/// Index of the historic layers optimized for search
|
||||
historic: BufferedHistoricLayerCoverage<Arc<L>>,
|
||||
historic: BufferedHistoricLayerCoverage<LayerKey>,
|
||||
|
||||
/// All layers accessible by key. Useful for:
|
||||
/// 1. Iterating all layers
|
||||
/// 2. Dereferencing a self.historic search result
|
||||
/// 3. Replacing a layer with a remote/local version without
|
||||
/// rebuilding the self.historic index.
|
||||
mapping: HashMap<LayerKey, Arc<L>>,
|
||||
|
||||
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
l0_delta_layers: Vec<Arc<L>>,
|
||||
l0_delta_layers: HashMap<LayerKey, Arc<L>>,
|
||||
}
|
||||
|
||||
impl<L: ?Sized> Default for LayerMap<L> {
|
||||
@@ -100,8 +110,9 @@ impl<L: ?Sized> Default for LayerMap<L> {
|
||||
open_layer: None,
|
||||
next_open_layer_at: None,
|
||||
frozen_layers: VecDeque::default(),
|
||||
l0_delta_layers: Vec::default(),
|
||||
l0_delta_layers: HashMap::default(),
|
||||
historic: BufferedHistoricLayerCoverage::default(),
|
||||
mapping: HashMap::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -139,24 +150,6 @@ where
|
||||
self.layer_map.remove_historic_noflush(layer)
|
||||
}
|
||||
|
||||
/// Replaces existing layer iff it is the `expected`.
|
||||
///
|
||||
/// If the expected layer has been removed it will not be inserted by this function.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
||||
/// be done.
|
||||
///
|
||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
||||
/// One way to do that is to add a layer of indirection for returned values, so
|
||||
/// that we can replace values only by updating a hashmap.
|
||||
pub fn replace_historic(
|
||||
&mut self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
self.layer_map.replace_historic_noflush(expected, new)
|
||||
}
|
||||
|
||||
// We will flush on drop anyway, but this method makes it
|
||||
// more explicit that there is some work being done.
|
||||
/// Apply all updates
|
||||
@@ -228,33 +221,38 @@ where
|
||||
match (latest_delta, latest_image) {
|
||||
(None, None) => None,
|
||||
(None, Some(image)) => {
|
||||
let image = self.mapping.get(&image).unwrap();
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
layer: image.clone(),
|
||||
lsn_floor,
|
||||
})
|
||||
}
|
||||
(Some(delta), None) => {
|
||||
let delta = self.mapping.get(&delta).unwrap();
|
||||
let lsn_floor = delta.get_lsn_range().start;
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
layer: delta.clone(),
|
||||
lsn_floor,
|
||||
})
|
||||
}
|
||||
(Some(delta), Some(image)) => {
|
||||
let image = self.mapping.get(&image).unwrap();
|
||||
let delta = self.mapping.get(&delta).unwrap();
|
||||
|
||||
let img_lsn = image.get_lsn_range().start;
|
||||
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
|
||||
let image_exact_match = img_lsn + 1 == end_lsn;
|
||||
if image_is_newer || image_exact_match {
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
layer: image.clone(),
|
||||
lsn_floor: img_lsn,
|
||||
})
|
||||
} else {
|
||||
let lsn_floor =
|
||||
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
layer: delta.clone(),
|
||||
lsn_floor,
|
||||
})
|
||||
}
|
||||
@@ -273,13 +271,12 @@ where
|
||||
/// Helper function for BatchedUpdates::insert_historic
|
||||
///
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
|
||||
self.historic.insert(
|
||||
historic_layer_coverage::LayerKey::from(&*layer),
|
||||
Arc::clone(&layer),
|
||||
);
|
||||
let key = LayerKey::from(&*layer);
|
||||
self.historic.insert(key.clone(), key.clone());
|
||||
self.mapping.insert(key.clone(), layer.clone());
|
||||
|
||||
if Self::is_l0(&layer) {
|
||||
self.l0_delta_layers.push(layer);
|
||||
self.l0_delta_layers.insert(key, layer.clone());
|
||||
}
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
@@ -291,27 +288,28 @@ where
|
||||
/// Helper function for BatchedUpdates::remove_historic
|
||||
///
|
||||
pub fn remove_historic_noflush(&mut self, layer: Arc<L>) {
|
||||
self.historic
|
||||
.remove(historic_layer_coverage::LayerKey::from(&*layer));
|
||||
let key = historic_layer_coverage::LayerKey::from(&*layer);
|
||||
self.historic.remove(key.clone());
|
||||
self.mapping.remove(&key.clone());
|
||||
|
||||
if Self::is_l0(&layer) {
|
||||
let len_before = self.l0_delta_layers.len();
|
||||
self.l0_delta_layers
|
||||
.retain(|other| !Self::compare_arced_layers(other, &layer));
|
||||
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
|
||||
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
|
||||
// vtable) pairs.
|
||||
assert_eq!(
|
||||
self.l0_delta_layers.len(),
|
||||
len_before - 1,
|
||||
"failed to locate removed historic layer from l0_delta_layers"
|
||||
);
|
||||
self.l0_delta_layers.remove(&key);
|
||||
}
|
||||
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
|
||||
|
||||
pub(self) fn replace_historic_noflush(
|
||||
/// Replaces existing layer iff it is the `expected`.
|
||||
///
|
||||
/// If the expected layer has been removed it will not be inserted by this function.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
||||
/// be done.
|
||||
///
|
||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
||||
/// One way to do that is to add a layer of indirection for returned values, so
|
||||
/// that we can replace values only by updating a hashmap.
|
||||
pub fn replace_historic(
|
||||
&mut self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<L>,
|
||||
@@ -332,29 +330,23 @@ where
|
||||
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
let l0_index = if expected_l0 {
|
||||
// find the index in case replace worked, we need to replace that as well
|
||||
Some(
|
||||
self.l0_delta_layers
|
||||
.iter()
|
||||
.position(|slot| Self::compare_arced_layers(slot, expected))
|
||||
.ok_or_else(|| anyhow::anyhow!("existing l0 delta layer was not found"))?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
if expected_l0 {
|
||||
match self.mapping.entry(key.clone()) {
|
||||
Entry::Occupied(mut entry) => entry.insert(new.clone()),
|
||||
Entry::Vacant(_) => anyhow::bail!("layer doesn't exist"),
|
||||
};
|
||||
};
|
||||
|
||||
let replaced = self.historic.replace(&key, new.clone(), |existing| {
|
||||
Self::compare_arced_layers(existing, expected)
|
||||
});
|
||||
match self.mapping.entry(key.clone()) {
|
||||
Entry::Occupied(mut entry) => entry.insert(new.clone()),
|
||||
Entry::Vacant(_) => anyhow::bail!("layer doesn't exist"),
|
||||
};
|
||||
|
||||
if let Replacement::Replaced { .. } = &replaced {
|
||||
if let Some(index) = l0_index {
|
||||
self.l0_delta_layers[index] = new;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(replaced)
|
||||
Ok(Replacement::Replaced {
|
||||
in_buffered: false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper function for BatchedUpdates::drop.
|
||||
@@ -382,8 +374,8 @@ where
|
||||
let start = key.start.to_i128();
|
||||
let end = key.end.to_i128();
|
||||
|
||||
let layer_covers = |layer: Option<Arc<L>>| match layer {
|
||||
Some(layer) => layer.get_lsn_range().start >= lsn.start,
|
||||
let layer_covers = |key: Option<&LayerKey>| match key {
|
||||
Some(key) => self.mapping.get(key).unwrap().get_lsn_range().start >= lsn.start,
|
||||
None => false,
|
||||
};
|
||||
|
||||
@@ -403,7 +395,7 @@ where
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
|
||||
self.historic.iter()
|
||||
self.mapping.values().cloned()
|
||||
}
|
||||
|
||||
///
|
||||
@@ -430,10 +422,13 @@ where
|
||||
// Initialize loop variables
|
||||
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
|
||||
let mut current_key = start;
|
||||
let mut current_val = version.image_coverage.query(start);
|
||||
let mut current_val = version.image_coverage.query(start)
|
||||
.map(|key| self.mapping.get(&key).unwrap().clone());
|
||||
|
||||
// Loop through the change events and push intervals
|
||||
for (change_key, change_val) in version.image_coverage.range(start..end) {
|
||||
let change_val = change_val.map(|key| self.mapping.get(&key).unwrap().clone());
|
||||
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
|
||||
coverage.push((kr, current_val.take()));
|
||||
current_key = change_key;
|
||||
@@ -527,6 +522,7 @@ where
|
||||
for (change_key, change_val) in version.delta_coverage.range(start..end) {
|
||||
// If there's a relevant delta in this part, add 1 and recurse down
|
||||
if let Some(val) = current_val {
|
||||
let val = self.mapping.get(&val).unwrap().clone();
|
||||
if val.get_lsn_range().end > lsn.start {
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
|
||||
let lr = lsn.start..val.get_lsn_range().start;
|
||||
@@ -549,6 +545,7 @@ where
|
||||
|
||||
// Consider the last part
|
||||
if let Some(val) = current_val {
|
||||
let val = self.mapping.get(&val).unwrap().clone();
|
||||
if val.get_lsn_range().end > lsn.start {
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(end);
|
||||
let lr = lsn.start..val.get_lsn_range().start;
|
||||
@@ -705,7 +702,7 @@ where
|
||||
|
||||
/// Return all L0 delta layers
|
||||
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
|
||||
Ok(self.l0_delta_layers.clone())
|
||||
Ok(self.l0_delta_layers.values().cloned().collect())
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
@@ -730,18 +727,6 @@ where
|
||||
println!("End dump LayerMap");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
|
||||
// FIXME: ptr_eq might fail to return true for 'dyn' references because of multiple vtables
|
||||
// can be created in compilation. Clippy complains about this. In practice it seems to
|
||||
// work.
|
||||
//
|
||||
// In future rust versions this might become Arc::as_ptr(left) as *const () ==
|
||||
// Arc::as_ptr(right) as *const (), we could change to that before.
|
||||
#[allow(clippy::vtable_address_comparisons)]
|
||||
Arc::ptr_eq(left, right)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -802,7 +787,6 @@ mod tests {
|
||||
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
|
||||
|
||||
let replaced = map
|
||||
.batch_update()
|
||||
.replace_historic(&remote, downloaded.clone())
|
||||
.expect("name derived attributes are the same");
|
||||
assert!(
|
||||
|
||||
@@ -12,7 +12,7 @@ use super::layer_coverage::LayerCoverageTuple;
|
||||
/// These three values are enough to uniquely identify a layer, since
|
||||
/// a layer is obligated to contain all contents within range, so two
|
||||
/// deltas (or images) with the same range have identical content.
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
pub struct LayerKey {
|
||||
// TODO I use i128 and u64 because it was easy for prototyping,
|
||||
// testing, and benchmarking. If we can use the Lsn and Key
|
||||
@@ -438,46 +438,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
///
|
||||
/// Returns a `Replacement` value describing the outcome; only the case of
|
||||
/// `Replacement::Replaced` modifies the map and requires a rebuild.
|
||||
pub fn replace<F>(
|
||||
&mut self,
|
||||
layer_key: &LayerKey,
|
||||
new: Value,
|
||||
check_expected: F,
|
||||
) -> Replacement<Value>
|
||||
where
|
||||
F: FnOnce(&Value) -> bool,
|
||||
{
|
||||
let (slot, in_buffered) = match self.buffer.get(layer_key) {
|
||||
Some(inner @ Some(_)) => {
|
||||
// we compare against the buffered version, because there will be a later
|
||||
// rebuild before querying
|
||||
(inner.as_ref(), true)
|
||||
}
|
||||
Some(None) => {
|
||||
// buffer has removal for this key; it will not be equivalent by any check_expected.
|
||||
return Replacement::RemovalBuffered;
|
||||
}
|
||||
None => {
|
||||
// no pending modification for the key, check layers
|
||||
(self.layers.get(layer_key), false)
|
||||
}
|
||||
};
|
||||
|
||||
match slot {
|
||||
Some(existing) if !check_expected(existing) => {
|
||||
// unfortunate clone here, but otherwise the nll borrowck grows the region of
|
||||
// 'a to cover the whole function, and we could not mutate in the other
|
||||
// Some(existing) branch
|
||||
Replacement::Unexpected(existing.clone())
|
||||
}
|
||||
None => Replacement::NotFound,
|
||||
Some(_existing) => {
|
||||
self.insert(layer_key.to_owned(), new);
|
||||
Replacement::Replaced { in_buffered }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rebuild(&mut self) {
|
||||
// Find the first LSN that needs to be rebuilt
|
||||
let rebuild_since: u64 = match self.buffer.iter().next() {
|
||||
@@ -521,17 +481,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
)
|
||||
}
|
||||
|
||||
/// Iterate all the layers
|
||||
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
|
||||
// NOTE we can actually perform this without rebuilding,
|
||||
// but it's not necessary for now.
|
||||
if !self.buffer.is_empty() {
|
||||
panic!("rebuild pls")
|
||||
}
|
||||
|
||||
self.layers.values().cloned()
|
||||
}
|
||||
|
||||
/// Return a reference to a queryable map, assuming all updates
|
||||
/// have already been processed using self.rebuild()
|
||||
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
|
||||
@@ -670,139 +619,3 @@ fn test_retroactive_simple() {
|
||||
assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn test_retroactive_replacement() {
    // Three image layers whose key ranges all contain key 4.
    let fixtures = [
        (
            LayerKey {
                key: 0..5,
                lsn: 100..101,
                is_image: true,
            },
            "Image 1".to_string(),
        ),
        (
            LayerKey {
                key: 3..9,
                lsn: 110..111,
                is_image: true,
            },
            "Image 2".to_string(),
        ),
        (
            LayerKey {
                key: 4..6,
                lsn: 120..121,
                is_image: true,
            },
            "Image 3".to_string(),
        ),
    ];

    let mut map = BufferedHistoricLayerCoverage::new();
    for (key, layer) in fixtures.iter() {
        map.insert(key.clone(), layer.clone());
    }

    // No rebuild here: replace must work on buffered updates as well as on
    // already built layers.

    for (key, orig_layer) in fixtures.iter() {
        let replacement = format!("Remote {orig_layer}");
        let at = key.lsn.end + 1;

        // evict
        let ret = map.replace(key, replacement.clone(), |l| l == orig_layer);
        assert!(
            matches!(ret, Replacement::Replaced { .. }),
            "replace {orig_layer}: {ret:?}"
        );
        map.rebuild();

        let version = map.get().expect("rebuilt").get_version(at).unwrap();
        assert_eq!(
            version.image_coverage.query(4).as_deref(),
            Some(replacement.as_str()),
            "query for 4 at version {at} after eviction",
        );

        // download
        let ret = map.replace(key, orig_layer.clone(), |l| l == &replacement);
        assert!(
            matches!(ret, Replacement::Replaced { .. }),
            "replace {orig_layer} back: {ret:?}"
        );
        map.rebuild();

        let version = map.get().expect("rebuilt").get_version(at).unwrap();
        assert_eq!(
            version.image_coverage.query(4).as_deref(),
            Some(orig_layer.as_str()),
            "query for 4 at version {at} after download",
        );
    }
}
|
||||
|
||||
#[test]
fn missing_key_is_not_inserted_with_replace() {
    let key = LayerKey {
        key: 0..5,
        lsn: 100..101,
        is_image: true,
    };
    let mut map = BufferedHistoricLayerCoverage::new();

    // Replacing a key that was never inserted must report NotFound ...
    let ret = map.replace(&key, "should not replace", |_| true);
    assert!(matches!(ret, Replacement::NotFound), "{ret:?}");

    // ... and must not have inserted anything as a side effect.
    map.rebuild();
    let coverage = map.get().expect("no changes to rebuild");
    assert!(coverage.get_version(102).is_none());
}
|
||||
|
||||
#[test]
fn replacing_buffered_insert_and_remove() {
    let key = LayerKey {
        key: 0..5,
        lsn: 100..101,
        is_image: true,
    };
    let mut map = BufferedHistoricLayerCoverage::new();

    // An insert that is still buffered can be replaced.
    map.insert(key.clone(), "Image 1");
    let ret = map.replace(&key, "Remote Image 1", |l| *l == "Image 1");
    assert!(
        matches!(ret, Replacement::Replaced { in_buffered: true }),
        "{ret:?}"
    );
    map.rebuild();

    let found = map
        .get()
        .expect("rebuilt")
        .get_version(102)
        .unwrap()
        .image_coverage
        .query(4);
    assert_eq!(found, Some("Remote Image 1"));

    // A buffered removal blocks replacement.
    map.remove(key.clone());
    let ret = map.replace(&key, "should not replace", |_| true);
    assert!(
        matches!(ret, Replacement::RemovalBuffered),
        "cannot replace after scheduled remove: {ret:?}"
    );

    map.rebuild();

    // Once the removal has been applied the key is simply gone.
    let ret = map.replace(&key, "should not replace", |_| true);
    assert!(
        matches!(ret, Replacement::NotFound),
        "cannot replace after remove + rebuild: {ret:?}"
    );

    let at_version = map.get().expect("rebuilt").get_version(102);
    assert!(at_version.is_none());
}
|
||||
|
||||
@@ -101,24 +101,24 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
/// Get the latest (by lsn.end) layer at a given key
|
||||
///
|
||||
/// Complexity: O(log N)
|
||||
pub fn query(&self, key: i128) -> Option<Value> {
|
||||
pub fn query(&self, key: i128) -> Option<&Value> {
|
||||
self.nodes
|
||||
.range(..=key)
|
||||
.rev()
|
||||
.next()?
|
||||
.1
|
||||
.as_ref()
|
||||
.map(|(_, v)| v.clone())
|
||||
.map(|(_, v)| v)
|
||||
}
|
||||
|
||||
/// Iterate the changes in layer coverage in a given range. You will likely
|
||||
/// want to start with self.query(key.start), and then follow up with self.range
|
||||
///
|
||||
/// Complexity: O(log N + result_size)
|
||||
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
|
||||
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<&Value>)> {
|
||||
self.nodes
|
||||
.range(key)
|
||||
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
|
||||
.map(|(k, v)| (*k, v.as_ref().map(|x| &x.1)))
|
||||
}
|
||||
|
||||
/// O(1) clone
|
||||
|
||||
@@ -285,17 +285,22 @@ pub async fn create_tenant(
|
||||
}).await
|
||||
}
|
||||
|
||||
pub async fn update_tenant_config(
|
||||
pub async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_id, true).await?;
|
||||
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
let tenant_config_path = conf.tenant_config_path(tenant_id);
|
||||
Tenant::persist_tenant_config(&tenant.tenant_id(), &tenant_config_path, tenant_conf, false)?;
|
||||
Tenant::persist_tenant_config(
|
||||
&tenant.tenant_id(),
|
||||
&tenant_config_path,
|
||||
new_tenant_conf,
|
||||
false,
|
||||
)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -9,13 +9,21 @@ mod remote_layer;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use enum_map::EnumMap;
|
||||
use enumset::EnumSet;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use std::ops::Range;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use utils::history_buffer::HistoryBufferWithDropCounter;
|
||||
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -83,9 +91,156 @@ pub enum ValueReconstructResult {
|
||||
Missing,
|
||||
}
|
||||
|
||||
/// Access and residence statistics of a single layer.
///
/// The counters live behind a [`Mutex`], so they can be recorded through a
/// shared reference to the layer.
#[derive(Debug)]
|
||||
pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
|
||||
|
||||
/// The mutable state guarded by the mutex inside `LayerAccessStats`.
#[derive(Debug, Default, Clone)]
|
||||
struct LayerAccessStatsInner {
|
||||
/// Details of the first recorded access; `None` until an access is recorded.
first_access: Option<LayerAccessStatFullDetails>,
|
||||
/// Number of recorded accesses, per access kind.
count_by_access_kind: EnumMap<LayerAccessKind, u64>,
|
||||
/// Set of task kinds that have accessed the layer.
task_kind_flag: EnumSet<TaskKind>,
|
||||
/// History of the most recent accesses.
// NOTE(review): the `16` is presumably the history capacity; confirm
// against `HistoryBufferWithDropCounter`.
last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
|
||||
/// History of the most recent residence events.
last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
|
||||
}
|
||||
|
||||
/// Everything recorded about one individual layer access.
#[derive(Debug, Clone)]
|
||||
struct LayerAccessStatFullDetails {
|
||||
/// Wall-clock time at which the access was recorded.
when: SystemTime,
|
||||
/// Kind of task that performed the access.
task_kind: TaskKind,
|
||||
/// What kind of access it was.
access_kind: LayerAccessKind,
|
||||
}
|
||||
|
||||
/// Selects what `LayerAccessStats::to_api_model` resets after it has taken
/// its snapshot of the statistics.
#[derive(Clone, Copy, strum_macros::EnumString)]
|
||||
pub enum LayerAccessStatsReset {
|
||||
/// Leave the statistics untouched.
NoReset,
|
||||
/// Clear only the set of task kinds that accessed the layer.
JustTaskKindFlags,
|
||||
/// Reset all statistics back to their default (empty) state.
AllStats,
|
||||
}
|
||||
|
||||
/// Converts `ts` into whole milliseconds elapsed since the Unix epoch.
///
/// Panics if `ts` lies before the epoch, or if the millisecond count does
/// not fit into a `u64`.
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
    let since_epoch = ts
        .duration_since(UNIX_EPOCH)
        .expect("better to die in this unlikely case than report false stats");

    // `as_millis` yields a `u128`; narrow it explicitly instead of truncating.
    let millis: u64 = since_epoch
        .as_millis()
        .try_into()
        .expect("64 bits is enough for few more years");
    millis
}
|
||||
|
||||
impl LayerAccessStatFullDetails {
|
||||
fn to_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
|
||||
let Self {
|
||||
when,
|
||||
task_kind,
|
||||
access_kind,
|
||||
} = self;
|
||||
pageserver_api::models::LayerAccessStatFullDetails {
|
||||
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
|
||||
task_kind: task_kind.into(), // into static str, powered by strum_macros
|
||||
access_kind: *access_kind,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerAccessStats {
|
||||
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
|
||||
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
|
||||
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
|
||||
new
|
||||
}
|
||||
|
||||
pub(crate) fn for_new_layer_file() -> Self {
|
||||
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
|
||||
new.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
new
|
||||
}
|
||||
|
||||
/// Creates a clone of `self` and records `new_status` in the clone.
|
||||
/// The `new_status` is not recorded in `self`
|
||||
pub(crate) fn clone_for_residence_change(
|
||||
&self,
|
||||
new_status: LayerResidenceStatus,
|
||||
) -> LayerAccessStats {
|
||||
let clone = {
|
||||
let inner = self.0.lock().unwrap();
|
||||
inner.clone()
|
||||
};
|
||||
let new = LayerAccessStats(Mutex::new(clone));
|
||||
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
|
||||
new
|
||||
}
|
||||
|
||||
fn record_residence_event(
|
||||
&self,
|
||||
status: LayerResidenceStatus,
|
||||
reason: LayerResidenceEventReason,
|
||||
) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
inner
|
||||
.last_residence_changes
|
||||
.write(LayerResidenceEvent::new(status, reason));
|
||||
}
|
||||
|
||||
fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
let this_access = LayerAccessStatFullDetails {
|
||||
when: SystemTime::now(),
|
||||
task_kind,
|
||||
access_kind,
|
||||
};
|
||||
inner
|
||||
.first_access
|
||||
.get_or_insert_with(|| this_access.clone());
|
||||
inner.count_by_access_kind[access_kind] += 1;
|
||||
inner.task_kind_flag |= task_kind;
|
||||
inner.last_accesses.write(this_access);
|
||||
}
|
||||
fn to_api_model(
|
||||
&self,
|
||||
reset: LayerAccessStatsReset,
|
||||
) -> pageserver_api::models::LayerAccessStats {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
let LayerAccessStatsInner {
|
||||
first_access,
|
||||
count_by_access_kind,
|
||||
task_kind_flag,
|
||||
last_accesses,
|
||||
last_residence_changes,
|
||||
} = &*inner;
|
||||
let ret = pageserver_api::models::LayerAccessStats {
|
||||
access_count_by_access_kind: count_by_access_kind
|
||||
.iter()
|
||||
.map(|(kind, count)| (kind, *count))
|
||||
.collect(),
|
||||
task_kind_access_flag: task_kind_flag
|
||||
.iter()
|
||||
.map(|task_kind| task_kind.into()) // into static str, powered by strum_macros
|
||||
.collect(),
|
||||
first: first_access.as_ref().map(|a| a.to_api_model()),
|
||||
accesses_history: last_accesses.map(|m| m.to_api_model()),
|
||||
residence_events_history: last_residence_changes.clone(),
|
||||
};
|
||||
match reset {
|
||||
LayerAccessStatsReset::NoReset => (),
|
||||
LayerAccessStatsReset::JustTaskKindFlags => {
|
||||
inner.task_kind_flag.clear();
|
||||
}
|
||||
LayerAccessStatsReset::AllStats => {
|
||||
*inner = LayerAccessStatsInner::default();
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
|
||||
/// required by [`LayerMap`].
|
||||
pub trait Layer: Send + Sync {
|
||||
///
|
||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||
/// timeline names, because those are known in the context of which the layers
|
||||
/// are used in (timeline).
|
||||
pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
/// Range of keys that this layer covers
|
||||
fn get_key_range(&self) -> Range<Key>;
|
||||
|
||||
@@ -148,8 +303,7 @@ pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i>;
|
||||
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
|
||||
/// A delta layer contains all modifications within a range of LSNs and keys.
|
||||
/// An image layer is a snapshot of all the data in a key-range, at a single
|
||||
/// LSN
|
||||
///
|
||||
/// LSN.
|
||||
pub trait PersistentLayer: Layer {
|
||||
fn get_tenant_id(&self) -> TenantId;
|
||||
|
||||
@@ -190,7 +344,9 @@ pub trait PersistentLayer: Layer {
|
||||
/// current_physical_size is computed as the sum of this value.
|
||||
fn file_size(&self) -> Option<u64>;
|
||||
|
||||
fn info(&self) -> HistoricLayerInfo;
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats;
|
||||
}
|
||||
|
||||
pub fn downcast_remote_layer(
|
||||
@@ -203,19 +359,11 @@ pub fn downcast_remote_layer(
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for dyn Layer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Layer")
|
||||
.field("short_id", &self.short_id())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
/// LayerDescriptor.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LayerDescriptor {
|
||||
pub key: Range<Key>,
|
||||
pub lsn: Range<Lsn>,
|
||||
|
||||
@@ -37,7 +37,7 @@ use crate::virtual_file::VirtualFile;
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
@@ -55,7 +55,10 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{DeltaFileName, Layer, LayerFileName, LayerIter, LayerKeyIter, PathOrConf};
|
||||
use super::{
|
||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
||||
LayerKeyIter, LayerResidenceStatus, PathOrConf,
|
||||
};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -167,14 +170,13 @@ impl DeltaKey {
|
||||
}
|
||||
}
|
||||
|
||||
/// DeltaLayer is the in-memory data structure associated with an on-disk delta
|
||||
/// file.
|
||||
///
|
||||
/// DeltaLayer is the in-memory data structure associated with an
|
||||
/// on-disk delta file. We keep a DeltaLayer in memory for each
|
||||
/// file, in the LayerMap. If a layer is in "loaded" state, we have a
|
||||
/// copy of the index in memory, in 'inner'. Otherwise the struct is
|
||||
/// just a placeholder for a file that exists on disk, and it needs to
|
||||
/// be loaded before using it in queries.
|
||||
///
|
||||
/// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
|
||||
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
|
||||
/// Otherwise the struct is just a placeholder for a file that exists on disk,
|
||||
/// and it needs to be loaded before using it in queries.
|
||||
pub struct DeltaLayer {
|
||||
path_or_conf: PathOrConf,
|
||||
|
||||
@@ -185,9 +187,22 @@ pub struct DeltaLayer {
|
||||
|
||||
pub file_size: u64,
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
inner: RwLock<DeltaLayerInner>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DeltaLayer")
|
||||
.field("key_range", &self.key_range)
|
||||
.field("lsn_range", &self.lsn_range)
|
||||
.field("file_size", &self.file_size)
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DeltaLayerInner {
|
||||
/// If false, the fields below have not been loaded into memory yet.
|
||||
loaded: bool,
|
||||
@@ -200,6 +215,16 @@ pub struct DeltaLayerInner {
|
||||
file: Option<FileBlockReader<VirtualFile>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DeltaLayerInner")
|
||||
.field("loaded", &self.loaded)
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for DeltaLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key_range.clone()
|
||||
@@ -231,7 +256,7 @@ impl Layer for DeltaLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(ctx)?;
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx)?;
|
||||
|
||||
println!(
|
||||
"index_start_blk: {}, root {}",
|
||||
@@ -304,7 +329,7 @@ impl Layer for DeltaLayer {
|
||||
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
let inner = self.load(ctx)?;
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
@@ -395,7 +420,9 @@ impl PersistentLayer for DeltaLayer {
|
||||
}
|
||||
|
||||
fn iter(&self, ctx: &RequestContext) -> Result<LayerIter<'_>> {
|
||||
let inner = self.load(ctx).context("load delta layer")?;
|
||||
let inner = self
|
||||
.load(LayerAccessKind::KeyIter, ctx)
|
||||
.context("load delta layer")?;
|
||||
Ok(match DeltaValueIter::new(inner) {
|
||||
Ok(iter) => Box::new(iter),
|
||||
Err(err) => Box::new(std::iter::once(Err(err))),
|
||||
@@ -403,7 +430,7 @@ impl PersistentLayer for DeltaLayer {
|
||||
}
|
||||
|
||||
fn key_iter(&self, ctx: &RequestContext) -> Result<LayerKeyIter<'_>> {
|
||||
let inner = self.load(ctx)?;
|
||||
let inner = self.load(LayerAccessKind::KeyIter, ctx)?;
|
||||
Ok(Box::new(
|
||||
DeltaKeyIter::new(inner).context("Layer index is corrupted")?,
|
||||
))
|
||||
@@ -419,18 +446,25 @@ impl PersistentLayer for DeltaLayer {
|
||||
Some(self.file_size)
|
||||
}
|
||||
|
||||
fn info(&self) -> HistoricLayerInfo {
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
let layer_file_name = self.filename().file_name();
|
||||
let lsn_range = self.get_lsn_range();
|
||||
|
||||
let access_stats = self.access_stats.to_api_model(reset);
|
||||
|
||||
HistoricLayerInfo::Delta {
|
||||
layer_file_name,
|
||||
layer_file_size: Some(self.file_size),
|
||||
lsn_start: lsn_range.start,
|
||||
lsn_end: lsn_range.end,
|
||||
remote: false,
|
||||
access_stats,
|
||||
}
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
@@ -475,7 +509,13 @@ impl DeltaLayer {
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
///
|
||||
fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<DeltaLayerInner>> {
|
||||
fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
|
||||
self.access_stats
|
||||
.record_access(access_kind, ctx.task_kind());
|
||||
loop {
|
||||
// Quick exit if already loaded
|
||||
let inner = self.inner.read().unwrap();
|
||||
@@ -556,6 +596,7 @@ impl DeltaLayer {
|
||||
tenant_id: TenantId,
|
||||
filename: &DeltaFileName,
|
||||
file_size: u64,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> DeltaLayer {
|
||||
DeltaLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
@@ -564,6 +605,7 @@ impl DeltaLayer {
|
||||
key_range: filename.key_range.clone(),
|
||||
lsn_range: filename.lsn_range.clone(),
|
||||
file_size,
|
||||
access_stats,
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
@@ -593,6 +635,7 @@ impl DeltaLayer {
|
||||
key_range: summary.key_range,
|
||||
lsn_range: summary.lsn_range,
|
||||
file_size: metadata.len(),
|
||||
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
@@ -763,6 +806,7 @@ impl DeltaLayerWriterInner {
|
||||
key_range: self.key_start..key_end,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
file_size: metadata.len(),
|
||||
access_stats: LayerAccessStats::for_new_layer_file(),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
|
||||
@@ -27,14 +27,14 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use hex;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
@@ -53,7 +53,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::{ImageFileName, LayerFileName};
|
||||
use super::{Layer, LayerIter, PathOrConf};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -95,13 +95,13 @@ impl From<&ImageLayer> for Summary {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// ImageLayer is the in-memory data structure associated with an on-disk image
|
||||
/// file. We keep an ImageLayer in memory for each file, in the LayerMap. If a
|
||||
/// layer is in "loaded" state, we have a copy of the index in memory, in 'inner'.
|
||||
/// file.
|
||||
///
|
||||
/// We keep an ImageLayer in memory for each file, in the LayerMap. If a layer
|
||||
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
|
||||
/// Otherwise the struct is just a placeholder for a file that exists on disk,
|
||||
/// and it needs to be loaded before using it in queries.
|
||||
///
|
||||
pub struct ImageLayer {
|
||||
path_or_conf: PathOrConf,
|
||||
pub tenant_id: TenantId,
|
||||
@@ -112,9 +112,22 @@ pub struct ImageLayer {
|
||||
// This entry contains an image of all pages as of this LSN
|
||||
pub lsn: Lsn,
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
inner: RwLock<ImageLayerInner>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ImageLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ImageLayer")
|
||||
.field("key_range", &self.key_range)
|
||||
.field("file_size", &self.file_size)
|
||||
.field("lsn", &self.lsn)
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ImageLayerInner {
|
||||
/// If false, the 'index' has not been loaded into memory yet.
|
||||
loaded: bool,
|
||||
@@ -127,6 +140,16 @@ pub struct ImageLayerInner {
|
||||
file: Option<FileBlockReader<VirtualFile>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ImageLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ImageLayerInner")
|
||||
.field("loaded", &self.loaded)
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for ImageLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key_range.clone()
|
||||
@@ -155,7 +178,7 @@ impl Layer for ImageLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(ctx)?;
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx)?;
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader =
|
||||
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
@@ -182,7 +205,7 @@ impl Layer for ImageLayer {
|
||||
assert!(lsn_range.start >= self.lsn);
|
||||
assert!(lsn_range.end >= self.lsn);
|
||||
|
||||
let inner = self.load(ctx)?;
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
@@ -237,7 +260,7 @@ impl PersistentLayer for ImageLayer {
|
||||
Some(self.file_size)
|
||||
}
|
||||
|
||||
fn info(&self) -> HistoricLayerInfo {
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
let layer_file_name = self.filename().file_name();
|
||||
let lsn_range = self.get_lsn_range();
|
||||
|
||||
@@ -246,8 +269,13 @@ impl PersistentLayer for ImageLayer {
|
||||
layer_file_size: Some(self.file_size),
|
||||
lsn_start: lsn_range.start,
|
||||
remote: false,
|
||||
access_stats: self.access_stats.to_api_model(reset),
|
||||
}
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
@@ -285,7 +313,13 @@ impl ImageLayer {
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
///
|
||||
fn load(&self, _ctx: &RequestContext) -> Result<RwLockReadGuard<ImageLayerInner>> {
|
||||
fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<RwLockReadGuard<ImageLayerInner>> {
|
||||
self.access_stats
|
||||
.record_access(access_kind, ctx.task_kind());
|
||||
loop {
|
||||
// Quick exit if already loaded
|
||||
let inner = self.inner.read().unwrap();
|
||||
@@ -365,6 +399,7 @@ impl ImageLayer {
|
||||
tenant_id: TenantId,
|
||||
filename: &ImageFileName,
|
||||
file_size: u64,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> ImageLayer {
|
||||
ImageLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
@@ -373,6 +408,7 @@ impl ImageLayer {
|
||||
key_range: filename.key_range.clone(),
|
||||
lsn: filename.lsn,
|
||||
file_size,
|
||||
access_stats,
|
||||
inner: RwLock::new(ImageLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
@@ -400,6 +436,7 @@ impl ImageLayer {
|
||||
key_range: summary.key_range,
|
||||
lsn: summary.lsn,
|
||||
file_size: metadata.len(),
|
||||
access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
|
||||
inner: RwLock::new(ImageLayerInner {
|
||||
file: None,
|
||||
loaded: false,
|
||||
@@ -559,6 +596,7 @@ impl ImageLayerWriterInner {
|
||||
key_range: self.key_range.clone(),
|
||||
lsn: self.lsn,
|
||||
file_size: metadata.len(),
|
||||
access_stats: LayerAccessStats::for_new_layer_file(),
|
||||
inner: RwLock::new(ImageLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
|
||||
@@ -53,6 +53,15 @@ pub struct InMemoryLayer {
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayer")
|
||||
.field("start_lsn", &self.start_lsn)
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InMemoryLayerInner {
|
||||
/// Frozen layers have an exclusive end LSN.
|
||||
/// Writes are only allowed when this is None
|
||||
@@ -71,6 +80,14 @@ pub struct InMemoryLayerInner {
|
||||
file: EphemeralFile,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayerInner")
|
||||
.field("end_lsn", &self.end_lsn)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryLayerInner {
|
||||
fn assert_writeable(&self) {
|
||||
assert!(self.end_lsn.is_none());
|
||||
|
||||
@@ -19,9 +19,19 @@ use utils::{
|
||||
|
||||
use super::filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
use super::image_layer::ImageLayer;
|
||||
use super::{DeltaLayer, LayerIter, LayerKeyIter, PersistentLayer};
|
||||
use super::{
|
||||
DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
|
||||
LayerResidenceStatus, PersistentLayer,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
/// [`crate::storage_layer::DeltaLayer`].
|
||||
///
|
||||
/// RemoteLayer might be downloaded on-demand during operations which are
|
||||
/// allowed to download remote layers and during which, it gets replaced with a
|
||||
/// concrete `DeltaLayer` or `ImageLayer`.
|
||||
///
|
||||
/// See: [`crate::context::RequestContext`] for authorization to download
|
||||
pub struct RemoteLayer {
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
@@ -36,9 +46,21 @@ pub struct RemoteLayer {
|
||||
|
||||
is_incremental: bool,
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RemoteLayer")
|
||||
.field("file_name", &self.file_name)
|
||||
.field("layer_metadata", &self.layer_metadata)
|
||||
.field("is_incremental", &self.is_incremental)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for RemoteLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key_range.clone()
|
||||
@@ -138,7 +160,7 @@ impl PersistentLayer for RemoteLayer {
|
||||
self.layer_metadata.file_size()
|
||||
}
|
||||
|
||||
fn info(&self) -> HistoricLayerInfo {
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
let layer_file_name = self.filename().file_name();
|
||||
let lsn_range = self.get_lsn_range();
|
||||
|
||||
@@ -149,6 +171,7 @@ impl PersistentLayer for RemoteLayer {
|
||||
lsn_start: lsn_range.start,
|
||||
lsn_end: lsn_range.end,
|
||||
remote: true,
|
||||
access_stats: self.access_stats.to_api_model(reset),
|
||||
}
|
||||
} else {
|
||||
HistoricLayerInfo::Image {
|
||||
@@ -156,9 +179,14 @@ impl PersistentLayer for RemoteLayer {
|
||||
layer_file_size: self.layer_metadata.file_size(),
|
||||
lsn_start: lsn_range.start,
|
||||
remote: true,
|
||||
access_stats: self.access_stats.to_api_model(reset),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a shared reference to the access statistics held in this
/// remote layer's `access_stats` field.
fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteLayer {
|
||||
@@ -167,6 +195,7 @@ impl RemoteLayer {
|
||||
timelineid: TimelineId,
|
||||
fname: &ImageFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
@@ -178,6 +207,7 @@ impl RemoteLayer {
|
||||
file_name: fname.to_owned().into(),
|
||||
layer_metadata: layer_metadata.clone(),
|
||||
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
|
||||
access_stats,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,6 +216,7 @@ impl RemoteLayer {
|
||||
timelineid: TimelineId,
|
||||
fname: &DeltaFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
@@ -197,6 +228,7 @@ impl RemoteLayer {
|
||||
file_name: fname.to_owned().into(),
|
||||
layer_metadata: layer_metadata.clone(),
|
||||
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
|
||||
access_stats,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,6 +249,8 @@ impl RemoteLayer {
|
||||
self.tenantid,
|
||||
&fname,
|
||||
file_size,
|
||||
self.access_stats
|
||||
.clone_for_residence_change(LayerResidenceStatus::Resident),
|
||||
))
|
||||
} else {
|
||||
let fname = ImageFileName {
|
||||
@@ -229,6 +263,8 @@ impl RemoteLayer {
|
||||
self.tenantid,
|
||||
&fname,
|
||||
file_size,
|
||||
self.access_stats
|
||||
.clone_for_residence_change(LayerResidenceStatus::Resident),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
DownloadRemoteLayersTaskState, LayerMapInfo, TimelineState,
|
||||
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
|
||||
};
|
||||
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -30,8 +30,8 @@ use crate::broker_client::is_broker_client_initialized;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||
use crate::tenant::storage_layer::{
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
|
||||
RemoteLayer,
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
||||
LayerAccessStats, LayerFileName, RemoteLayer,
|
||||
};
|
||||
use crate::tenant::{
|
||||
ephemeral_file::is_ephemeral_file,
|
||||
@@ -72,7 +72,7 @@ use walreceiver::spawn_connection_manager_task;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{DeltaLayer, ImageLayer, Layer};
|
||||
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
enum FlushLoopState {
|
||||
@@ -835,7 +835,7 @@ impl Timeline {
|
||||
self.state.subscribe()
|
||||
}
|
||||
|
||||
pub fn layer_map_info(&self) -> LayerMapInfo {
|
||||
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
@@ -847,7 +847,7 @@ impl Timeline {
|
||||
|
||||
let mut historic_layers = Vec::new();
|
||||
for historic_layer in layer_map.iter_historic_layers() {
|
||||
historic_layers.push(historic_layer.info());
|
||||
historic_layers.push(historic_layer.info(reset));
|
||||
}
|
||||
|
||||
LayerMapInfo {
|
||||
@@ -891,12 +891,18 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
&image_name,
|
||||
&layer_metadata,
|
||||
local_layer
|
||||
.access_stats()
|
||||
.clone_for_residence_change(LayerResidenceStatus::Evicted),
|
||||
),
|
||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&delta_name,
|
||||
&layer_metadata,
|
||||
local_layer
|
||||
.access_stats()
|
||||
.clone_for_residence_change(LayerResidenceStatus::Evicted),
|
||||
),
|
||||
});
|
||||
|
||||
@@ -1172,6 +1178,7 @@ impl Timeline {
|
||||
self.tenant_id,
|
||||
&imgfilename,
|
||||
file_size,
|
||||
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
@@ -1203,6 +1210,7 @@ impl Timeline {
|
||||
self.tenant_id,
|
||||
&deltafilename,
|
||||
file_size,
|
||||
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
@@ -1340,6 +1348,7 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
&remote_layer_metadata,
|
||||
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
|
||||
@@ -1364,6 +1373,7 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
&remote_layer_metadata,
|
||||
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
updates.insert_historic(remote_layer);
|
||||
@@ -3384,11 +3394,10 @@ impl Timeline {
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut updates = layers.batch_update();
|
||||
{
|
||||
use crate::tenant::layer_map::Replacement;
|
||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||
match updates.replace_historic(&l, new_layer) {
|
||||
match layers.replace_historic(&l, new_layer) {
|
||||
Ok(Replacement::Replaced { .. }) => { /* expected */ }
|
||||
Ok(Replacement::NotFound) => {
|
||||
// TODO: the downloaded file should probably be removed, otherwise
|
||||
@@ -3422,7 +3431,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
|
||||
// Now that we've inserted the download into the layer map,
|
||||
|
||||
@@ -50,6 +50,8 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_storage_operations_seconds_global_count",
|
||||
"pageserver_storage_operations_seconds_global_sum",
|
||||
"pageserver_storage_operations_seconds_global_bucket",
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
|
||||
@@ -1205,6 +1205,11 @@ class PageserverHttpClient(requests.Session):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def tenant_config(self, tenant_id: TenantId) -> TenantConfig:
    """Fetch the configuration of ``tenant_id`` over the pageserver HTTP API.

    Sends a GET to ``/v1/tenant/{tenant_id}/config`` on ``localhost`` at
    ``self.port``, runs the response through ``self.verbose_error`` (error
    handling is defined elsewhere on this client), and parses the JSON body
    with ``TenantConfig.from_json``.
    """
    config_url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config"
    response = self.get(config_url)
    self.verbose_error(response)
    payload = response.json()
    return TenantConfig.from_json(payload)
|
||||
|
||||
def tenant_size(self, tenant_id: TenantId) -> int:
    """Return the first element of ``tenant_size_and_modelinputs(tenant_id)``."""
    size_and_inputs = self.tenant_size_and_modelinputs(tenant_id)
    return size_and_inputs[0]
|
||||
|
||||
@@ -1500,6 +1505,19 @@ class PageserverHttpClient(requests.Session):
|
||||
assert res.status_code == 200
|
||||
|
||||
|
||||
@dataclass
class TenantConfig:
    """Tenant configuration as returned by the pageserver's tenant config endpoint."""

    # Only the settings that were explicitly set for this tenant.
    tenant_specific_overrides: Dict[str, Any]
    # The full configuration in effect: tenant overrides applied on top of
    # the pageserver defaults.
    effective_config: Dict[str, Any]

    @classmethod
    def from_json(cls, d: Dict[str, Any]) -> "TenantConfig":
        """Build an instance from the decoded JSON body of the config endpoint.

        Raises:
            KeyError: if ``tenant_specific_overrides`` or ``effective_config``
                is missing from ``d``.
        """
        # Construct through ``cls`` so subclasses get an instance of their own
        # type rather than the base class.
        return cls(
            tenant_specific_overrides=d["tenant_specific_overrides"],
            effective_config=d["effective_config"],
        )
|
||||
|
||||
|
||||
@dataclass
|
||||
class LayerMapInfo:
|
||||
in_memory_layers: List[InMemoryLayerInfo]
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
.build/
|
||||
1
test_runner/pg_clients/swift/PostgresNIOExample/.gitignore
vendored
Normal file
1
test_runner/pg_clients/swift/PostgresNIOExample/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
.build/
|
||||
10
test_runner/pg_clients/swift/PostgresNIOExample/Dockerfile
Normal file
10
test_runner/pg_clients/swift/PostgresNIOExample/Dockerfile
Normal file
@@ -0,0 +1,10 @@
|
||||
# Stage 1: compile the example in release mode inside the Swift toolchain image.
FROM swift:5.7 AS builder
WORKDIR /source

COPY . .
RUN swift build --configuration release

# Stage 2: runtime image that carries only the release build output.
FROM swift:5.7
WORKDIR /app
COPY --from=builder /source/.build/release .
CMD ["/app/PostgresNIOExample"]
|
||||
@@ -0,0 +1,86 @@
|
||||
{
|
||||
"pins" : [
|
||||
{
|
||||
"identity" : "postgres-nio",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/vapor/postgres-nio.git",
|
||||
"state" : {
|
||||
"revision" : "7daf026e145de2c07d6e37f4171b1acb4b5f22b1",
|
||||
"version" : "1.12.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-atomics",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-atomics.git",
|
||||
"state" : {
|
||||
"revision" : "ff3d2212b6b093db7f177d0855adbc4ef9c5f036",
|
||||
"version" : "1.0.3"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-collections",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-collections.git",
|
||||
"state" : {
|
||||
"revision" : "937e904258d22af6e447a0b72c0bc67583ef64a2",
|
||||
"version" : "1.0.4"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-crypto",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-crypto.git",
|
||||
"state" : {
|
||||
"revision" : "75ec60b8b4cc0f085c3ac414f3dca5625fa3588e",
|
||||
"version" : "2.2.4"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-log",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-log.git",
|
||||
"state" : {
|
||||
"revision" : "32e8d724467f8fe623624570367e3d50c5638e46",
|
||||
"version" : "1.5.2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-metrics",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-metrics.git",
|
||||
"state" : {
|
||||
"revision" : "9b39d811a83cf18b79d7d5513b06f8b290198b10",
|
||||
"version" : "2.3.3"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-nio",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio.git",
|
||||
"state" : {
|
||||
"revision" : "45167b8006448c79dda4b7bd604e07a034c15c49",
|
||||
"version" : "2.48.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-nio-ssl",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio-ssl.git",
|
||||
"state" : {
|
||||
"revision" : "4fb7ead803e38949eb1d6fabb849206a72c580f3",
|
||||
"version" : "2.23.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-nio-transport-services",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio-transport-services.git",
|
||||
"state" : {
|
||||
"revision" : "c0d9a144cfaec8d3d596aadde3039286a266c15c",
|
||||
"version" : "1.15.0"
|
||||
}
|
||||
}
|
||||
],
|
||||
"version" : 2
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
// swift-tools-version:5.7
import PackageDescription

// The only external dependency: the PostgresNIO client library.
let postgresNIO: Package.Dependency = .package(
    url: "https://github.com/vapor/postgres-nio.git",
    from: "1.8.0"
)

// The single executable built by this package.
let exampleTarget: Target = .executableTarget(
    name: "PostgresNIOExample",
    dependencies: [
        .product(name: "PostgresNIO", package: "postgres-nio"),
    ]
)

let package = Package(
    name: "PostgresNIOExample",
    dependencies: [postgresNIO],
    targets: [exampleTarget]
)
|
||||
@@ -0,0 +1,49 @@
|
||||
import Foundation

import PostgresNIO
import NIOPosix
import Logging

// Connectivity check: connect over TLS using the NEON_HOST / NEON_USER /
// NEON_DATABASE / NEON_PASSWORD environment variables, run `SELECT 1`, and
// print the result. Exits with a non-zero status if any step fails.
await Task {
    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    let logger = Logger(label: "postgres-logger")
    var succeeded = false

    do {
        let env = ProcessInfo.processInfo.environment

        // Plain `try` (not `try!`): a TLS setup failure is reported by the
        // catch below instead of trapping the process.
        let sslContext = try NIOSSLContext(configuration: .makeClientConfiguration())

        let config = PostgresConnection.Configuration(
            connection: .init(
                host: env["NEON_HOST"] ?? "",
                port: 5432
            ),
            authentication: .init(
                username: env["NEON_USER"] ?? "",
                database: env["NEON_DATABASE"] ?? "",
                password: env["NEON_PASSWORD"] ?? ""
            ),
            tls: .require(sslContext)
        )

        let connection = try await PostgresConnection.connect(
            on: eventLoopGroup.next(),
            configuration: config,
            id: 1,
            logger: logger
        )

        do {
            let rows = try await connection.query("SELECT 1 as col", logger: logger)
            for try await (n) in rows.decode((Int).self, context: .default) {
                print(n)
            }
        } catch {
            // Close the connection even when the query fails, then surface
            // the original error to the outer handler.
            try? await connection.close()
            throw error
        }

        // Close your connection once done
        try await connection.close()
        succeeded = true
    } catch {
        print(error)
    }

    // Shutdown the EventLoopGroup on every path, including failures above.
    do {
        try eventLoopGroup.syncShutdownGracefully()
    } catch {
        print(error)
        succeeded = false
    }

    // Signal failure to the caller (e.g. `docker run`) through the exit status.
    if !succeeded {
        exit(1)
    }
}.value
|
||||
@@ -19,6 +19,7 @@ from fixtures.utils import subprocess_capture
|
||||
"swift/PostgresClientKitExample", # See https://github.com/neondatabase/neon/pull/2008#discussion_r911896592
|
||||
marks=pytest.mark.xfail(reason="Neither SNI nor parameters is supported"),
|
||||
),
|
||||
"swift/PostgresNIOExample",
|
||||
"typescript/postgresql-client",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -22,6 +22,7 @@ wait_lsn_timeout='111 s';
|
||||
tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
# Check that we raise on misspelled configs
|
||||
invalid_conf_key = "some_invalid_setting_name_blah_blah_123"
|
||||
@@ -36,12 +37,11 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
else:
|
||||
raise AssertionError("Expected validation error")
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"checkpoint_distance": "20000",
|
||||
"gc_period": "30sec",
|
||||
}
|
||||
)
|
||||
new_conf = {
|
||||
"checkpoint_distance": "20000",
|
||||
"gc_period": "30sec",
|
||||
}
|
||||
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
|
||||
|
||||
env.neon_cli.create_timeline("test_tenant_conf", tenant_id=tenant)
|
||||
env.postgres.create_start(
|
||||
@@ -69,7 +69,20 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
"image_creation_threshold": 3,
|
||||
"pitr_interval": 604800, # 7 days
|
||||
}.items()
|
||||
)
|
||||
), f"Unexpected res: {res}"
|
||||
default_tenant_config = http_client.tenant_config(tenant_id=env.initial_tenant)
|
||||
assert (
|
||||
not default_tenant_config.tenant_specific_overrides
|
||||
), "Should have no specific settings yet"
|
||||
effective_config = default_tenant_config.effective_config
|
||||
assert effective_config["checkpoint_distance"] == 10000
|
||||
assert effective_config["compaction_target_size"] == 1048576
|
||||
assert effective_config["compaction_period"] == "20s"
|
||||
assert effective_config["compaction_threshold"] == 10
|
||||
assert effective_config["gc_horizon"] == 67108864
|
||||
assert effective_config["gc_period"] == "1h"
|
||||
assert effective_config["image_creation_threshold"] == 3
|
||||
assert effective_config["pitr_interval"] == "7days"
|
||||
|
||||
# check the configuration of the new tenant
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
@@ -89,15 +102,37 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
"image_creation_threshold": 3,
|
||||
"pitr_interval": 604800,
|
||||
}.items()
|
||||
)
|
||||
), f"Unexpected res: {res}"
|
||||
new_tenant_config = http_client.tenant_config(tenant_id=tenant)
|
||||
new_specific_config = new_tenant_config.tenant_specific_overrides
|
||||
assert new_specific_config["checkpoint_distance"] == 20000
|
||||
assert new_specific_config["gc_period"] == "30s"
|
||||
assert len(new_specific_config) == len(
|
||||
new_conf
|
||||
), f"No more specific properties were expected, but got: {new_specific_config}"
|
||||
new_effective_config = new_tenant_config.effective_config
|
||||
assert (
|
||||
new_effective_config["checkpoint_distance"] == 20000
|
||||
), "Specific 'checkpoint_distance' config should override the default value"
|
||||
assert (
|
||||
new_effective_config["gc_period"] == "30s"
|
||||
), "Specific 'gc_period' config should override the default value"
|
||||
assert new_effective_config["compaction_target_size"] == 1048576
|
||||
assert new_effective_config["compaction_period"] == "20s"
|
||||
assert new_effective_config["compaction_threshold"] == 10
|
||||
assert new_effective_config["gc_horizon"] == 67108864
|
||||
assert new_effective_config["image_creation_threshold"] == 3
|
||||
assert new_effective_config["pitr_interval"] == "7days"
|
||||
|
||||
# update the config and ensure that it has changed
|
||||
conf_update = {
|
||||
"checkpoint_distance": "15000",
|
||||
"gc_period": "80sec",
|
||||
"compaction_period": "80sec",
|
||||
}
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id=tenant,
|
||||
conf={
|
||||
"checkpoint_distance": "15000",
|
||||
"gc_period": "80sec",
|
||||
},
|
||||
conf=conf_update,
|
||||
)
|
||||
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
@@ -110,14 +145,37 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
for i in {
|
||||
"checkpoint_distance": 15000,
|
||||
"compaction_target_size": 1048576,
|
||||
"compaction_period": 20,
|
||||
"compaction_period": 80,
|
||||
"compaction_threshold": 10,
|
||||
"gc_horizon": 67108864,
|
||||
"gc_period": 80,
|
||||
"image_creation_threshold": 3,
|
||||
"pitr_interval": 604800,
|
||||
}.items()
|
||||
)
|
||||
), f"Unexpected res: {res}"
|
||||
updated_tenant_config = http_client.tenant_config(tenant_id=tenant)
|
||||
updated_specific_config = updated_tenant_config.tenant_specific_overrides
|
||||
assert updated_specific_config["checkpoint_distance"] == 15000
|
||||
assert updated_specific_config["gc_period"] == "1m 20s"
|
||||
assert updated_specific_config["compaction_period"] == "1m 20s"
|
||||
assert len(updated_specific_config) == len(
|
||||
conf_update
|
||||
), f"No more specific properties were expected, but got: {updated_specific_config}"
|
||||
updated_effective_config = updated_tenant_config.effective_config
|
||||
assert (
|
||||
updated_effective_config["checkpoint_distance"] == 15000
|
||||
), "Specific 'checkpoint_distance' config should override the default value"
|
||||
assert (
|
||||
updated_effective_config["gc_period"] == "1m 20s"
|
||||
), "Specific 'gc_period' config should override the default value"
|
||||
assert (
|
||||
updated_effective_config["compaction_period"] == "1m 20s"
|
||||
), "Specific 'compaction_period' config should override the default value"
|
||||
assert updated_effective_config["compaction_target_size"] == 1048576
|
||||
assert updated_effective_config["compaction_threshold"] == 10
|
||||
assert updated_effective_config["gc_horizon"] == 67108864
|
||||
assert updated_effective_config["image_creation_threshold"] == 3
|
||||
assert updated_effective_config["pitr_interval"] == "7days"
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
@@ -133,22 +191,44 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
for i in {
|
||||
"checkpoint_distance": 15000,
|
||||
"compaction_target_size": 1048576,
|
||||
"compaction_period": 20,
|
||||
"compaction_period": 80,
|
||||
"compaction_threshold": 10,
|
||||
"gc_horizon": 67108864,
|
||||
"gc_period": 80,
|
||||
"image_creation_threshold": 3,
|
||||
"pitr_interval": 604800,
|
||||
}.items()
|
||||
)
|
||||
), f"Unexpected res: {res}"
|
||||
restarted_tenant_config = http_client.tenant_config(tenant_id=tenant)
|
||||
assert (
|
||||
restarted_tenant_config == updated_tenant_config
|
||||
), "Updated config should not change after the restart"
|
||||
|
||||
# update the config with very short config and make sure no trailing chars are left from previous config
|
||||
final_conf = {
|
||||
"pitr_interval": "1 min",
|
||||
}
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id=tenant,
|
||||
conf={
|
||||
"pitr_interval": "1 min",
|
||||
},
|
||||
conf=final_conf,
|
||||
)
|
||||
final_tenant_config = http_client.tenant_config(tenant_id=tenant)
|
||||
final_specific_config = final_tenant_config.tenant_specific_overrides
|
||||
assert final_specific_config["pitr_interval"] == "1m"
|
||||
assert len(final_specific_config) == len(
|
||||
final_conf
|
||||
), f"No more specific properties were expected, but got: {final_specific_config}"
|
||||
final_effective_config = final_tenant_config.effective_config
|
||||
assert (
|
||||
final_effective_config["pitr_interval"] == "1m"
|
||||
), "Specific 'pitr_interval' config should override the default value"
|
||||
assert final_effective_config["checkpoint_distance"] == 10000
|
||||
assert final_effective_config["compaction_target_size"] == 1048576
|
||||
assert final_effective_config["compaction_period"] == "20s"
|
||||
assert final_effective_config["compaction_threshold"] == 10
|
||||
assert final_effective_config["gc_horizon"] == 67108864
|
||||
assert final_effective_config["gc_period"] == "1h"
|
||||
assert final_effective_config["image_creation_threshold"] == 3
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
@@ -165,7 +245,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
|
||||
"compaction_period": 20,
|
||||
"pitr_interval": 60,
|
||||
}.items()
|
||||
)
|
||||
), f"Unexpected res: {res}"
|
||||
|
||||
|
||||
def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -2,6 +2,7 @@ from typing import Any, List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
|
||||
from fixtures.types import Lsn
|
||||
|
||||
@@ -368,6 +369,17 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
assert size_after == prev, "size after restarting pageserver should not have changed"
|
||||
|
||||
ps_metrics = parse_metrics(http_client.get_metrics(), "pageserver")
|
||||
tenant_metric_filter = {
|
||||
"tenant_id": str(tenant_id),
|
||||
}
|
||||
|
||||
tenant_size_metric = int(
|
||||
ps_metrics.query_one("pageserver_tenant_synthetic_size", filter=tenant_metric_filter).value
|
||||
)
|
||||
|
||||
assert tenant_size_metric == size_after, "API size value should be equal to metric size value"
|
||||
|
||||
|
||||
def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user