mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
refactor(pageserver): rely on serde derive for toml deserialization (#7656)
This PR simplifies the pageserver configuration parsing as follows:
* introduce the `pageserver_api::config::ConfigToml` type
* implement `Default` for `ConfigToml`
* use serde derive to do the brain-dead leg-work of processing the toml
document
* use `serde(default)` to fill in default values
* in `pageserver` crate:
* use `toml_edit` to deserialize the pageserver.toml string into a
`ConfigToml`
* `PageServerConfig::parse_and_validate` then
* consumes the `ConfigToml`
* destructures it exhaustively into its constituent fields
* constructs the `PageServerConfig`
The rules are:
* in `ConfigToml`, use `deny_unknown_fields` everywhere
* static default values go in `pageserver_api`
* if there cannot be a static default value (e.g. which default IO
engine to use, because it depends on the runtime), make the field in
`ConfigToml` an `Option`
* if runtime-augmentation of a value is needed, do that in
`parse_and_validate`
* a good example is `virtual_file_io_engine` or `l0_flush`, both of
which need to execute code to determine the effective value in
`PageServerConf`
The benefits:
* massive amount of brain-dead repetitive code can be deleted
* "unused variable" compile-time errors when removing a config value,
due to the exhaustive destructuring in `parse_and_validate`
* compile-time errors guide you when adding a new config field
Drawbacks:
* serde derive is sometimes a bit too magical
* `deny_unknown_fields` is easy to miss
Future Work / Benefits:
* make `neon_local` use `pageserver_api` to construct `ConfigToml` and
write it to `pageserver.toml`
* This provides more type safety / coompile-time errors than the current
approach.
### Refs
Fixes #3682
### Future Work
* `remote_storage` deser doesn't reject unknown fields
https://github.com/neondatabase/neon/issues/8915
* clean up `libs/pageserver_api/src/config.rs` further
* break up into multiple files, at least for tenant config
* move `models` as appropriate / refine distinction between config and
API models / be explicit about when it's the same
* use `pub(crate)` visibility on `mod defaults` to detect stale values
This commit is contained in:
committed by
GitHub
parent
6dfbf49128
commit
850421ec06
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -2727,6 +2727,12 @@ dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indoc"
|
||||
version = "2.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
|
||||
|
||||
[[package]]
|
||||
name = "infer"
|
||||
version = "0.2.3"
|
||||
@@ -3701,6 +3707,7 @@ dependencies = [
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.26",
|
||||
"indoc",
|
||||
"itertools 0.10.5",
|
||||
"md5",
|
||||
"metrics",
|
||||
@@ -3766,6 +3773,7 @@ dependencies = [
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"camino",
|
||||
"chrono",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
@@ -3773,11 +3781,16 @@ dependencies = [
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"itertools 0.10.5",
|
||||
"nix 0.27.1",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"storage_broker",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror",
|
||||
|
||||
@@ -103,6 +103,7 @@ humantime-serde = "1.1.1"
|
||||
hyper = "0.14"
|
||||
tokio-tungstenite = "0.20.0"
|
||||
indexmap = "2"
|
||||
indoc = "2"
|
||||
inotify = "0.10.2"
|
||||
ipnet = "2.9.0"
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -4,6 +4,10 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
# See pageserver/Cargo.toml
|
||||
testing = ["dep:nix"]
|
||||
|
||||
[dependencies]
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
@@ -23,6 +27,12 @@ thiserror.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
itertools.workspace = true
|
||||
storage_broker.workspace = true
|
||||
camino = {workspace = true, features = ["serde1"]}
|
||||
remote_storage.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
nix = {workspace = true, optional = true}
|
||||
reqwest.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
bincode.workspace = true
|
||||
|
||||
@@ -1,15 +1,28 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use const_format::formatcp;
|
||||
use camino::Utf8PathBuf;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use const_format::formatcp;
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use serde_with::serde_as;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::logging::LogFormat;
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
|
||||
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
|
||||
// as a separate structure. This information is not neeed by the pageserver
|
||||
// itself, it is only used for registering the pageserver with the control
|
||||
@@ -29,3 +42,511 @@ pub struct NodeMetadata {
|
||||
#[serde(flatten)]
|
||||
pub other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
/// `pageserver.toml`
|
||||
///
|
||||
/// We use serde derive with `#[serde(default)]` to generate a deserializer
|
||||
/// that fills in the default values for each config field.
|
||||
///
|
||||
/// If there cannot be a static default value because we need to make runtime
|
||||
/// checks to determine the default, make it an `Option` (which defaults to None).
|
||||
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
|
||||
#[serde_as]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
pub struct ConfigToml {
|
||||
// types mapped 1:1 into the runtime PageServerConfig type
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub availability_zone: Option<String>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wait_lsn_timeout: Duration,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wal_redo_timeout: Duration,
|
||||
pub superuser: String,
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
pub pg_distrib_dir: Option<Utf8PathBuf>,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub pg_auth_type: AuthType,
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub tenant_config: TenantConfigToml,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub broker_endpoint: storage_broker::Uri,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub broker_keepalive_interval: Duration,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub log_format: LogFormat,
|
||||
pub concurrent_tenant_warmup: NonZeroUsize,
|
||||
pub concurrent_tenant_size_logical_size_queries: NonZeroUsize,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub metric_collection_interval: Duration,
|
||||
pub metric_collection_endpoint: Option<reqwest::Url>,
|
||||
pub metric_collection_bucket: Option<RemoteStorageConfig>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
pub test_remote_failures: u64,
|
||||
pub ondemand_download_behavior_treat_error_as_warn: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub background_task_maximum_delay: Duration,
|
||||
pub control_plane_api: Option<reqwest::Url>,
|
||||
pub control_plane_api_token: Option<String>,
|
||||
pub control_plane_emergency_mode: bool,
|
||||
pub heatmap_upload_concurrency: usize,
|
||||
pub secondary_download_concurrency: usize,
|
||||
pub virtual_file_io_engine: Option<crate::models::virtual_file::IoEngineKind>,
|
||||
pub ingest_batch_size: u64,
|
||||
pub max_vectored_read_bytes: MaxVectoredReadBytes,
|
||||
pub image_compression: ImageCompressionAlgorithm,
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
pub l0_flush: Option<crate::models::L0FlushConfig>,
|
||||
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
|
||||
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
|
||||
pub io_buffer_alignment: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: utils::serde_percent::Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub period: Duration,
|
||||
#[cfg(feature = "testing")]
|
||||
pub mock_statvfs: Option<statvfs::mock::Behavior>,
|
||||
/// Select sorting for evicted layers
|
||||
#[serde(default)]
|
||||
pub eviction_order: EvictionOrder,
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum Behavior {
|
||||
Success {
|
||||
blocksize: u64,
|
||||
total_blocks: u64,
|
||||
name_filter: Option<utils::serde_regex::Regex>,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
Failure { mocked_error: MockedError },
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub enum MockedError {
|
||||
EIO,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
impl From<MockedError> for nix::Error {
|
||||
fn from(e: MockedError) -> Self {
|
||||
match e {
|
||||
MockedError::EIO => nix::Error::EIO,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type", content = "args")]
|
||||
pub enum EvictionOrder {
|
||||
RelativeAccessed {
|
||||
highest_layer_count_loses_first: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for EvictionOrder {
|
||||
fn default() -> Self {
|
||||
Self::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetVectoredImpl {
|
||||
Sequential,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetImpl {
|
||||
Legacy,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum CompactL0Phase1ValueAccess {
|
||||
/// The old way.
|
||||
PageCachedBlobIo,
|
||||
/// The new way.
|
||||
StreamingKmerge {
|
||||
/// If set, we run both the old way and the new way, validate that
|
||||
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
|
||||
/// and if the validation fails,
|
||||
/// - in tests: fail them with a panic or
|
||||
/// - in prod, log a rate-limited warning and use the old way's results.
|
||||
///
|
||||
/// If not set, we only run the new way and trust its results.
|
||||
validate: Option<CompactL0BypassPageCacheValidation>,
|
||||
},
|
||||
}
|
||||
|
||||
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum CompactL0BypassPageCacheValidation {
|
||||
/// Validate that the series of (key, lsn) pairs are the same.
|
||||
KeyLsn,
|
||||
/// Validate that the entire output of old and new way is identical.
|
||||
KeyLsnValue,
|
||||
}
|
||||
|
||||
impl Default for CompactL0Phase1ValueAccess {
|
||||
fn default() -> Self {
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge {
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
|
||||
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A tenant's calcuated configuration, which is the result of merging a
|
||||
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
|
||||
///
|
||||
/// For storing and transmitting individual tenant's configuration, see
|
||||
/// TenantConfOpt.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields, default)]
|
||||
pub struct TenantConfigToml {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
// page server crashes.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub checkpoint_distance: u64,
|
||||
// Inmemory layer is also flushed at least once in checkpoint_timeout to
|
||||
// eventually upload WAL after activity is stopped.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Duration,
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub compaction_target_size: u64,
|
||||
// How often to check if there's compaction work to be done.
|
||||
// Duration::ZERO means automatic compaction is disabled.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Duration,
|
||||
// Level0 delta layer threshold for compaction.
|
||||
pub compaction_threshold: usize,
|
||||
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is #of bytes of WAL.
|
||||
// Page versions older than this are garbage collected away.
|
||||
pub gc_horizon: u64,
|
||||
// Interval at which garbage collection is triggered.
|
||||
// Duration::ZERO means automatic GC is disabled
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_period: Duration,
|
||||
// Delta layer churn threshold to create L1 image layers.
|
||||
pub image_creation_threshold: usize,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is time.
|
||||
// Page versions older than this are garbage collected away.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Duration,
|
||||
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub walreceiver_connect_timeout: Duration,
|
||||
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
|
||||
/// A stalled safekeeper will be changed to a newer one when it appears.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Duration,
|
||||
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub eviction_policy: crate::models::EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
|
||||
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
pub timeline_get_throttle: crate::models::ThrottleConfig,
|
||||
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expresed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
|
||||
/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
|
||||
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
|
||||
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
|
||||
/// file is written.
|
||||
pub switch_aux_file_policy: crate::models::AuxFilePolicy,
|
||||
|
||||
/// The length for an explicit LSN lease request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length: Duration,
|
||||
|
||||
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length_for_ts: Duration,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
|
||||
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
|
||||
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
|
||||
|
||||
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
|
||||
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
|
||||
|
||||
pub const DEFAULT_LOG_FORMAT: &str = "plain";
|
||||
|
||||
pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;
|
||||
|
||||
pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = 1;
|
||||
|
||||
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
|
||||
|
||||
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
|
||||
pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
|
||||
ImageCompressionAlgorithm::Zstd { level: Some(1) };
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
|
||||
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
fn default() -> Self {
|
||||
use defaults::*;
|
||||
|
||||
Self {
|
||||
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
availability_zone: (None),
|
||||
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
.expect("cannot parse default wal redo timeout")),
|
||||
superuser: (DEFAULT_SUPERUSER.to_string()),
|
||||
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
http_auth_type: (AuthType::Trust),
|
||||
pg_auth_type: (AuthType::Trust),
|
||||
auth_validation_public_key_path: (None),
|
||||
remote_storage: None,
|
||||
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint")),
|
||||
broker_keepalive_interval: (humantime::parse_duration(
|
||||
storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default keepalive interval")),
|
||||
log_format: (LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
|
||||
|
||||
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
|
||||
.expect("Invalid default constant")),
|
||||
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
|
||||
metric_collection_interval: (humantime::parse_duration(
|
||||
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default metric collection interval")),
|
||||
synthetic_size_calculation_interval: (humantime::parse_duration(
|
||||
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default synthetic size calculation interval")),
|
||||
metric_collection_endpoint: (DEFAULT_METRIC_COLLECTION_ENDPOINT),
|
||||
|
||||
metric_collection_bucket: (None),
|
||||
|
||||
disk_usage_based_eviction: (None),
|
||||
|
||||
test_remote_failures: (0),
|
||||
|
||||
ondemand_download_behavior_treat_error_as_warn: (false),
|
||||
|
||||
background_task_maximum_delay: (humantime::parse_duration(
|
||||
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
|
||||
)
|
||||
.unwrap()),
|
||||
|
||||
control_plane_api: (None),
|
||||
control_plane_api_token: (None),
|
||||
control_plane_emergency_mode: (false),
|
||||
|
||||
heatmap_upload_concurrency: (DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
|
||||
secondary_download_concurrency: (DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: (DEFAULT_INGEST_BATCH_SIZE),
|
||||
|
||||
virtual_file_io_engine: None,
|
||||
|
||||
max_vectored_read_bytes: (MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
|
||||
)),
|
||||
image_compression: (DEFAULT_IMAGE_COMPRESSION),
|
||||
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
l0_flush: None,
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
|
||||
|
||||
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod tenant_conf_defaults {
|
||||
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
|
||||
|
||||
// FIXME the below configs are only used by legacy algorithm. The new algorithm
|
||||
// has different parameters.
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
|
||||
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
|
||||
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
|
||||
crate::models::CompactionAlgorithm::Legacy;
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
|
||||
// If there's a need to decrease this value, first make sure that GC
|
||||
// doesn't hold a layer map write lock for non-trivial operations.
|
||||
// Relevant: https://github.com/neondatabase/neon/issues/3394
|
||||
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
|
||||
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
|
||||
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
|
||||
// throughputs up to 1GiB/s per timeline.
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
impl Default for TenantConfigToml {
|
||||
fn default() -> Self {
|
||||
use tenant_conf_defaults::*;
|
||||
Self {
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
|
||||
.expect("cannot parse default checkpoint timeout"),
|
||||
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period"),
|
||||
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
|
||||
compaction_algorithm: crate::models::CompactionAlgorithmSettings {
|
||||
kind: DEFAULT_COMPACTION_ALGORITHM,
|
||||
},
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
walreceiver_connect_timeout: humantime::parse_duration(
|
||||
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
|
||||
)
|
||||
.expect("cannot parse default walreceiver connect timeout"),
|
||||
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
eviction_policy: crate::models::EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::models::ThrottleConfig::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
switch_aux_file_policy: crate::models::AuxFilePolicy::default_tenant_config(),
|
||||
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ pub use utilization::PageserverUtilization;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Display,
|
||||
io::{BufRead, Read},
|
||||
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
|
||||
str::FromStr,
|
||||
@@ -435,7 +436,9 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
|
||||
)]
|
||||
pub enum ImageCompressionAlgorithm {
|
||||
// Disabled for writes, support decompressing during read path
|
||||
Disabled,
|
||||
@@ -470,11 +473,33 @@ impl FromStr for ImageCompressionAlgorithm {
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ImageCompressionAlgorithm {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
|
||||
ImageCompressionAlgorithm::Zstd { level } => {
|
||||
if let Some(level) = level {
|
||||
write!(f, "zstd({})", level)
|
||||
} else {
|
||||
write!(f, "zstd")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CompactionAlgorithmSettings {
|
||||
pub kind: CompactionAlgorithm,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum L0FlushConfig {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct EvictionPolicyLayerAccessThreshold {
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -1656,21 +1681,33 @@ mod tests {
|
||||
#[test]
|
||||
fn test_image_compression_algorithm_parsing() {
|
||||
use ImageCompressionAlgorithm::*;
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
|
||||
Disabled
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
|
||||
Zstd { level: None }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
|
||||
Zstd { level: Some(18) }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
|
||||
Zstd { level: Some(-3) }
|
||||
);
|
||||
let cases = [
|
||||
("disabled", Disabled),
|
||||
("zstd", Zstd { level: None }),
|
||||
("zstd(18)", Zstd { level: Some(18) }),
|
||||
("zstd(-3)", Zstd { level: Some(-3) }),
|
||||
];
|
||||
|
||||
for (display, expected) in cases {
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str(display).unwrap(),
|
||||
expected,
|
||||
"parsing works"
|
||||
);
|
||||
assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
|
||||
|
||||
let ser = serde_json::to_string(&expected).expect("serialization");
|
||||
assert_eq!(
|
||||
serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
|
||||
expected,
|
||||
"serde roundtrip"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::Value::String(display.to_string()),
|
||||
serde_json::to_value(expected).unwrap(),
|
||||
"Display is the serde serialization"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,6 +235,31 @@ timeout = '5s'";
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_storage_class_serde_roundtrip() {
|
||||
let classes = [
|
||||
None,
|
||||
Some(StorageClass::Standard),
|
||||
Some(StorageClass::IntelligentTiering),
|
||||
];
|
||||
for class in classes {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct Wrapper {
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_storage_class",
|
||||
serialize_with = "serialize_storage_class"
|
||||
)]
|
||||
class: Option<StorageClass>,
|
||||
}
|
||||
let wrapped = Wrapper {
|
||||
class: class.clone(),
|
||||
};
|
||||
let serialized = serde_json::to_string(&wrapped).unwrap();
|
||||
let deserialized: Wrapper = serde_json::from_str(&serialized).unwrap();
|
||||
assert_eq!(class, deserialized.class);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_azure_parsing() {
|
||||
let toml = "\
|
||||
|
||||
@@ -5,7 +5,9 @@ use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[derive(
|
||||
EnumString, strum_macros::Display, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy,
|
||||
)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum LogFormat {
|
||||
Plain,
|
||||
@@ -274,6 +276,14 @@ impl From<String> for SecretString {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for SecretString {
|
||||
type Err = std::convert::Infallible;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Ok(Self(s.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SecretString {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "[SECRET]")
|
||||
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints"]
|
||||
testing = ["fail/failpoints", "pageserver_api/testing" ]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
@@ -101,6 +101,7 @@ procfs.workspace = true
|
||||
criterion.workspace = true
|
||||
hex-literal.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
|
||||
indoc.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "bench_layer_map"
|
||||
|
||||
@@ -4,7 +4,7 @@ use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use pageserver::{
|
||||
config::{defaults::DEFAULT_IO_BUFFER_ALIGNMENT, PageServerConf},
|
||||
config::PageServerConf,
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
l0_flush::{L0FlushConfig, L0FlushGlobalState},
|
||||
page_cache,
|
||||
@@ -167,7 +167,7 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
virtual_file::init(
|
||||
16384,
|
||||
virtual_file::io_engine_for_bench(),
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
@@ -148,7 +147,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::path::{Path, PathBuf};
|
||||
use anyhow::Result;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Subcommand;
|
||||
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
@@ -194,7 +193,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
|
||||
@@ -20,14 +20,13 @@ use clap::{Parser, Subcommand};
|
||||
use index_part::IndexPartCmd;
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use std::env;
|
||||
use std::env::{var, VarError};
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -223,27 +224,15 @@ fn initialize_config(
|
||||
}
|
||||
};
|
||||
|
||||
let config: toml_edit::Document = match std::fs::File::open(cfg_file_path) {
|
||||
Ok(mut f) => {
|
||||
let md = f.metadata().context("stat config file")?;
|
||||
if md.is_file() {
|
||||
let mut s = String::new();
|
||||
f.read_to_string(&mut s).context("read config file")?;
|
||||
s.parse().context("parse config file toml")?
|
||||
} else {
|
||||
anyhow::bail!("directory entry exists but is not a file: {cfg_file_path}");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("open pageserver config: {e}: {cfg_file_path}");
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Using pageserver toml: {config}");
|
||||
|
||||
// Construct the runtime representation
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, &config, workdir)
|
||||
.context("Failed to parse pageserver configuration")?;
|
||||
let config_file_contents =
|
||||
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
|
||||
let config_toml = serde_path_to_error::deserialize(
|
||||
toml_edit::de::Deserializer::from_str(&config_file_contents)
|
||||
.context("build toml deserializer")?,
|
||||
)
|
||||
.context("deserialize config toml")?;
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
|
||||
.context("runtime-validation of config toml")?;
|
||||
|
||||
Ok(Box::leak(Box::new(conf)))
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -41,19 +41,15 @@
|
||||
// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl
|
||||
// reading these fields. We use the Debug impl for semi-structured logging, though.
|
||||
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use std::{sync::Arc, time::SystemTime};
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::{config::DiskUsageEvictionTaskConfig, shard::TenantShardId};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::serde_percent::Percent;
|
||||
use utils::{completion, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
@@ -69,23 +65,9 @@ use crate::{
|
||||
CancellableTask, DiskUsageEvictionTask,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub period: Duration,
|
||||
#[cfg(feature = "testing")]
|
||||
pub mock_statvfs: Option<crate::statvfs::mock::Behavior>,
|
||||
/// Select sorting for evicted layers
|
||||
#[serde(default)]
|
||||
pub eviction_order: EvictionOrder,
|
||||
}
|
||||
|
||||
/// Selects the sort order for eviction candidates *after* per tenant `min_resident_size`
|
||||
/// partitioning.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", content = "args")]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum EvictionOrder {
|
||||
/// Order the layers to be evicted by how recently they have been accessed relatively within
|
||||
/// the set of resident layers of a tenant.
|
||||
@@ -96,23 +78,22 @@ pub enum EvictionOrder {
|
||||
/// we read tenants is deterministic. If we find the need to use this as `false`, we need
|
||||
/// to ensure nondeterminism by adding in a random number to break the
|
||||
/// `relative_last_activity==0.0` ties.
|
||||
#[serde(default = "default_highest_layer_count_loses_first")]
|
||||
highest_layer_count_loses_first: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for EvictionOrder {
|
||||
fn default() -> Self {
|
||||
Self::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
impl From<pageserver_api::config::EvictionOrder> for EvictionOrder {
|
||||
fn from(value: pageserver_api::config::EvictionOrder) -> Self {
|
||||
match value {
|
||||
pageserver_api::config::EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first,
|
||||
} => Self::RelativeAccessed {
|
||||
highest_layer_count_loses_first,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_highest_layer_count_loses_first() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl EvictionOrder {
|
||||
fn sort(&self, candidates: &mut [(EvictionPartition, EvictionCandidate)]) {
|
||||
use EvictionOrder::*;
|
||||
@@ -295,7 +276,7 @@ async fn disk_usage_eviction_task_iteration(
|
||||
storage,
|
||||
usage_pre,
|
||||
tenant_manager,
|
||||
task_config.eviction_order,
|
||||
task_config.eviction_order.into(),
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
@@ -1257,7 +1238,6 @@ mod filesystem_level_usage {
|
||||
|
||||
#[test]
|
||||
fn max_usage_pct_pressure() {
|
||||
use super::EvictionOrder;
|
||||
use super::Usage as _;
|
||||
use std::time::Duration;
|
||||
use utils::serde_percent::Percent;
|
||||
@@ -1269,7 +1249,7 @@ mod filesystem_level_usage {
|
||||
period: Duration::MAX,
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
eviction_order: EvictionOrder::default(),
|
||||
eviction_order: pageserver_api::config::EvictionOrder::default(),
|
||||
},
|
||||
total_bytes: 100_000,
|
||||
avail_bytes: 0,
|
||||
|
||||
@@ -2076,7 +2076,7 @@ async fn disk_usage_eviction_run(
|
||||
evict_bytes: u64,
|
||||
|
||||
#[serde(default)]
|
||||
eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
|
||||
eviction_order: pageserver_api::config::EvictionOrder,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, serde::Serialize)]
|
||||
@@ -2112,7 +2112,7 @@ async fn disk_usage_eviction_run(
|
||||
&state.remote_storage,
|
||||
usage,
|
||||
&state.tenant_manager,
|
||||
config.eviction_order,
|
||||
config.eviction_order.into(),
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use std::{num::NonZeroUsize, sync::Arc};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum L0FlushConfig {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
}
|
||||
|
||||
@@ -16,6 +14,16 @@ impl Default for L0FlushConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<pageserver_api::models::L0FlushConfig> for L0FlushConfig {
|
||||
fn from(config: pageserver_api::models::L0FlushConfig) -> Self {
|
||||
match config {
|
||||
pageserver_api::models::L0FlushConfig::Direct { max_concurrency } => {
|
||||
Self::Direct { max_concurrency }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct L0FlushGlobalState(Arc<Inner>);
|
||||
|
||||
|
||||
@@ -60,32 +60,7 @@ pub mod mock {
|
||||
use regex::Regex;
|
||||
use tracing::log::info;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum Behavior {
|
||||
Success {
|
||||
blocksize: u64,
|
||||
total_blocks: u64,
|
||||
name_filter: Option<utils::serde_regex::Regex>,
|
||||
},
|
||||
Failure {
|
||||
mocked_error: MockedError,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub enum MockedError {
|
||||
EIO,
|
||||
}
|
||||
|
||||
impl From<MockedError> for nix::Error {
|
||||
fn from(e: MockedError) -> Self {
|
||||
match e {
|
||||
MockedError::EIO => nix::Error::EIO,
|
||||
}
|
||||
}
|
||||
}
|
||||
pub use pageserver_api::config::statvfs::mock::Behavior;
|
||||
|
||||
pub fn get(tenants_dir: &Utf8Path, behavior: &Behavior) -> nix::Result<Statvfs> {
|
||||
info!("running mocked statvfs");
|
||||
@@ -116,6 +91,7 @@ pub mod mock {
|
||||
block_size: *blocksize,
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Behavior::Failure { mocked_error } => Err((*mocked_error).into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,10 @@
|
||||
//! may lead to a data loss.
|
||||
//!
|
||||
use anyhow::bail;
|
||||
pub(crate) use pageserver_api::config::TenantConfigToml as TenantConf;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::CompactionAlgorithm;
|
||||
use pageserver_api::models::CompactionAlgorithmSettings;
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
use pageserver_api::models::LsnLease;
|
||||
use pageserver_api::models::{self, ThrottleConfig};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
@@ -23,50 +22,6 @@ use std::num::NonZeroU64;
|
||||
use std::time::Duration;
|
||||
use utils::generation::Generation;
|
||||
|
||||
pub mod defaults {
|
||||
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
|
||||
|
||||
// FIXME the below configs are only used by legacy algorithm. The new algorithm
|
||||
// has different parameters.
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
|
||||
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
|
||||
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
|
||||
super::CompactionAlgorithm::Legacy;
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
|
||||
// If there's a need to decrease this value, first make sure that GC
|
||||
// doesn't hold a layer map write lock for non-trivial operations.
|
||||
// Relevant: https://github.com/neondatabase/neon/issues/3394
|
||||
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
|
||||
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
|
||||
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
|
||||
// throughputs up to 1GiB/s per timeline.
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) enum AttachmentMode {
|
||||
/// Our generation is current as far as we know, and as far as we know we are the only attached
|
||||
@@ -281,96 +236,20 @@ impl LocationConf {
|
||||
}
|
||||
}
|
||||
|
||||
/// A tenant's calcuated configuration, which is the result of merging a
|
||||
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
|
||||
///
|
||||
/// For storing and transmitting individual tenant's configuration, see
|
||||
/// TenantConfOpt.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TenantConf {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
// page server crashes.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub checkpoint_distance: u64,
|
||||
// Inmemory layer is also flushed at least once in checkpoint_timeout to
|
||||
// eventually upload WAL after activity is stopped.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Duration,
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub compaction_target_size: u64,
|
||||
// How often to check if there's compaction work to be done.
|
||||
// Duration::ZERO means automatic compaction is disabled.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Duration,
|
||||
// Level0 delta layer threshold for compaction.
|
||||
pub compaction_threshold: usize,
|
||||
pub compaction_algorithm: CompactionAlgorithmSettings,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is #of bytes of WAL.
|
||||
// Page versions older than this are garbage collected away.
|
||||
pub gc_horizon: u64,
|
||||
// Interval at which garbage collection is triggered.
|
||||
// Duration::ZERO means automatic GC is disabled
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_period: Duration,
|
||||
// Delta layer churn threshold to create L1 image layers.
|
||||
pub image_creation_threshold: usize,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is time.
|
||||
// Page versions older than this are garbage collected away.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Duration,
|
||||
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub walreceiver_connect_timeout: Duration,
|
||||
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
|
||||
/// A stalled safekeeper will be changed to a newer one when it appears.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Duration,
|
||||
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
|
||||
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
|
||||
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expresed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
|
||||
/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
|
||||
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
|
||||
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
|
||||
/// file is written.
|
||||
pub switch_aux_file_policy: AuxFilePolicy,
|
||||
|
||||
/// The length for an explicit LSN lease request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length: Duration,
|
||||
|
||||
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length_for_ts: Duration,
|
||||
impl Default for LocationConf {
|
||||
// TODO: this should be removed once tenant loading can guarantee that we are never
|
||||
// loading from a directory without a configuration.
|
||||
// => tech debt since https://github.com/neondatabase/neon/issues/1555
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: Generation::none(),
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
tenant_conf: TenantConfOpt::default(),
|
||||
shard: ShardIdentity::unsharded(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -545,51 +424,6 @@ impl TenantConfOpt {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TenantConf {
|
||||
fn default() -> Self {
|
||||
use defaults::*;
|
||||
Self {
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
|
||||
.expect("cannot parse default checkpoint timeout"),
|
||||
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period"),
|
||||
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
|
||||
compaction_algorithm: CompactionAlgorithmSettings {
|
||||
kind: DEFAULT_COMPACTION_ALGORITHM,
|
||||
},
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
walreceiver_connect_timeout: humantime::parse_duration(
|
||||
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
|
||||
)
|
||||
.expect("cannot parse default walreceiver connect timeout"),
|
||||
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
|
||||
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
|
||||
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -52,6 +52,7 @@ use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
@@ -34,8 +34,7 @@ use crate::tenant::disk_btree::{
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadPlanner,
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
@@ -46,6 +45,7 @@ use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
|
||||
@@ -215,7 +215,7 @@ impl IndexEntry {
|
||||
|
||||
const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
|
||||
let res = Self::validate_checkpoint_distance(
|
||||
crate::tenant::config::defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
);
|
||||
if res.is_err() {
|
||||
panic!("default checkpoint distance is valid")
|
||||
|
||||
@@ -10,7 +10,6 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
@@ -456,9 +455,11 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
|
||||
|
||||
// If compaction period is set to zero (to disable it), then we will use a reasonable default
|
||||
let period = if period == Duration::ZERO {
|
||||
humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD)
|
||||
.unwrap()
|
||||
.into()
|
||||
humantime::Duration::from_str(
|
||||
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
|
||||
)
|
||||
.unwrap()
|
||||
.into()
|
||||
} else {
|
||||
period
|
||||
};
|
||||
|
||||
@@ -66,7 +66,6 @@ use std::{
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
tenant::{
|
||||
config::defaults::DEFAULT_PITR_INTERVAL,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
|
||||
@@ -102,6 +101,7 @@ use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
};
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
|
||||
@@ -19,6 +19,7 @@ use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::{CompactL0BypassPageCacheValidation, CompactL0Phase1ValueAccess};
|
||||
use pageserver_api::key::KEY_SIZE;
|
||||
use pageserver_api::keyspace::ShardedRange;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
@@ -29,7 +30,6 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::split_writer::{
|
||||
@@ -43,6 +43,9 @@ use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
||||
use crate::tenant::timeline::{Layer, ResidentLayer};
|
||||
use crate::tenant::DeltaLayer;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
use pageserver_api::config::tenant_conf_defaults::{
|
||||
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
|
||||
};
|
||||
|
||||
use crate::keyspace::KeySpace;
|
||||
use crate::repository::{Key, Value};
|
||||
@@ -1433,43 +1436,6 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum CompactL0Phase1ValueAccess {
|
||||
/// The old way.
|
||||
PageCachedBlobIo,
|
||||
/// The new way.
|
||||
StreamingKmerge {
|
||||
/// If set, we run both the old way and the new way, validate that
|
||||
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
|
||||
/// and if the validation fails,
|
||||
/// - in tests: fail them with a panic or
|
||||
/// - in prod, log a rate-limited warning and use the old way's results.
|
||||
///
|
||||
/// If not set, we only run the new way and trust its results.
|
||||
validate: Option<CompactL0BypassPageCacheValidation>,
|
||||
},
|
||||
}
|
||||
|
||||
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum CompactL0BypassPageCacheValidation {
|
||||
/// Validate that the series of (key, lsn) pairs are the same.
|
||||
KeyLsn,
|
||||
/// Validate that the entire output of old and new way is identical.
|
||||
KeyLsnValue,
|
||||
}
|
||||
|
||||
impl Default for CompactL0Phase1ValueAccess {
|
||||
fn default() -> Self {
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge {
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
|
||||
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Entry point for new tiered compaction algorithm.
|
||||
///
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
//! Note that the vectored blob api does *not* go through the page cache.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
@@ -29,9 +28,6 @@ use crate::context::RequestContext;
|
||||
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
|
||||
/// Metadata bundled with the start and end offset of a blob.
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct BlobMeta {
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
//! This is similar to PostgreSQL's virtual file descriptor facility in
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
@@ -19,6 +18,7 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
|
||||
@@ -84,9 +84,14 @@ pub(crate) fn get() -> IoEngine {
|
||||
}
|
||||
},
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
|
||||
.parse()
|
||||
.unwrap()
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
IoEngineKind::TokioEpollUring
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
IoEngineKind::StdFs
|
||||
}
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
|
||||
@@ -24,7 +24,20 @@ from functools import cached_property, partial
|
||||
from itertools import chain, product
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union, cast
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
from urllib.parse import quote, urlparse
|
||||
|
||||
import asyncpg
|
||||
@@ -90,6 +103,8 @@ from fixtures.utils import AuxFileStore as AuxFileStore # reexport
|
||||
|
||||
from .neon_api import NeonAPI, NeonApiEndpoint
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
"""
|
||||
This file contains pytest fixtures. A fixture is a test resource that can be
|
||||
summoned by placing its name in the test's arguments.
|
||||
@@ -2986,16 +3001,17 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
def config_toml_path(self) -> Path:
|
||||
return self.workdir / "pageserver.toml"
|
||||
|
||||
def edit_config_toml(self, edit_fn: Callable[[Dict[str, Any]], None]):
|
||||
def edit_config_toml(self, edit_fn: Callable[[Dict[str, Any]], T]) -> T:
|
||||
"""
|
||||
Edit the pageserver's config toml file in place.
|
||||
"""
|
||||
path = self.config_toml_path
|
||||
with open(path, "r") as f:
|
||||
config = toml.load(f)
|
||||
edit_fn(config)
|
||||
res = edit_fn(config)
|
||||
with open(path, "w") as f:
|
||||
toml.dump(config, f)
|
||||
return res
|
||||
|
||||
def patch_config_toml_nonrecursive(self, patch: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
|
||||
@@ -142,11 +142,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
|
||||
env.storage_controller.node_register(env.pageserver)
|
||||
|
||||
replaced_config = env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"control_plane_api": "",
|
||||
}
|
||||
)
|
||||
def remove_control_plane_api_field(config):
|
||||
return config.pop("control_plane_api")
|
||||
|
||||
control_plane_api = env.pageserver.edit_config_toml(remove_control_plane_api_field)
|
||||
env.pageserver.start()
|
||||
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
|
||||
|
||||
@@ -179,7 +178,11 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
env.pageserver.stop()
|
||||
# Starting without the override that disabled control_plane_api
|
||||
env.pageserver.patch_config_toml_nonrecursive(replaced_config)
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"control_plane_api": control_plane_api,
|
||||
}
|
||||
)
|
||||
env.pageserver.start()
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
|
||||
|
||||
@@ -733,7 +733,7 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# We will run with the limit set to 1, so that once we have one tenant stuck
|
||||
# in a pausable failpoint, the rest are prevented from proceeding through warmup.
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = 1"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
@@ -984,7 +984,7 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
|
||||
def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = 1"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -1062,7 +1062,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("activation_method", ["endpoint", "branch", "delete"])
|
||||
def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_method: str):
|
||||
# env.initial_tenant will take up this permit when attaching with lazy because of a failpoint activated after restart
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = 1"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user