mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
9 Commits
joonas/syn
...
run-clippy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae50e9600f | ||
|
|
cd613f5ab3 | ||
|
|
539b3ad541 | ||
|
|
79fa640058 | ||
|
|
c41f9870a5 | ||
|
|
a0e923b70b | ||
|
|
38f6107534 | ||
|
|
f25e07893c | ||
|
|
460d48437b |
27
.github/workflows/build_and_test.yml
vendored
27
.github/workflows/build_and_test.yml
vendored
@@ -122,10 +122,12 @@ jobs:
|
||||
|
||||
check-codestyle-rust:
|
||||
needs: [ check-permissions, build-build-tools-image ]
|
||||
# There's no reason to expect clippy or code formatting to be different on different platforms,
|
||||
# so it's enough to run these on x64 only.
|
||||
strategy:
|
||||
matrix:
|
||||
arch: [ x64, arm64 ]
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}
|
||||
@@ -166,15 +168,25 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
|
||||
|
||||
- name: Run cargo clippy (debug)
|
||||
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
|
||||
- name: Run cargo clippy (release)
|
||||
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
|
||||
run: |
|
||||
parallel --jobs 8 "cargo hack --feature-powerset --partition {}/8 clippy --target-dir target/partition-{} $CLIPPY_COMMON_ARGS" ::: 1 2 3 4 5 6 7 8
|
||||
|
||||
# instead of running the full release build, running debug build again,
|
||||
# but with disabled `debug-assertions` to excersice release code paths
|
||||
- name: Run cargo clippy (debug, with debug-assertions=false)
|
||||
run: |
|
||||
for N in 4 8 10 12 14 16 18 20; do
|
||||
echo "Running clippy with debug-assertions=false for partition ${N}"
|
||||
time parallel --jobs ${N} "cargo hack --feature-powerset --partition {}/${N} clippy --target-dir target/partition-{} $CLIPPY_COMMON_ARGS -C debug-assertions=off" ::: $(seq -s " " 1 ${N})
|
||||
rm -rf target/partition-*
|
||||
done
|
||||
|
||||
- name: Check documentation generation
|
||||
run: cargo doc --workspace --no-deps --document-private-items
|
||||
env:
|
||||
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
|
||||
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
|
||||
|
||||
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
|
||||
- name: Check formatting
|
||||
@@ -271,10 +283,6 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: sync(1)
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: sync
|
||||
|
||||
- name: Pytest benchmarks
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
@@ -290,7 +298,6 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
SYNC_AFTER_EACH_TEST: true
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
|
||||
48
Cargo.lock
generated
48
Cargo.lock
generated
@@ -915,22 +915,25 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.70.1"
|
||||
version = "0.65.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
|
||||
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"bitflags 1.3.2",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"itertools 0.12.1",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"log",
|
||||
"prettyplease 0.2.17",
|
||||
"peeking_take_while",
|
||||
"prettyplease 0.2.6",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash",
|
||||
"shlex",
|
||||
"syn 2.0.52",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2724,12 +2727,6 @@ dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indoc"
|
||||
version = "2.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
|
||||
|
||||
[[package]]
|
||||
name = "infer"
|
||||
version = "0.2.3"
|
||||
@@ -2946,6 +2943,12 @@ dependencies = [
|
||||
"spin 0.5.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazycell"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.150"
|
||||
@@ -3698,7 +3701,6 @@ dependencies = [
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.26",
|
||||
"indoc",
|
||||
"itertools 0.10.5",
|
||||
"md5",
|
||||
"metrics",
|
||||
@@ -3764,7 +3766,6 @@ dependencies = [
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"camino",
|
||||
"chrono",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
@@ -3772,16 +3773,11 @@ dependencies = [
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"itertools 0.10.5",
|
||||
"nix 0.27.1",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"storage_broker",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror",
|
||||
@@ -3968,6 +3964,12 @@ dependencies = [
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "peeking_take_while"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||
|
||||
[[package]]
|
||||
name = "pem"
|
||||
version = "3.0.3"
|
||||
@@ -4265,9 +4267,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "prettyplease"
|
||||
version = "0.2.17"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
|
||||
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"syn 2.0.52",
|
||||
@@ -6079,9 +6081,8 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
|
||||
|
||||
[[package]]
|
||||
name = "svg_fmt"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
|
||||
version = "0.4.2"
|
||||
source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4"
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
@@ -7613,7 +7614,6 @@ dependencies = [
|
||||
"hyper 0.14.26",
|
||||
"indexmap 1.9.3",
|
||||
"itertools 0.10.5",
|
||||
"itertools 0.12.1",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
|
||||
@@ -64,7 +64,7 @@ aws-types = "1.2.0"
|
||||
axum = { version = "0.6.20", features = ["ws"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.70"
|
||||
bindgen = "0.65"
|
||||
bit_field = "0.10.2"
|
||||
bstr = "1.0"
|
||||
byteorder = "1.4"
|
||||
@@ -103,7 +103,6 @@ humantime-serde = "1.1.1"
|
||||
hyper = "0.14"
|
||||
tokio-tungstenite = "0.20.0"
|
||||
indexmap = "2"
|
||||
indoc = "2"
|
||||
inotify = "0.10.2"
|
||||
ipnet = "2.9.0"
|
||||
itertools = "0.10"
|
||||
@@ -161,7 +160,8 @@ socket2 = "0.5"
|
||||
strum = "0.24"
|
||||
strum_macros = "0.24"
|
||||
"subtle" = "2.5.0"
|
||||
svg_fmt = "0.4.3"
|
||||
# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet
|
||||
svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" }
|
||||
sync_wrapper = "0.1.2"
|
||||
tar = "0.4"
|
||||
task-local-extensions = "0.1.4"
|
||||
|
||||
@@ -87,7 +87,6 @@ RUN mkdir -p /data/.neon/ && \
|
||||
"pg_distrib_dir='/usr/local/'\n" \
|
||||
"listen_pg_addr='0.0.0.0:6400'\n" \
|
||||
"listen_http_addr='0.0.0.0:9898'\n" \
|
||||
"availability_zone='local'\n" \
|
||||
> /data/.neon/pageserver.toml && \
|
||||
chown -R neon:neon /data/.neon
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.81.0
|
||||
ENV RUSTC_VERSION=1.80.1
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
@@ -64,12 +64,6 @@ brew install protobuf openssl flex bison icu4c pkg-config
|
||||
echo 'export PATH="$(brew --prefix openssl)/bin:$PATH"' >> ~/.zshrc
|
||||
```
|
||||
|
||||
If you get errors about missing `m4` you may have to install it manually:
|
||||
```
|
||||
brew install m4
|
||||
brew link --force m4
|
||||
```
|
||||
|
||||
2. [Install Rust](https://www.rust-lang.org/tools/install)
|
||||
```
|
||||
# recommended approach from https://www.rust-lang.org/tools/install
|
||||
|
||||
@@ -336,7 +336,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
availability_zone_id,
|
||||
availability_zone_id: Some(availability_zone_id),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -4,10 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
# See pageserver/Cargo.toml
|
||||
testing = ["dep:nix"]
|
||||
|
||||
[dependencies]
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
@@ -27,12 +23,6 @@ thiserror.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
itertools.workspace = true
|
||||
storage_broker.workspace = true
|
||||
camino = {workspace = true, features = ["serde1"]}
|
||||
remote_storage.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
nix = {workspace = true, optional = true}
|
||||
reqwest.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
bincode.workspace = true
|
||||
|
||||
@@ -1,28 +1,15 @@
|
||||
use camino::Utf8PathBuf;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use const_format::formatcp;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use const_format::formatcp;
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use serde_with::serde_as;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::logging::LogFormat;
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
|
||||
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
|
||||
// as a separate structure. This information is not neeed by the pageserver
|
||||
// itself, it is only used for registering the pageserver with the control
|
||||
@@ -42,476 +29,3 @@ pub struct NodeMetadata {
|
||||
#[serde(flatten)]
|
||||
pub other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
/// `pageserver.toml`
|
||||
///
|
||||
/// We use serde derive with `#[serde(default)]` to generate a deserializer
|
||||
/// that fills in the default values for each config field.
|
||||
///
|
||||
/// If there cannot be a static default value because we need to make runtime
|
||||
/// checks to determine the default, make it an `Option` (which defaults to None).
|
||||
/// The runtime check should be done in the consuming crate, i.e., `pageserver`.
|
||||
#[serde_as]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
pub struct ConfigToml {
|
||||
// types mapped 1:1 into the runtime PageServerConfig type
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub availability_zone: Option<String>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wait_lsn_timeout: Duration,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wal_redo_timeout: Duration,
|
||||
pub superuser: String,
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
pub pg_distrib_dir: Option<Utf8PathBuf>,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub pg_auth_type: AuthType,
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub tenant_config: TenantConfigToml,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub broker_endpoint: storage_broker::Uri,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub broker_keepalive_interval: Duration,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub log_format: LogFormat,
|
||||
pub concurrent_tenant_warmup: NonZeroUsize,
|
||||
pub concurrent_tenant_size_logical_size_queries: NonZeroUsize,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub metric_collection_interval: Duration,
|
||||
pub metric_collection_endpoint: Option<reqwest::Url>,
|
||||
pub metric_collection_bucket: Option<RemoteStorageConfig>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
pub test_remote_failures: u64,
|
||||
pub ondemand_download_behavior_treat_error_as_warn: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub background_task_maximum_delay: Duration,
|
||||
pub control_plane_api: Option<reqwest::Url>,
|
||||
pub control_plane_api_token: Option<String>,
|
||||
pub control_plane_emergency_mode: bool,
|
||||
pub heatmap_upload_concurrency: usize,
|
||||
pub secondary_download_concurrency: usize,
|
||||
pub virtual_file_io_engine: Option<crate::models::virtual_file::IoEngineKind>,
|
||||
pub ingest_batch_size: u64,
|
||||
pub max_vectored_read_bytes: MaxVectoredReadBytes,
|
||||
pub image_compression: ImageCompressionAlgorithm,
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
pub l0_flush: Option<crate::models::L0FlushConfig>,
|
||||
#[serde(skip_serializing)]
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's
|
||||
pub compact_level0_phase1_value_access: serde::de::IgnoredAny,
|
||||
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
|
||||
pub io_buffer_alignment: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: utils::serde_percent::Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub period: Duration,
|
||||
#[cfg(feature = "testing")]
|
||||
pub mock_statvfs: Option<statvfs::mock::Behavior>,
|
||||
/// Select sorting for evicted layers
|
||||
#[serde(default)]
|
||||
pub eviction_order: EvictionOrder,
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum Behavior {
|
||||
Success {
|
||||
blocksize: u64,
|
||||
total_blocks: u64,
|
||||
name_filter: Option<utils::serde_regex::Regex>,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
Failure { mocked_error: MockedError },
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub enum MockedError {
|
||||
EIO,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
impl From<MockedError> for nix::Error {
|
||||
fn from(e: MockedError) -> Self {
|
||||
match e {
|
||||
MockedError::EIO => nix::Error::EIO,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type", content = "args")]
|
||||
pub enum EvictionOrder {
|
||||
RelativeAccessed {
|
||||
highest_layer_count_loses_first: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for EvictionOrder {
|
||||
fn default() -> Self {
|
||||
Self::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetVectoredImpl {
|
||||
Sequential,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetImpl {
|
||||
Legacy,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
|
||||
/// A tenant's calcuated configuration, which is the result of merging a
|
||||
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
|
||||
///
|
||||
/// For storing and transmitting individual tenant's configuration, see
|
||||
/// TenantConfOpt.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(deny_unknown_fields, default)]
|
||||
pub struct TenantConfigToml {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
// page server crashes.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub checkpoint_distance: u64,
|
||||
// Inmemory layer is also flushed at least once in checkpoint_timeout to
|
||||
// eventually upload WAL after activity is stopped.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Duration,
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub compaction_target_size: u64,
|
||||
// How often to check if there's compaction work to be done.
|
||||
// Duration::ZERO means automatic compaction is disabled.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Duration,
|
||||
// Level0 delta layer threshold for compaction.
|
||||
pub compaction_threshold: usize,
|
||||
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is #of bytes of WAL.
|
||||
// Page versions older than this are garbage collected away.
|
||||
pub gc_horizon: u64,
|
||||
// Interval at which garbage collection is triggered.
|
||||
// Duration::ZERO means automatic GC is disabled
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_period: Duration,
|
||||
// Delta layer churn threshold to create L1 image layers.
|
||||
pub image_creation_threshold: usize,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is time.
|
||||
// Page versions older than this are garbage collected away.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Duration,
|
||||
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub walreceiver_connect_timeout: Duration,
|
||||
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
|
||||
/// A stalled safekeeper will be changed to a newer one when it appears.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Duration,
|
||||
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub eviction_policy: crate::models::EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
|
||||
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
pub timeline_get_throttle: crate::models::ThrottleConfig,
|
||||
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expresed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
|
||||
/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
|
||||
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
|
||||
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
|
||||
/// file is written.
|
||||
pub switch_aux_file_policy: crate::models::AuxFilePolicy,
|
||||
|
||||
/// The length for an explicit LSN lease request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length: Duration,
|
||||
|
||||
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length_for_ts: Duration,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
|
||||
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
|
||||
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
|
||||
|
||||
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
|
||||
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
|
||||
|
||||
pub const DEFAULT_LOG_FORMAT: &str = "plain";
|
||||
|
||||
pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;
|
||||
|
||||
pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = 1;
|
||||
|
||||
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
|
||||
|
||||
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
|
||||
pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
|
||||
ImageCompressionAlgorithm::Zstd { level: Some(1) };
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
|
||||
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
fn default() -> Self {
|
||||
use defaults::*;
|
||||
|
||||
Self {
|
||||
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
availability_zone: (None),
|
||||
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
.expect("cannot parse default wal redo timeout")),
|
||||
superuser: (DEFAULT_SUPERUSER.to_string()),
|
||||
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
http_auth_type: (AuthType::Trust),
|
||||
pg_auth_type: (AuthType::Trust),
|
||||
auth_validation_public_key_path: (None),
|
||||
remote_storage: None,
|
||||
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint")),
|
||||
broker_keepalive_interval: (humantime::parse_duration(
|
||||
storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default keepalive interval")),
|
||||
log_format: (LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
|
||||
|
||||
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
|
||||
.expect("Invalid default constant")),
|
||||
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
|
||||
metric_collection_interval: (humantime::parse_duration(
|
||||
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default metric collection interval")),
|
||||
synthetic_size_calculation_interval: (humantime::parse_duration(
|
||||
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
|
||||
)
|
||||
.expect("cannot parse default synthetic size calculation interval")),
|
||||
metric_collection_endpoint: (DEFAULT_METRIC_COLLECTION_ENDPOINT),
|
||||
|
||||
metric_collection_bucket: (None),
|
||||
|
||||
disk_usage_based_eviction: (None),
|
||||
|
||||
test_remote_failures: (0),
|
||||
|
||||
ondemand_download_behavior_treat_error_as_warn: (false),
|
||||
|
||||
background_task_maximum_delay: (humantime::parse_duration(
|
||||
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
|
||||
)
|
||||
.unwrap()),
|
||||
|
||||
control_plane_api: (None),
|
||||
control_plane_api_token: (None),
|
||||
control_plane_emergency_mode: (false),
|
||||
|
||||
heatmap_upload_concurrency: (DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
|
||||
secondary_download_concurrency: (DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: (DEFAULT_INGEST_BATCH_SIZE),
|
||||
|
||||
virtual_file_io_engine: None,
|
||||
|
||||
max_vectored_read_bytes: (MaxVectoredReadBytes(
|
||||
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
|
||||
)),
|
||||
image_compression: (DEFAULT_IMAGE_COMPRESSION),
|
||||
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
l0_flush: None,
|
||||
compact_level0_phase1_value_access: Default::default(),
|
||||
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
|
||||
|
||||
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod tenant_conf_defaults {
|
||||
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
|
||||
|
||||
// FIXME the below configs are only used by legacy algorithm. The new algorithm
|
||||
// has different parameters.
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
|
||||
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
|
||||
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
|
||||
crate::models::CompactionAlgorithm::Legacy;
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
|
||||
// If there's a need to decrease this value, first make sure that GC
|
||||
// doesn't hold a layer map write lock for non-trivial operations.
|
||||
// Relevant: https://github.com/neondatabase/neon/issues/3394
|
||||
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
|
||||
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
|
||||
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
|
||||
// throughputs up to 1GiB/s per timeline.
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
impl Default for TenantConfigToml {
|
||||
fn default() -> Self {
|
||||
use tenant_conf_defaults::*;
|
||||
Self {
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
|
||||
.expect("cannot parse default checkpoint timeout"),
|
||||
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period"),
|
||||
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
|
||||
compaction_algorithm: crate::models::CompactionAlgorithmSettings {
|
||||
kind: DEFAULT_COMPACTION_ALGORITHM,
|
||||
},
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
walreceiver_connect_timeout: humantime::parse_duration(
|
||||
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
|
||||
)
|
||||
.expect("cannot parse default walreceiver connect timeout"),
|
||||
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
eviction_policy: crate::models::EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::models::ThrottleConfig::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
switch_aux_file_policy: crate::models::AuxFilePolicy::default_tenant_config(),
|
||||
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ pub struct NodeRegisterRequest {
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
|
||||
pub availability_zone_id: String,
|
||||
pub availability_zone_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -6,7 +6,6 @@ pub use utilization::PageserverUtilization;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Display,
|
||||
io::{BufRead, Read},
|
||||
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
|
||||
str::FromStr,
|
||||
@@ -436,9 +435,7 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ImageCompressionAlgorithm {
|
||||
// Disabled for writes, support decompressing during read path
|
||||
Disabled,
|
||||
@@ -473,33 +470,11 @@ impl FromStr for ImageCompressionAlgorithm {
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ImageCompressionAlgorithm {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
|
||||
ImageCompressionAlgorithm::Zstd { level } => {
|
||||
if let Some(level) = level {
|
||||
write!(f, "zstd({})", level)
|
||||
} else {
|
||||
write!(f, "zstd")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CompactionAlgorithmSettings {
|
||||
pub kind: CompactionAlgorithm,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum L0FlushConfig {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct EvictionPolicyLayerAccessThreshold {
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -1681,33 +1656,21 @@ mod tests {
|
||||
#[test]
|
||||
fn test_image_compression_algorithm_parsing() {
|
||||
use ImageCompressionAlgorithm::*;
|
||||
let cases = [
|
||||
("disabled", Disabled),
|
||||
("zstd", Zstd { level: None }),
|
||||
("zstd(18)", Zstd { level: Some(18) }),
|
||||
("zstd(-3)", Zstd { level: Some(-3) }),
|
||||
];
|
||||
|
||||
for (display, expected) in cases {
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str(display).unwrap(),
|
||||
expected,
|
||||
"parsing works"
|
||||
);
|
||||
assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
|
||||
|
||||
let ser = serde_json::to_string(&expected).expect("serialization");
|
||||
assert_eq!(
|
||||
serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
|
||||
expected,
|
||||
"serde roundtrip"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::Value::String(display.to_string()),
|
||||
serde_json::to_value(expected).unwrap(),
|
||||
"Display is the serde serialization"
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
|
||||
Disabled
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
|
||||
Zstd { level: None }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
|
||||
Zstd { level: Some(18) }
|
||||
);
|
||||
assert_eq!(
|
||||
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
|
||||
Zstd { level: Some(-3) }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
|
||||
fn include_file(&self, filename: &str) {
|
||||
// This does the equivalent of passing bindgen::CargoCallbacks
|
||||
// to the builder .parse_callbacks() method.
|
||||
let cargo_callbacks = bindgen::CargoCallbacks::new();
|
||||
let cargo_callbacks = bindgen::CargoCallbacks;
|
||||
cargo_callbacks.include_file(filename)
|
||||
}
|
||||
|
||||
|
||||
@@ -235,31 +235,6 @@ timeout = '5s'";
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_storage_class_serde_roundtrip() {
|
||||
let classes = [
|
||||
None,
|
||||
Some(StorageClass::Standard),
|
||||
Some(StorageClass::IntelligentTiering),
|
||||
];
|
||||
for class in classes {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct Wrapper {
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_storage_class",
|
||||
serialize_with = "serialize_storage_class"
|
||||
)]
|
||||
class: Option<StorageClass>,
|
||||
}
|
||||
let wrapped = Wrapper {
|
||||
class: class.clone(),
|
||||
};
|
||||
let serialized = serde_json::to_string(&wrapped).unwrap();
|
||||
let deserialized: Wrapper = serde_json::from_str(&serialized).unwrap();
|
||||
assert_eq!(class, deserialized.class);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_azure_parsing() {
|
||||
let toml = "\
|
||||
|
||||
@@ -5,9 +5,7 @@ use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
#[derive(
|
||||
EnumString, strum_macros::Display, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy,
|
||||
)]
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum LogFormat {
|
||||
Plain,
|
||||
@@ -276,14 +274,6 @@ impl From<String> for SecretString {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for SecretString {
|
||||
type Err = std::convert::Infallible;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Ok(Self(s.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SecretString {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "[SECRET]")
|
||||
|
||||
@@ -23,7 +23,7 @@ where
|
||||
for (i, item) in iter.enumerate() {
|
||||
visitor(item);
|
||||
|
||||
if (i + 1) % interval == 0 {
|
||||
if i + 1 % interval == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
if cancel.is_cancelled() {
|
||||
return Err(YieldingLoopError::Cancelled);
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use std::{env, path::PathBuf, process::Command};
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bindgen::CargoCallbacks;
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
// Tell cargo to invalidate the built crate whenever the wrapper changes
|
||||
@@ -63,25 +64,16 @@ fn main() -> anyhow::Result<()> {
|
||||
.map_err(|s| anyhow!("Bad postgres server path {s:?}"))?
|
||||
};
|
||||
|
||||
let unwind_abi_functions = [
|
||||
"log_internal",
|
||||
"recovery_download",
|
||||
"start_streaming",
|
||||
"finish_sync_safekeepers",
|
||||
"wait_event_set",
|
||||
"WalProposerStart",
|
||||
];
|
||||
|
||||
// The bindgen::Builder is the main entry point
|
||||
// to bindgen, and lets you build up options for
|
||||
// the resulting bindings.
|
||||
let mut builder = bindgen::Builder::default()
|
||||
let bindings = bindgen::Builder::default()
|
||||
// The input header we would like to generate
|
||||
// bindings for.
|
||||
.header("bindgen_deps.h")
|
||||
// Tell cargo to invalidate the built crate whenever any of the
|
||||
// included header files changed.
|
||||
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
|
||||
.parse_callbacks(Box::new(CargoCallbacks))
|
||||
.allowlist_type("WalProposer")
|
||||
.allowlist_type("WalProposerConfig")
|
||||
.allowlist_type("walproposer_api")
|
||||
@@ -113,12 +105,7 @@ fn main() -> anyhow::Result<()> {
|
||||
.allowlist_var("WL_SOCKET_MASK")
|
||||
.clang_arg("-DWALPROPOSER_LIB")
|
||||
.clang_arg(format!("-I{pgxn_neon}"))
|
||||
.clang_arg(format!("-I{inc_server_path}"));
|
||||
|
||||
for name in unwind_abi_functions {
|
||||
builder = builder.override_abi(bindgen::Abi::CUnwind, name);
|
||||
}
|
||||
let bindings = builder
|
||||
.clang_arg(format!("-I{inc_server_path}"))
|
||||
// Finish the builder and generate the bindings.
|
||||
.generate()
|
||||
// Unwrap the Result and panic on failure.
|
||||
|
||||
@@ -33,7 +33,7 @@ extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemStat
|
||||
}
|
||||
}
|
||||
|
||||
extern "C-unwind" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
|
||||
extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
|
||||
unsafe {
|
||||
let callback_data = (*(*wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
@@ -187,7 +187,7 @@ extern "C" fn conn_blocking_write(
|
||||
}
|
||||
}
|
||||
|
||||
extern "C-unwind" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
|
||||
extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
|
||||
unsafe {
|
||||
let callback_data = (*(*(*sk).wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
@@ -272,7 +272,7 @@ extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
|
||||
}
|
||||
}
|
||||
|
||||
extern "C-unwind" fn wait_event_set(
|
||||
extern "C" fn wait_event_set(
|
||||
wp: *mut WalProposer,
|
||||
timeout: ::std::os::raw::c_long,
|
||||
event_sk: *mut *mut Safekeeper,
|
||||
@@ -324,7 +324,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
|
||||
}
|
||||
}
|
||||
|
||||
extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
|
||||
extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
|
||||
unsafe {
|
||||
let callback_data = (*(*wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
@@ -340,7 +340,7 @@ extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, sk: *mut Safekee
|
||||
}
|
||||
}
|
||||
|
||||
extern "C-unwind" fn log_internal(
|
||||
extern "C" fn log_internal(
|
||||
wp: *mut WalProposer,
|
||||
level: ::std::os::raw::c_int,
|
||||
line: *const ::std::os::raw::c_char,
|
||||
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints", "pageserver_api/testing" ]
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
@@ -101,7 +101,6 @@ procfs.workspace = true
|
||||
criterion.workspace = true
|
||||
hex-literal.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
|
||||
indoc.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "bench_layer_map"
|
||||
|
||||
@@ -4,7 +4,7 @@ use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use pageserver::{
|
||||
config::PageServerConf,
|
||||
config::{defaults::DEFAULT_IO_BUFFER_ALIGNMENT, PageServerConf},
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
l0_flush::{L0FlushConfig, L0FlushGlobalState},
|
||||
page_cache,
|
||||
@@ -167,7 +167,7 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
virtual_file::init(
|
||||
16384,
|
||||
virtual_file::io_engine_for_bench(),
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
@@ -147,7 +148,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
|
||||
use anyhow::Result;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Subcommand;
|
||||
use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
@@ -193,7 +194,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
|
||||
@@ -20,13 +20,14 @@ use clap::{Parser, Subcommand};
|
||||
use index_part::IndexPartCmd;
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::ControlFileData;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
use std::env;
|
||||
use std::env::{var, VarError};
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -125,6 +124,7 @@ fn main() -> anyhow::Result<()> {
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
|
||||
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
|
||||
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
@@ -223,15 +223,27 @@ fn initialize_config(
|
||||
}
|
||||
};
|
||||
|
||||
let config_file_contents =
|
||||
std::fs::read_to_string(cfg_file_path).context("read config file from filesystem")?;
|
||||
let config_toml = serde_path_to_error::deserialize(
|
||||
toml_edit::de::Deserializer::from_str(&config_file_contents)
|
||||
.context("build toml deserializer")?,
|
||||
)
|
||||
.context("deserialize config toml")?;
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
|
||||
.context("runtime-validation of config toml")?;
|
||||
let config: toml_edit::Document = match std::fs::File::open(cfg_file_path) {
|
||||
Ok(mut f) => {
|
||||
let md = f.metadata().context("stat config file")?;
|
||||
if md.is_file() {
|
||||
let mut s = String::new();
|
||||
f.read_to_string(&mut s).context("read config file")?;
|
||||
s.parse().context("parse config file toml")?
|
||||
} else {
|
||||
anyhow::bail!("directory entry exists but is not a file: {cfg_file_path}");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("open pageserver config: {e}: {cfg_file_path}");
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Using pageserver toml: {config}");
|
||||
|
||||
// Construct the runtime representation
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, &config, workdir)
|
||||
.context("Failed to parse pageserver configuration")?;
|
||||
|
||||
Ok(Box::leak(Box::new(conf)))
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -141,24 +141,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
m.other
|
||||
);
|
||||
|
||||
let az_id = {
|
||||
let az_id_from_metadata = m
|
||||
.other
|
||||
.get("availability_zone_id")
|
||||
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
|
||||
|
||||
match az_id_from_metadata {
|
||||
Some(az_id) => Some(az_id),
|
||||
None => {
|
||||
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
|
||||
conf.availability_zone.clone()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if az_id.is_none() {
|
||||
panic!("Availablity zone id could not be inferred from metadata.json or pageserver config");
|
||||
}
|
||||
let az_id = m
|
||||
.other
|
||||
.get("availability_zone_id")
|
||||
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));
|
||||
|
||||
Some(NodeRegisterRequest {
|
||||
node_id: conf.id,
|
||||
@@ -166,7 +152,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
availability_zone_id: az_id.expect("Checked above"),
|
||||
availability_zone_id: az_id,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -41,15 +41,19 @@
|
||||
// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl
|
||||
// reading these fields. We use the Debug impl for semi-structured logging, though.
|
||||
|
||||
use std::{sync::Arc, time::SystemTime};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::{config::DiskUsageEvictionTaskConfig, shard::TenantShardId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::serde_percent::Percent;
|
||||
use utils::{completion, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
@@ -65,9 +69,23 @@ use crate::{
|
||||
CancellableTask, DiskUsageEvictionTask,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub period: Duration,
|
||||
#[cfg(feature = "testing")]
|
||||
pub mock_statvfs: Option<crate::statvfs::mock::Behavior>,
|
||||
/// Select sorting for evicted layers
|
||||
#[serde(default)]
|
||||
pub eviction_order: EvictionOrder,
|
||||
}
|
||||
|
||||
/// Selects the sort order for eviction candidates *after* per tenant `min_resident_size`
|
||||
/// partitioning.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", content = "args")]
|
||||
pub enum EvictionOrder {
|
||||
/// Order the layers to be evicted by how recently they have been accessed relatively within
|
||||
/// the set of resident layers of a tenant.
|
||||
@@ -78,22 +96,23 @@ pub enum EvictionOrder {
|
||||
/// we read tenants is deterministic. If we find the need to use this as `false`, we need
|
||||
/// to ensure nondeterminism by adding in a random number to break the
|
||||
/// `relative_last_activity==0.0` ties.
|
||||
#[serde(default = "default_highest_layer_count_loses_first")]
|
||||
highest_layer_count_loses_first: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<pageserver_api::config::EvictionOrder> for EvictionOrder {
|
||||
fn from(value: pageserver_api::config::EvictionOrder) -> Self {
|
||||
match value {
|
||||
pageserver_api::config::EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first,
|
||||
} => Self::RelativeAccessed {
|
||||
highest_layer_count_loses_first,
|
||||
},
|
||||
impl Default for EvictionOrder {
|
||||
fn default() -> Self {
|
||||
Self::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_highest_layer_count_loses_first() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl EvictionOrder {
|
||||
fn sort(&self, candidates: &mut [(EvictionPartition, EvictionCandidate)]) {
|
||||
use EvictionOrder::*;
|
||||
@@ -276,7 +295,7 @@ async fn disk_usage_eviction_task_iteration(
|
||||
storage,
|
||||
usage_pre,
|
||||
tenant_manager,
|
||||
task_config.eviction_order.into(),
|
||||
task_config.eviction_order,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
@@ -1238,6 +1257,7 @@ mod filesystem_level_usage {
|
||||
|
||||
#[test]
|
||||
fn max_usage_pct_pressure() {
|
||||
use super::EvictionOrder;
|
||||
use super::Usage as _;
|
||||
use std::time::Duration;
|
||||
use utils::serde_percent::Percent;
|
||||
@@ -1249,7 +1269,7 @@ mod filesystem_level_usage {
|
||||
period: Duration::MAX,
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
eviction_order: pageserver_api::config::EvictionOrder::default(),
|
||||
eviction_order: EvictionOrder::default(),
|
||||
},
|
||||
total_bytes: 100_000,
|
||||
avail_bytes: 0,
|
||||
|
||||
@@ -2076,7 +2076,7 @@ async fn disk_usage_eviction_run(
|
||||
evict_bytes: u64,
|
||||
|
||||
#[serde(default)]
|
||||
eviction_order: pageserver_api::config::EvictionOrder,
|
||||
eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, serde::Serialize)]
|
||||
@@ -2112,7 +2112,7 @@ async fn disk_usage_eviction_run(
|
||||
&state.remote_storage,
|
||||
usage,
|
||||
&state.tenant_manager,
|
||||
config.eviction_order.into(),
|
||||
config.eviction_order,
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use std::{num::NonZeroUsize, sync::Arc};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum L0FlushConfig {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
}
|
||||
|
||||
@@ -14,16 +16,6 @@ impl Default for L0FlushConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<pageserver_api::models::L0FlushConfig> for L0FlushConfig {
|
||||
fn from(config: pageserver_api::models::L0FlushConfig) -> Self {
|
||||
match config {
|
||||
pageserver_api::models::L0FlushConfig::Direct { max_concurrency } => {
|
||||
Self::Direct { max_concurrency }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct L0FlushGlobalState(Arc<Inner>);
|
||||
|
||||
|
||||
@@ -729,12 +729,8 @@ impl Timeline {
|
||||
let current_policy = self.last_aux_file_policy.load();
|
||||
match current_policy {
|
||||
Some(AuxFilePolicy::V1) => {
|
||||
let res = self.list_aux_files_v1(lsn, ctx).await?;
|
||||
let empty_str = if res.is_empty() { ", empty" } else { "" };
|
||||
warn!(
|
||||
"this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
|
||||
);
|
||||
Ok(res)
|
||||
warn!("this timeline is using deprecated aux file policy V1 (policy=V1)");
|
||||
self.list_aux_files_v1(lsn, ctx).await
|
||||
}
|
||||
None => {
|
||||
let res = self.list_aux_files_v1(lsn, ctx).await?;
|
||||
@@ -1661,7 +1657,7 @@ impl<'a> DatadirModification<'a> {
|
||||
if aux_files_key_v1.is_empty() {
|
||||
None
|
||||
} else {
|
||||
warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
|
||||
warn!("this timeline is using deprecated aux file policy V1");
|
||||
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
|
||||
Some(AuxFilePolicy::V1)
|
||||
}
|
||||
|
||||
@@ -60,7 +60,32 @@ pub mod mock {
|
||||
use regex::Regex;
|
||||
use tracing::log::info;
|
||||
|
||||
pub use pageserver_api::config::statvfs::mock::Behavior;
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum Behavior {
|
||||
Success {
|
||||
blocksize: u64,
|
||||
total_blocks: u64,
|
||||
name_filter: Option<utils::serde_regex::Regex>,
|
||||
},
|
||||
Failure {
|
||||
mocked_error: MockedError,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub enum MockedError {
|
||||
EIO,
|
||||
}
|
||||
|
||||
impl From<MockedError> for nix::Error {
|
||||
fn from(e: MockedError) -> Self {
|
||||
match e {
|
||||
MockedError::EIO => nix::Error::EIO,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(tenants_dir: &Utf8Path, behavior: &Behavior) -> nix::Result<Statvfs> {
|
||||
info!("running mocked statvfs");
|
||||
@@ -91,7 +116,6 @@ pub mod mock {
|
||||
block_size: *blocksize,
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Behavior::Failure { mocked_error } => Err((*mocked_error).into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,10 +9,11 @@
|
||||
//! may lead to a data loss.
|
||||
//!
|
||||
use anyhow::bail;
|
||||
pub(crate) use pageserver_api::config::TenantConfigToml as TenantConf;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::CompactionAlgorithm;
|
||||
use pageserver_api::models::CompactionAlgorithmSettings;
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
use pageserver_api::models::LsnLease;
|
||||
use pageserver_api::models::{self, ThrottleConfig};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
@@ -22,6 +23,50 @@ use std::num::NonZeroU64;
|
||||
use std::time::Duration;
|
||||
use utils::generation::Generation;
|
||||
|
||||
pub mod defaults {
|
||||
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
|
||||
|
||||
// FIXME the below configs are only used by legacy algorithm. The new algorithm
|
||||
// has different parameters.
|
||||
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
|
||||
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
|
||||
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
|
||||
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
|
||||
super::CompactionAlgorithm::Legacy;
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
|
||||
// If there's a need to decrease this value, first make sure that GC
|
||||
// doesn't hold a layer map write lock for non-trivial operations.
|
||||
// Relevant: https://github.com/neondatabase/neon/issues/3394
|
||||
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
|
||||
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
// The default limit on WAL lag should be set to avoid causing disconnects under high throughput
|
||||
// scenarios: since the broker stats are updated ~1/s, a value of 1GiB should be sufficient for
|
||||
// throughputs up to 1GiB/s per timeline.
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) enum AttachmentMode {
|
||||
/// Our generation is current as far as we know, and as far as we know we are the only attached
|
||||
@@ -236,20 +281,96 @@ impl LocationConf {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LocationConf {
|
||||
// TODO: this should be removed once tenant loading can guarantee that we are never
|
||||
// loading from a directory without a configuration.
|
||||
// => tech debt since https://github.com/neondatabase/neon/issues/1555
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: Generation::none(),
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
tenant_conf: TenantConfOpt::default(),
|
||||
shard: ShardIdentity::unsharded(),
|
||||
}
|
||||
}
|
||||
/// A tenant's calcuated configuration, which is the result of merging a
|
||||
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
|
||||
///
|
||||
/// For storing and transmitting individual tenant's configuration, see
|
||||
/// TenantConfOpt.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TenantConf {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
// page server crashes.
|
||||
// This parameter actually determines L0 layer file size.
|
||||
pub checkpoint_distance: u64,
|
||||
// Inmemory layer is also flushed at least once in checkpoint_timeout to
|
||||
// eventually upload WAL after activity is stopped.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Duration,
|
||||
// Target file size, when creating image and delta layers.
|
||||
// This parameter determines L1 layer file size.
|
||||
pub compaction_target_size: u64,
|
||||
// How often to check if there's compaction work to be done.
|
||||
// Duration::ZERO means automatic compaction is disabled.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Duration,
|
||||
// Level0 delta layer threshold for compaction.
|
||||
pub compaction_threshold: usize,
|
||||
pub compaction_algorithm: CompactionAlgorithmSettings,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is #of bytes of WAL.
|
||||
// Page versions older than this are garbage collected away.
|
||||
pub gc_horizon: u64,
|
||||
// Interval at which garbage collection is triggered.
|
||||
// Duration::ZERO means automatic GC is disabled
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_period: Duration,
|
||||
// Delta layer churn threshold to create L1 image layers.
|
||||
pub image_creation_threshold: usize,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is time.
|
||||
// Page versions older than this are garbage collected away.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Duration,
|
||||
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub walreceiver_connect_timeout: Duration,
|
||||
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
|
||||
/// A stalled safekeeper will be changed to a newer one when it appears.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Duration,
|
||||
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
|
||||
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
|
||||
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expresed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
|
||||
/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
|
||||
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
|
||||
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
|
||||
/// file is written.
|
||||
pub switch_aux_file_policy: AuxFilePolicy,
|
||||
|
||||
/// The length for an explicit LSN lease request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length: Duration,
|
||||
|
||||
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
|
||||
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length_for_ts: Duration,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -424,6 +545,51 @@ impl TenantConfOpt {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TenantConf {
|
||||
fn default() -> Self {
|
||||
use defaults::*;
|
||||
Self {
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
|
||||
.expect("cannot parse default checkpoint timeout"),
|
||||
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
|
||||
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
|
||||
.expect("cannot parse default compaction period"),
|
||||
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
|
||||
compaction_algorithm: CompactionAlgorithmSettings {
|
||||
kind: DEFAULT_COMPACTION_ALGORITHM,
|
||||
},
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
walreceiver_connect_timeout: humantime::parse_duration(
|
||||
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
|
||||
)
|
||||
.expect("cannot parse default walreceiver connect timeout"),
|
||||
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
|
||||
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
|
||||
@@ -548,7 +548,7 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|_e| {
|
||||
.map_err(|e| {
|
||||
// Do a best-effort attempt at deleting the temporary file upon encountering an error.
|
||||
// We don't have async here nor do we want to pile on any extra errors.
|
||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||
@@ -556,6 +556,7 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
warn!("error deleting temporary file {temp_path}: {e}");
|
||||
}
|
||||
}
|
||||
e
|
||||
})?;
|
||||
|
||||
Ok((temp_path, file))
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
|
||||
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -52,7 +52,6 @@ use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
@@ -34,7 +34,8 @@ use crate::tenant::disk_btree::{
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
@@ -45,7 +46,6 @@ use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
|
||||
@@ -215,7 +215,7 @@ impl IndexEntry {
|
||||
|
||||
const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
|
||||
let res = Self::validate_checkpoint_distance(
|
||||
pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
crate::tenant::config::defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
);
|
||||
if res.is_err() {
|
||||
panic!("default checkpoint distance is valid")
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
@@ -455,11 +456,9 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
|
||||
|
||||
// If compaction period is set to zero (to disable it), then we will use a reasonable default
|
||||
let period = if period == Duration::ZERO {
|
||||
humantime::Duration::from_str(
|
||||
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
|
||||
)
|
||||
.unwrap()
|
||||
.into()
|
||||
humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD)
|
||||
.unwrap()
|
||||
.into()
|
||||
} else {
|
||||
period
|
||||
};
|
||||
|
||||
@@ -66,6 +66,7 @@ use std::{
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
tenant::{
|
||||
config::defaults::DEFAULT_PITR_INTERVAL,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
|
||||
@@ -101,7 +102,6 @@ use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
};
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
@@ -2243,7 +2243,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
if aux_file_policy == Some(AuxFilePolicy::V1) {
|
||||
warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
|
||||
warn!("this timeline is using deprecated aux file policy V1");
|
||||
}
|
||||
|
||||
result.repartition_threshold =
|
||||
|
||||
@@ -29,6 +29,7 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::split_writer::{
|
||||
@@ -42,9 +43,6 @@ use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
||||
use crate::tenant::timeline::{Layer, ResidentLayer};
|
||||
use crate::tenant::DeltaLayer;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
use pageserver_api::config::tenant_conf_defaults::{
|
||||
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
|
||||
};
|
||||
|
||||
use crate::keyspace::KeySpace;
|
||||
use crate::repository::{Key, Value};
|
||||
@@ -911,13 +909,137 @@ impl Timeline {
|
||||
// we're compacting, in key, LSN order.
|
||||
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
|
||||
// then the Value::Image is ordered before Value::WalRecord.
|
||||
let mut all_values_iter = {
|
||||
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact.iter() {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
//
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
|
||||
// option and validation code once we've reached confidence.
|
||||
enum AllValuesIter<'a> {
|
||||
PageCachedBlobIo {
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
StreamingKmergeBypassingPageCache {
|
||||
merge_iter: MergeIterator<'a>,
|
||||
},
|
||||
ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: CompactL0BypassPageCacheValidation,
|
||||
merge_iter: MergeIterator<'a>,
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
}
|
||||
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
|
||||
impl AllValuesIter<'_> {
|
||||
async fn next_all_keys_iter(
|
||||
iter: &mut VecIter<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
|
||||
let Some(DeltaEntry {
|
||||
key,
|
||||
lsn,
|
||||
val: value_ref,
|
||||
..
|
||||
}) = iter.next()
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let value = value_ref.load(ctx).await?;
|
||||
Ok(Some((*key, *lsn, value)))
|
||||
}
|
||||
async fn next(
|
||||
&mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
|
||||
match self {
|
||||
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
|
||||
Self::next_all_keys_iter(iter, ctx).await
|
||||
}
|
||||
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
|
||||
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
|
||||
// advance both iterators
|
||||
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
|
||||
let merge_iter_item = merge_iter.next().await;
|
||||
// compare results & log warnings as needed
|
||||
macro_rules! rate_limited_warn {
|
||||
($($arg:tt)*) => {{
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
warn!($($arg)*);
|
||||
panic!("CompactL0BypassPageCacheValidation failure, check logs");
|
||||
}
|
||||
use once_cell::sync::Lazy;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
rate_limit.call(|| {
|
||||
warn!($($arg)*);
|
||||
});
|
||||
}}
|
||||
}
|
||||
match (&all_keys_iter_item, &merge_iter_item) {
|
||||
(Err(_), Err(_)) => {
|
||||
// don't bother asserting equivality of the errors
|
||||
}
|
||||
(Err(all_keys), Ok(merge)) => {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
|
||||
},
|
||||
(Ok(all_keys), Err(merge)) => {
|
||||
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
|
||||
},
|
||||
(Ok(None), Ok(None)) => { }
|
||||
(Ok(Some(all_keys)), Ok(None)) => {
|
||||
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
|
||||
}
|
||||
(Ok(None), Ok(Some(merge))) => {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
|
||||
}
|
||||
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
|
||||
match mode {
|
||||
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
|
||||
CompactL0BypassPageCacheValidation::KeyLsn => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn);
|
||||
let merge = (merge_key, merge_lsn);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
CompactL0BypassPageCacheValidation::KeyLsnValue => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
|
||||
let merge = (merge_key, merge_lsn, merge_value);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// in case of mismatch, trust the legacy all_keys_iter_item
|
||||
all_keys_iter_item
|
||||
}.instrument(info_span!("next")).await
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
|
||||
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
|
||||
let merge_iter = {
|
||||
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact.iter() {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
};
|
||||
match validate {
|
||||
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
|
||||
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: validate.clone(),
|
||||
merge_iter,
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
}
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
};
|
||||
|
||||
// This iterator walks through all keys and is needed to calculate size used by each key
|
||||
@@ -994,7 +1116,7 @@ impl Timeline {
|
||||
let mut keys = 0;
|
||||
|
||||
while let Some((key, lsn, value)) = all_values_iter
|
||||
.next()
|
||||
.next(ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?
|
||||
{
|
||||
@@ -1311,6 +1433,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum CompactL0Phase1ValueAccess {
|
||||
/// The old way.
|
||||
PageCachedBlobIo,
|
||||
/// The new way.
|
||||
StreamingKmerge {
|
||||
/// If set, we run both the old way and the new way, validate that
|
||||
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
|
||||
/// and if the validation fails,
|
||||
/// - in tests: fail them with a panic or
|
||||
/// - in prod, log a rate-limited warning and use the old way's results.
|
||||
///
|
||||
/// If not set, we only run the new way and trust its results.
|
||||
validate: Option<CompactL0BypassPageCacheValidation>,
|
||||
},
|
||||
}
|
||||
|
||||
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum CompactL0BypassPageCacheValidation {
|
||||
/// Validate that the series of (key, lsn) pairs are the same.
|
||||
KeyLsn,
|
||||
/// Validate that the entire output of old and new way is identical.
|
||||
KeyLsnValue,
|
||||
}
|
||||
|
||||
impl Default for CompactL0Phase1ValueAccess {
|
||||
fn default() -> Self {
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge {
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
|
||||
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Entry point for new tiered compaction algorithm.
|
||||
///
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
//! Note that the vectored blob api does *not* go through the page cache.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
@@ -28,6 +29,9 @@ use crate::context::RequestContext;
|
||||
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
|
||||
/// Metadata bundled with the start and end offset of a blob.
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct BlobMeta {
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
//! This is similar to PostgreSQL's virtual file descriptor facility in
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
@@ -18,7 +19,6 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
|
||||
@@ -84,14 +84,9 @@ pub(crate) fn get() -> IoEngine {
|
||||
}
|
||||
},
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
IoEngineKind::TokioEpollUring
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
IoEngineKind::StdFs
|
||||
}
|
||||
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
|
||||
63
poetry.lock
generated
63
poetry.lock
generated
@@ -985,38 +985,43 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "43.0.1"
|
||||
version = "42.0.4"
|
||||
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "cryptography-43.0.1-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:8385d98f6a3bf8bb2d65a73e17ed87a3ba84f6991c155691c51112075f9ffc5d"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:27e613d7077ac613e399270253259d9d53872aaf657471473ebfc9a52935c062"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68aaecc4178e90719e95298515979814bda0cbada1256a4485414860bd7ab962"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:de41fd81a41e53267cb020bb3a7212861da53a7d39f863585d13ea11049cf277"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f98bf604c82c416bc829e490c700ca1553eafdf2912a91e23a79d97d9801372a"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:61ec41068b7b74268fa86e3e9e12b9f0c21fcf65434571dbb13d954bceb08042"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:014f58110f53237ace6a408b5beb6c427b64e084eb451ef25a28308270086494"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-win32.whl", hash = "sha256:2bd51274dcd59f09dd952afb696bf9c61a7a49dfc764c04dd33ef7a6b502a1e2"},
|
||||
{file = "cryptography-43.0.1-cp37-abi3-win_amd64.whl", hash = "sha256:666ae11966643886c2987b3b721899d250855718d6d9ce41b521252a17985f4d"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:ac119bb76b9faa00f48128b7f5679e1d8d437365c5d26f1c2c3f0da4ce1b553d"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1bbcce1a551e262dfbafb6e6252f1ae36a248e615ca44ba302df077a846a8806"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:58d4e9129985185a06d849aa6df265bdd5a74ca6e1b736a77959b498e0505b85"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:d03a475165f3134f773d1388aeb19c2d25ba88b6a9733c5c590b9ff7bbfa2e0c"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:511f4273808ab590912a93ddb4e3914dfd8a388fed883361b02dea3791f292e1"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:80eda8b3e173f0f247f711eef62be51b599b5d425c429b5d4ca6a05e9e856baa"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:38926c50cff6f533f8a2dae3d7f19541432610d114a70808f0926d5aaa7121e4"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-win32.whl", hash = "sha256:a575913fb06e05e6b4b814d7f7468c2c660e8bb16d8d5a1faf9b33ccc569dd47"},
|
||||
{file = "cryptography-43.0.1-cp39-abi3-win_amd64.whl", hash = "sha256:d75601ad10b059ec832e78823b348bfa1a59f6b8d545db3a24fd44362a1564cb"},
|
||||
{file = "cryptography-43.0.1-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:ea25acb556320250756e53f9e20a4177515f012c9eaea17eb7587a8c4d8ae034"},
|
||||
{file = "cryptography-43.0.1-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:c1332724be35d23a854994ff0b66530119500b6053d0bd3363265f7e5e77288d"},
|
||||
{file = "cryptography-43.0.1-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:fba1007b3ef89946dbbb515aeeb41e30203b004f0b4b00e5e16078b518563289"},
|
||||
{file = "cryptography-43.0.1-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:5b43d1ea6b378b54a1dc99dd8a2b5be47658fe9a7ce0a58ff0b55f4b43ef2b84"},
|
||||
{file = "cryptography-43.0.1-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:88cce104c36870d70c49c7c8fd22885875d950d9ee6ab54df2745f83ba0dc365"},
|
||||
{file = "cryptography-43.0.1-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:9d3cdb25fa98afdd3d0892d132b8d7139e2c087da1712041f6b762e4f807cc96"},
|
||||
{file = "cryptography-43.0.1-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:e710bf40870f4db63c3d7d929aa9e09e4e7ee219e703f949ec4073b4294f6172"},
|
||||
{file = "cryptography-43.0.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:7c05650fe8023c5ed0d46793d4b7d7e6cd9c04e68eabe5b0aeea836e37bdcec2"},
|
||||
{file = "cryptography-43.0.1.tar.gz", hash = "sha256:203e92a75716d8cfb491dc47c79e17d0d9207ccffcbcb35f598fbe463ae3444d"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:ffc73996c4fca3d2b6c1c8c12bfd3ad00def8621da24f547626bf06441400449"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:db4b65b02f59035037fde0998974d84244a64c3265bdef32a827ab9b63d61b18"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dad9c385ba8ee025bb0d856714f71d7840020fe176ae0229de618f14dae7a6e2"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:69b22ab6506a3fe483d67d1ed878e1602bdd5912a134e6202c1ec672233241c1"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:e09469a2cec88fb7b078e16d4adec594414397e8879a4341c6ace96013463d5b"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3e970a2119507d0b104f0a8e281521ad28fc26f2820687b3436b8c9a5fcf20d1"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:e53dc41cda40b248ebc40b83b31516487f7db95ab8ceac1f042626bc43a2f992"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:c3a5cbc620e1e17009f30dd34cb0d85c987afd21c41a74352d1719be33380885"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bfadd884e7280df24d26f2186e4e07556a05d37393b0f220a840b083dc6a824"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:01911714117642a3f1792c7f376db572aadadbafcd8d75bb527166009c9f1d1b"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-win32.whl", hash = "sha256:fb0cef872d8193e487fc6bdb08559c3aa41b659a7d9be48b2e10747f47863925"},
|
||||
{file = "cryptography-42.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c1f25b252d2c87088abc8bbc4f1ecbf7c919e05508a7e8628e6875c40bc70923"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:15a1fb843c48b4a604663fa30af60818cd28f895572386e5f9b8a665874c26e7"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1327f280c824ff7885bdeef8578f74690e9079267c1c8bd7dc5cc5aa065ae52"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ffb03d419edcab93b4b19c22ee80c007fb2d708429cecebf1dd3258956a563a"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:1df6fcbf60560d2113b5ed90f072dc0b108d64750d4cbd46a21ec882c7aefce9"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:44a64043f743485925d3bcac548d05df0f9bb445c5fcca6681889c7c3ab12764"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:3c6048f217533d89f2f8f4f0fe3044bf0b2090453b7b73d0b77db47b80af8dff"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6d0fbe73728c44ca3a241eff9aefe6496ab2656d6e7a4ea2459865f2e8613257"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:887623fe0d70f48ab3f5e4dbf234986b1329a64c066d719432d0698522749929"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:ce8613beaffc7c14f091497346ef117c1798c202b01153a8cc7b8e2ebaaf41c0"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-win32.whl", hash = "sha256:810bcf151caefc03e51a3d61e53335cd5c7316c0a105cc695f0959f2c638b129"},
|
||||
{file = "cryptography-42.0.4-cp39-abi3-win_amd64.whl", hash = "sha256:a0298bdc6e98ca21382afe914c642620370ce0470a01e1bef6dd9b5354c36854"},
|
||||
{file = "cryptography-42.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5f8907fcf57392cd917892ae83708761c6ff3c37a8e835d7246ff0ad251d9298"},
|
||||
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:12d341bd42cdb7d4937b0cabbdf2a94f949413ac4504904d0cdbdce4a22cbf88"},
|
||||
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:1cdcdbd117681c88d717437ada72bdd5be9de117f96e3f4d50dab3f59fd9ab20"},
|
||||
{file = "cryptography-42.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:0e89f7b84f421c56e7ff69f11c441ebda73b8a8e6488d322ef71746224c20fce"},
|
||||
{file = "cryptography-42.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:f1e85a178384bf19e36779d91ff35c7617c885da487d689b05c1366f9933ad74"},
|
||||
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d2a27aca5597c8a71abbe10209184e1a8e91c1fd470b5070a2ea60cafec35bcd"},
|
||||
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4e36685cb634af55e0677d435d425043967ac2f3790ec652b2b88ad03b85c27b"},
|
||||
{file = "cryptography-42.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:f47be41843200f7faec0683ad751e5ef11b9a56a220d57f300376cd8aba81660"},
|
||||
{file = "cryptography-42.0.4.tar.gz", hash = "sha256:831a4b37accef30cccd34fcb916a5d7b5be3cbbe27268a02832c3e450aea39cb"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -1029,7 +1034,7 @@ nox = ["nox"]
|
||||
pep8test = ["check-sdist", "click", "mypy", "ruff"]
|
||||
sdist = ["build"]
|
||||
ssh = ["bcrypt (>=3.1.5)"]
|
||||
test = ["certifi", "cryptography-vectors (==43.0.1)", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
|
||||
test = ["certifi", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
|
||||
test-randomorder = ["pytest-randomly"]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -38,7 +38,10 @@ impl Api {
|
||||
locks: &'static ApiLocks<EndpointCacheKey>,
|
||||
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
|
||||
) -> Self {
|
||||
let jwt = std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN").unwrap_or_default();
|
||||
let jwt = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => String::new(),
|
||||
};
|
||||
Self {
|
||||
endpoint,
|
||||
caches,
|
||||
|
||||
@@ -35,17 +35,14 @@ pub fn new_client() -> ClientWithMiddleware {
|
||||
.build()
|
||||
}
|
||||
|
||||
pub(crate) fn new_client_with_timeout(
|
||||
request_timeout: Duration,
|
||||
total_retry_duration: Duration,
|
||||
) -> ClientWithMiddleware {
|
||||
pub(crate) fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
|
||||
let timeout_client = reqwest::ClientBuilder::new()
|
||||
.timeout(request_timeout)
|
||||
.timeout(default_timout)
|
||||
.build()
|
||||
.expect("Failed to create http client with timeout");
|
||||
|
||||
let retry_policy =
|
||||
ExponentialBackoff::builder().build_with_total_retry_duration(total_retry_duration);
|
||||
ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
|
||||
|
||||
reqwest_middleware::ClientBuilder::new(timeout_client)
|
||||
.with(reqwest_tracing::TracingMiddleware::default())
|
||||
|
||||
@@ -33,8 +33,7 @@ use uuid::{NoContext, Timestamp};
|
||||
|
||||
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
|
||||
|
||||
const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
|
||||
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Key that uniquely identifies the object, this metric describes.
|
||||
/// Currently, endpoint_id is enough, but this may change later,
|
||||
@@ -224,10 +223,7 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
|
||||
info!("metrics collector has shut down");
|
||||
}
|
||||
|
||||
let http_client = http::new_client_with_timeout(
|
||||
HTTP_REPORTING_REQUEST_TIMEOUT,
|
||||
HTTP_REPORTING_RETRY_DURATION,
|
||||
);
|
||||
let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
|
||||
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
|
||||
|
||||
let mut prev = Utc::now();
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.81.0"
|
||||
channel = "1.80.1"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -758,8 +758,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
// pq_sendint32(&reply_message, xmin);
|
||||
// pq_sendint32(&reply_message, xmin_epoch);
|
||||
// So it is two big endian 32-bit words in low endian order!
|
||||
hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
|
||||
hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
|
||||
hs_feedback.xmin = (hs_feedback.xmin >> 32) | (hs_feedback.xmin << 32);
|
||||
hs_feedback.catalog_xmin =
|
||||
(hs_feedback.catalog_xmin >> 32) | (hs_feedback.catalog_xmin << 32);
|
||||
self.ws_guard
|
||||
.walsenders
|
||||
.record_hs_feedback(self.ws_guard.id, &hs_feedback);
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE nodes ALTER availability_zone_id DROP NOT NULL;
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE nodes ALTER availability_zone_id SET NOT NULL;
|
||||
@@ -102,7 +102,7 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiEr
|
||||
|
||||
let validate_req = json_request::<ValidateRequest>(&mut req).await?;
|
||||
let state = get_state(&req);
|
||||
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
|
||||
json_response(StatusCode::OK, state.service.validate(validate_req))
|
||||
}
|
||||
|
||||
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
|
||||
|
||||
@@ -36,7 +36,7 @@ pub(crate) struct Node {
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
|
||||
availability_zone_id: String,
|
||||
availability_zone_id: Option<String>,
|
||||
|
||||
// This cancellation token means "stop any RPCs in flight to this node, and don't start
|
||||
// any more". It is not related to process shutdown.
|
||||
@@ -63,9 +63,8 @@ impl Node {
|
||||
self.id
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn get_availability_zone_id(&self) -> &str {
|
||||
self.availability_zone_id.as_str()
|
||||
pub(crate) fn get_availability_zone_id(&self) -> Option<&str> {
|
||||
self.availability_zone_id.as_deref()
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
|
||||
@@ -79,12 +78,22 @@ impl Node {
|
||||
/// Does this registration request match `self`? This is used when deciding whether a registration
|
||||
/// request should be allowed to update an existing record with the same node ID.
|
||||
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
|
||||
self.id == register_req.node_id
|
||||
let az_ids_match = {
|
||||
match (
|
||||
self.availability_zone_id.as_deref(),
|
||||
register_req.availability_zone_id.as_deref(),
|
||||
) {
|
||||
(Some(current_az), Some(register_req_az)) => current_az == register_req_az,
|
||||
_ => true,
|
||||
}
|
||||
};
|
||||
|
||||
az_ids_match
|
||||
&& self.id == register_req.node_id
|
||||
&& self.listen_http_addr == register_req.listen_http_addr
|
||||
&& self.listen_http_port == register_req.listen_http_port
|
||||
&& self.listen_pg_addr == register_req.listen_pg_addr
|
||||
&& self.listen_pg_port == register_req.listen_pg_port
|
||||
&& self.availability_zone_id == register_req.availability_zone_id
|
||||
}
|
||||
|
||||
/// For a shard located on this node, populate a response object
|
||||
@@ -181,7 +190,7 @@ impl Node {
|
||||
listen_http_port: u16,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
availability_zone_id: String,
|
||||
availability_zone_id: Option<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
|
||||
@@ -8,7 +8,6 @@ use self::split_state::SplitState;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::MetadataHealthRecord;
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
@@ -92,8 +91,7 @@ pub(crate) enum DatabaseOperation {
|
||||
Detach,
|
||||
ReAttach,
|
||||
IncrementGeneration,
|
||||
TenantGenerations,
|
||||
ShardGenerations,
|
||||
PeekGenerations,
|
||||
ListTenantShards,
|
||||
InsertTenantShards,
|
||||
UpdateTenantShard,
|
||||
@@ -105,6 +103,7 @@ pub(crate) enum DatabaseOperation {
|
||||
ListMetadataHealthOutdated,
|
||||
GetLeader,
|
||||
UpdateLeader,
|
||||
SetNodeAzId,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -324,6 +323,31 @@ impl Persistence {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn set_node_availability_zone_id(
|
||||
&self,
|
||||
input_node_id: NodeId,
|
||||
input_az_id: String,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::nodes::dsl::*;
|
||||
let updated = self
|
||||
.with_measured_conn(DatabaseOperation::SetNodeAzId, move |conn| {
|
||||
let updated = diesel::update(nodes)
|
||||
.filter(node_id.eq(input_node_id.0 as i64))
|
||||
.set((availability_zone_id.eq(input_az_id.clone()),))
|
||||
.execute(conn)?;
|
||||
Ok(updated)
|
||||
})
|
||||
.await?;
|
||||
|
||||
if updated != 1 {
|
||||
Err(DatabaseError::Logical(format!(
|
||||
"Node {node_id:?} not found for setting az id",
|
||||
)))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// At startup, load the high level state for shards, such as their config + policy. This will
|
||||
/// be enriched at runtime with state discovered on pageservers.
|
||||
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
|
||||
@@ -520,13 +544,13 @@ impl Persistence {
|
||||
/// If the tenant doesn't exist, an empty vector is returned.
|
||||
///
|
||||
/// Output is sorted by shard number
|
||||
pub(crate) async fn tenant_generations(
|
||||
pub(crate) async fn peek_generations(
|
||||
&self,
|
||||
filter_tenant_id: TenantId,
|
||||
) -> Result<Vec<ShardGenerationState>, DatabaseError> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
let rows = self
|
||||
.with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
|
||||
.with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
|
||||
let result = tenant_shards
|
||||
.filter(tenant_id.eq(filter_tenant_id.to_string()))
|
||||
.select(TenantShardPersistence::as_select())
|
||||
@@ -548,64 +572,6 @@ impl Persistence {
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Read the generation number of specific tenant shards
|
||||
///
|
||||
/// Output is unsorted. Output may not include values for all inputs, if they are missing in the database.
|
||||
pub(crate) async fn shard_generations(
|
||||
&self,
|
||||
mut tenant_shard_ids: impl Iterator<Item = &TenantShardId>,
|
||||
) -> Result<Vec<(TenantShardId, Option<Generation>)>, DatabaseError> {
|
||||
let mut rows = Vec::with_capacity(tenant_shard_ids.size_hint().0);
|
||||
|
||||
// We will chunk our input to avoid composing arbitrarily long `IN` clauses. Typically we are
|
||||
// called with a single digit number of IDs, but in principle we could be called with tens
|
||||
// of thousands (all the shards on one pageserver) from the generation validation API.
|
||||
loop {
|
||||
// A modest hardcoded chunk size to handle typical cases in a single query but never generate particularly
|
||||
// large query strings.
|
||||
let chunk_ids = tenant_shard_ids.by_ref().take(32);
|
||||
|
||||
// Compose a comma separated list of tuples for matching on (tenant_id, shard_number, shard_count)
|
||||
let in_clause = chunk_ids
|
||||
.map(|tsid| {
|
||||
format!(
|
||||
"('{}', {}, {})",
|
||||
tsid.tenant_id, tsid.shard_number.0, tsid.shard_count.0
|
||||
)
|
||||
})
|
||||
.join(",");
|
||||
|
||||
// We are done when our iterator gives us nothing to filter on
|
||||
if in_clause.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let chunk_rows = self
|
||||
.with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
|
||||
// diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
|
||||
// the inputs are strongly typed and cannot carry any user-supplied raw string content.
|
||||
let result : Vec<TenantShardPersistence> = diesel::sql_query(
|
||||
format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
|
||||
).load(conn)?;
|
||||
|
||||
Ok(result)
|
||||
})
|
||||
.await?;
|
||||
rows.extend(chunk_rows.into_iter())
|
||||
}
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|tsp| {
|
||||
(
|
||||
tsp.get_tenant_shard_id()
|
||||
.expect("Bad tenant ID in database"),
|
||||
tsp.generation.map(|g| Generation::new(g as u32)),
|
||||
)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[allow(non_local_definitions)]
|
||||
/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
|
||||
///
|
||||
@@ -1017,9 +983,7 @@ impl Persistence {
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
|
||||
#[derive(
|
||||
QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
|
||||
)]
|
||||
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
|
||||
#[diesel(table_name = crate::schema::tenant_shards)]
|
||||
pub(crate) struct TenantShardPersistence {
|
||||
#[serde(default)]
|
||||
@@ -1084,7 +1048,7 @@ pub(crate) struct NodePersistence {
|
||||
pub(crate) listen_http_port: i32,
|
||||
pub(crate) listen_pg_addr: String,
|
||||
pub(crate) listen_pg_port: i32,
|
||||
pub(crate) availability_zone_id: String,
|
||||
pub(crate) availability_zone_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Tenant metadata health status that are stored durably.
|
||||
|
||||
@@ -17,7 +17,6 @@ use utils::failpoint_support;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
@@ -594,8 +593,6 @@ impl Reconciler {
|
||||
notify_attempts += 1;
|
||||
}
|
||||
|
||||
pausable_failpoint!("reconciler-live-migrate-post-notify");
|
||||
|
||||
// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
|
||||
// this location will be deleted in the general case reconciliation that runs after this.
|
||||
let origin_secondary_conf = build_location_config(
|
||||
|
||||
@@ -528,7 +528,7 @@ pub(crate) mod test_utils {
|
||||
80 + i as u16,
|
||||
format!("pghost-{i}"),
|
||||
5432 + i as u16,
|
||||
"test-az".to_string(),
|
||||
None,
|
||||
);
|
||||
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
|
||||
assert!(node.is_available());
|
||||
|
||||
@@ -25,7 +25,7 @@ diesel::table! {
|
||||
listen_http_port -> Int4,
|
||||
listen_pg_addr -> Varchar,
|
||||
listen_pg_port -> Int4,
|
||||
availability_zone_id -> Varchar,
|
||||
availability_zone_id -> Nullable<Varchar>,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1264,7 +1264,7 @@ impl Service {
|
||||
123,
|
||||
"".to_string(),
|
||||
123,
|
||||
"test_az".to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
scheduler.node_upsert(&node);
|
||||
@@ -1854,74 +1854,37 @@ impl Service {
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn validate(
|
||||
&self,
|
||||
validate_req: ValidateRequest,
|
||||
) -> Result<ValidateResponse, DatabaseError> {
|
||||
// Fast in-memory check: we may reject validation on anything that doesn't match our
|
||||
// in-memory generation for a shard
|
||||
let in_memory_result = {
|
||||
let mut in_memory_result = Vec::new();
|
||||
let locked = self.inner.read().unwrap();
|
||||
for req_tenant in validate_req.tenants {
|
||||
if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
|
||||
let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
|
||||
tracing::info!(
|
||||
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
|
||||
req_tenant.id,
|
||||
req_tenant.gen,
|
||||
tenant_shard.generation
|
||||
);
|
||||
|
||||
in_memory_result.push((req_tenant.id, Generation::new(req_tenant.gen), valid));
|
||||
} else {
|
||||
// This is legal: for example during a shard split the pageserver may still
|
||||
// have deletions in its queue from the old pre-split shard, or after deletion
|
||||
// of a tenant that was busy with compaction/gc while being deleted.
|
||||
tracing::info!(
|
||||
"Refusing deletion validation for missing shard {}",
|
||||
req_tenant.id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
in_memory_result
|
||||
};
|
||||
|
||||
// Database calls to confirm validity for anything that passed the in-memory check. We must do this
|
||||
// in case of controller split-brain, where some other controller process might have incremented the generation.
|
||||
let db_generations = self
|
||||
.persistence
|
||||
.shard_generations(in_memory_result.iter().filter_map(|i| {
|
||||
if i.2 {
|
||||
Some(&i.0)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}))
|
||||
.await?;
|
||||
let db_generations = db_generations.into_iter().collect::<HashMap<_, _>>();
|
||||
pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
let mut response = ValidateResponse {
|
||||
tenants: Vec::new(),
|
||||
};
|
||||
for (tenant_shard_id, validate_generation, valid) in in_memory_result.into_iter() {
|
||||
let valid = if valid {
|
||||
let db_generation = db_generations.get(&tenant_shard_id);
|
||||
db_generation == Some(&Some(validate_generation))
|
||||
|
||||
for req_tenant in validate_req.tenants {
|
||||
if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
|
||||
let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
|
||||
tracing::info!(
|
||||
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
|
||||
req_tenant.id,
|
||||
req_tenant.gen,
|
||||
tenant_shard.generation
|
||||
);
|
||||
response.tenants.push(ValidateResponseTenant {
|
||||
id: req_tenant.id,
|
||||
valid,
|
||||
});
|
||||
} else {
|
||||
// If in-memory state says it's invalid, trust that. It's always safe to fail a validation, at worst
|
||||
// this prevents a pageserver from cleaning up an object in S3.
|
||||
false
|
||||
};
|
||||
|
||||
response.tenants.push(ValidateResponseTenant {
|
||||
id: tenant_shard_id,
|
||||
valid,
|
||||
})
|
||||
// After tenant deletion, we may approve any validation. This avoids
|
||||
// spurious warnings on the pageserver if it has pending LSN updates
|
||||
// at the point a deletion happens.
|
||||
response.tenants.push(ValidateResponseTenant {
|
||||
id: req_tenant.id,
|
||||
valid: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
response
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_create(
|
||||
@@ -3216,7 +3179,7 @@ impl Service {
|
||||
// run concurrently with reconciliations, and it is not guaranteed that the node we find here
|
||||
// will still be the latest when we're done: we will check generations again at the end of
|
||||
// this function to handle that.
|
||||
let generations = self.persistence.tenant_generations(tenant_id).await?;
|
||||
let generations = self.persistence.peek_generations(tenant_id).await?;
|
||||
|
||||
if generations
|
||||
.iter()
|
||||
@@ -3273,7 +3236,7 @@ impl Service {
|
||||
// Post-check: are all the generations of all the shards the same as they were initially? This proves that
|
||||
// our remote operation executed on the latest generation and is therefore persistent.
|
||||
{
|
||||
let latest_generations = self.persistence.tenant_generations(tenant_id).await?;
|
||||
let latest_generations = self.persistence.peek_generations(tenant_id).await?;
|
||||
if latest_generations
|
||||
.into_iter()
|
||||
.map(
|
||||
@@ -4825,8 +4788,15 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
if register_req.availability_zone_id.is_none() {
|
||||
tracing::warn!(
|
||||
"Node {} registering without specific availability zone id",
|
||||
register_req.node_id
|
||||
);
|
||||
}
|
||||
|
||||
enum RegistrationStatus {
|
||||
Matched,
|
||||
Matched(Node),
|
||||
Mismatched,
|
||||
New,
|
||||
}
|
||||
@@ -4835,7 +4805,7 @@ impl Service {
|
||||
let locked = self.inner.read().unwrap();
|
||||
if let Some(node) = locked.nodes.get(®ister_req.node_id) {
|
||||
if node.registration_match(®ister_req) {
|
||||
RegistrationStatus::Matched
|
||||
RegistrationStatus::Matched(node.clone())
|
||||
} else {
|
||||
RegistrationStatus::Mismatched
|
||||
}
|
||||
@@ -4845,12 +4815,41 @@ impl Service {
|
||||
};
|
||||
|
||||
match registration_status {
|
||||
RegistrationStatus::Matched => {
|
||||
RegistrationStatus::Matched(node) => {
|
||||
tracing::info!(
|
||||
"Node {} re-registered with matching address",
|
||||
register_req.node_id
|
||||
);
|
||||
|
||||
if node.get_availability_zone_id().is_none() {
|
||||
if let Some(az_id) = register_req.availability_zone_id.clone() {
|
||||
tracing::info!("Extracting availability zone id from registration request for node {}: {}",
|
||||
register_req.node_id, az_id);
|
||||
|
||||
// Persist to the database and update in memory state. See comment below
|
||||
// on ordering.
|
||||
self.persistence
|
||||
.set_node_availability_zone_id(register_req.node_id, az_id)
|
||||
.await?;
|
||||
let node_with_az = Node::new(
|
||||
register_req.node_id,
|
||||
register_req.listen_http_addr,
|
||||
register_req.listen_http_port,
|
||||
register_req.listen_pg_addr,
|
||||
register_req.listen_pg_port,
|
||||
register_req.availability_zone_id,
|
||||
);
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut new_nodes = (*locked.nodes).clone();
|
||||
|
||||
locked.scheduler.node_upsert(&node_with_az);
|
||||
new_nodes.insert(register_req.node_id, node_with_az);
|
||||
|
||||
locked.nodes = Arc::new(new_nodes);
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
RegistrationStatus::Mismatched => {
|
||||
@@ -6261,13 +6260,9 @@ impl Service {
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new()
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
|
||||
// TODO(vlad): Currently this operates on the assumption that all
|
||||
// secondaries are warm. This is not always true (e.g. we just migrated the
|
||||
// tenant). Take that into consideration by checking the secondary status.
|
||||
let mut tids_to_promote = self.fill_node_plan(node_id);
|
||||
let mut waiters = Vec::new();
|
||||
|
||||
@@ -6335,11 +6330,9 @@ impl Service {
|
||||
node_id
|
||||
);
|
||||
|
||||
if let Some(waiter) = self.maybe_configured_reconcile_shard(
|
||||
tenant_shard,
|
||||
nodes,
|
||||
reconciler_config,
|
||||
) {
|
||||
if let Some(waiter) =
|
||||
self.maybe_reconcile_shard(tenant_shard, nodes)
|
||||
{
|
||||
waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import os
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from contextlib import _GeneratorContextManager, contextmanager
|
||||
|
||||
@@ -10,7 +8,6 @@ import pytest
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
PgBin,
|
||||
@@ -336,26 +333,3 @@ def neon_with_baseline(request: FixtureRequest) -> PgCompare:
|
||||
fixture = request.getfixturevalue(request.param)
|
||||
assert isinstance(fixture, PgCompare), f"test error: fixture {fixture} is not PgCompare"
|
||||
return fixture
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def sync_after_each_test():
|
||||
# The fixture calls `sync(2)` after each test if `SYNC_AFTER_EACH_TEST` env var is `true`
|
||||
#
|
||||
# In CI, `SYNC_AFTER_EACH_TEST` is set to `true` only for benchmarks (`test_runner/performance`)
|
||||
# that are run on self-hosted runners because some of these tests are pretty write-heavy
|
||||
# and create issues to start the processes within 10s
|
||||
key = "SYNC_AFTER_EACH_TEST"
|
||||
enabled = os.environ.get(key) == "true"
|
||||
|
||||
yield
|
||||
|
||||
if not enabled:
|
||||
# regress test, or running locally
|
||||
return
|
||||
|
||||
start = time.time()
|
||||
# we only run benches on unices, the method might not exist on windows
|
||||
os.sync()
|
||||
elapsed = time.time() - start
|
||||
log.info(f"called sync after test {elapsed=}")
|
||||
|
||||
@@ -24,20 +24,7 @@ from functools import cached_property, partial
|
||||
from itertools import chain, product
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union, cast
|
||||
from urllib.parse import quote, urlparse
|
||||
|
||||
import asyncpg
|
||||
@@ -103,8 +90,6 @@ from fixtures.utils import AuxFileStore as AuxFileStore # reexport
|
||||
|
||||
from .neon_api import NeonAPI, NeonApiEndpoint
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
"""
|
||||
This file contains pytest fixtures. A fixture is a test resource that can be
|
||||
summoned by placing its name in the test's arguments.
|
||||
@@ -758,9 +743,6 @@ class NeonEnvBuilder:
|
||||
patch_script = ""
|
||||
for ps in self.env.pageservers:
|
||||
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
|
||||
# This is a temporary to get the backward compat test happy
|
||||
# since the compat snapshot was generated with an older version of neon local
|
||||
patch_script += f"UPDATE nodes SET availability_zone_id='{ps.az_id}' WHERE node_id = '{ps.id}' AND availability_zone_id IS NULL;"
|
||||
patch_script_path.write_text(patch_script)
|
||||
|
||||
# Update the config with info about tenants and timelines
|
||||
@@ -3004,17 +2986,16 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
def config_toml_path(self) -> Path:
|
||||
return self.workdir / "pageserver.toml"
|
||||
|
||||
def edit_config_toml(self, edit_fn: Callable[[Dict[str, Any]], T]) -> T:
|
||||
def edit_config_toml(self, edit_fn: Callable[[Dict[str, Any]], None]):
|
||||
"""
|
||||
Edit the pageserver's config toml file in place.
|
||||
"""
|
||||
path = self.config_toml_path
|
||||
with open(path, "r") as f:
|
||||
config = toml.load(f)
|
||||
res = edit_fn(config)
|
||||
edit_fn(config)
|
||||
with open(path, "w") as f:
|
||||
toml.dump(config, f)
|
||||
return res
|
||||
|
||||
def patch_config_toml_nonrecursive(self, patch: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
|
||||
@@ -142,10 +142,11 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
|
||||
env.storage_controller.node_register(env.pageserver)
|
||||
|
||||
def remove_control_plane_api_field(config):
|
||||
return config.pop("control_plane_api")
|
||||
|
||||
control_plane_api = env.pageserver.edit_config_toml(remove_control_plane_api_field)
|
||||
replaced_config = env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"control_plane_api": "",
|
||||
}
|
||||
)
|
||||
env.pageserver.start()
|
||||
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
|
||||
|
||||
@@ -178,11 +179,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
env.pageserver.stop()
|
||||
# Starting without the override that disabled control_plane_api
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"control_plane_api": control_plane_api,
|
||||
}
|
||||
)
|
||||
env.pageserver.patch_config_toml_nonrecursive(replaced_config)
|
||||
env.pageserver.start()
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
|
||||
|
||||
@@ -2332,122 +2332,6 @@ def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder)
|
||||
).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)
|
||||
|
||||
|
||||
def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
A correctness edge case: while we are live migrating and a shard's generation is
|
||||
visible to the Reconciler but not to the central Service, the generation validation
|
||||
API should still prevent stale generations from doing deletions.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
TENANT_CONF = {
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": 128 * 1024,
|
||||
"compaction_threshold": 1,
|
||||
"compaction_target_size": 128 * 1024,
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
env.neon_cli.create_tenant(tenant_id, timeline_id)
|
||||
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, TENANT_CONF)
|
||||
|
||||
# Write enough data that a compaction would do some work (deleting some L0s)
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(64)
|
||||
for _i in range(0, 2):
|
||||
workload.churn_rows(64, upload=False)
|
||||
|
||||
# Upload but don't compact
|
||||
origin_pageserver = env.get_tenant_pageserver(tenant_id)
|
||||
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
|
||||
origin_pageserver.http_client().timeline_checkpoint(
|
||||
tenant_id, timeline_id, wait_until_uploaded=True, compact=False
|
||||
)
|
||||
|
||||
# Start a compaction that will pause on a failpoint.
|
||||
compaction_failpoint = "before-upload-index-pausable"
|
||||
origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "pause"))
|
||||
|
||||
# This failpoint can also cause migration code to time out trying to politely flush
|
||||
# during migrations
|
||||
origin_pageserver.allowed_errors.append(".*Timed out waiting for flush to remote storage.*")
|
||||
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
compact_fut = executor.submit(
|
||||
origin_pageserver.http_client().timeline_compact,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
wait_until_uploaded=True,
|
||||
)
|
||||
|
||||
# Let the compaction start and then get stuck uploading an index: when we live migrate, the new generation's
|
||||
# index will be initialized from the pre-compaction index, referencing layers that the compaction will try to delete
|
||||
def has_hit_compaction_failpoint():
|
||||
assert origin_pageserver.log_contains(f"at failpoint {compaction_failpoint}")
|
||||
|
||||
wait_until(10, 1, has_hit_compaction_failpoint)
|
||||
|
||||
# While the compaction is running, start a live migration which will pause long enough for the compaction to sleep,
|
||||
# after incrementing generation and attaching the new location
|
||||
migration_failpoint = "reconciler-live-migrate-post-notify"
|
||||
env.storage_controller.configure_failpoints((migration_failpoint, "pause"))
|
||||
migrate_fut = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate,
|
||||
TenantShardId(tenant_id, 0, 0),
|
||||
dest_ps_id,
|
||||
)
|
||||
|
||||
def has_hit_migration_failpoint():
|
||||
assert env.storage_controller.log_contains(f"at failpoint {migration_failpoint}")
|
||||
|
||||
# Long wait because the migration will have to time out during transition to AttachedStale
|
||||
# before it reaches this point. The timeout is because the AttachedStale transition includes
|
||||
# a flush of remote storage, and if the compaction already enqueued an index upload this cannot
|
||||
# make progress.
|
||||
wait_until(60, 1, has_hit_migration_failpoint)
|
||||
|
||||
# Origin pageserver has succeeded with compaction before the migration completed. It has done all the writes it wanted to do in its own (stale) generation
|
||||
origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off"))
|
||||
compact_fut.result()
|
||||
origin_pageserver.http_client().deletion_queue_flush(execute=True)
|
||||
|
||||
# Eventually migration completes
|
||||
env.storage_controller.configure_failpoints((migration_failpoint, "off"))
|
||||
migrate_fut.result()
|
||||
except:
|
||||
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
|
||||
env.storage_controller.configure_failpoints((migration_failpoint, "off"))
|
||||
origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off"))
|
||||
raise
|
||||
|
||||
# Ensure the destination of the migration writes an index, so that if it has corrupt state that is
|
||||
# visible to the scrubber.
|
||||
workload.write_rows(1, upload=False)
|
||||
env.get_pageserver(dest_ps_id).http_client().timeline_checkpoint(
|
||||
tenant_id, timeline_id, wait_until_uploaded=True, compact=False
|
||||
)
|
||||
|
||||
# The destination of the live migration would now have a corrupt index (referencing deleted L0s) if
|
||||
# the controller had not properly applied validation rules.
|
||||
healthy, _summary = env.storage_scrubber.scan_metadata()
|
||||
try:
|
||||
log.info(f"scrubbed, healthy={healthy}")
|
||||
assert healthy
|
||||
except:
|
||||
# On failures, we want to report them FAIL during the test, not as ERROR during teardown
|
||||
neon_env_builder.enable_scrub_on_exit = False
|
||||
raise
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
|
||||
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
@@ -217,13 +217,6 @@ def test_scrubber_physical_gc_ancestors(
|
||||
workload.init()
|
||||
workload.write_rows(100)
|
||||
|
||||
# Issue a deletion queue flush so that the parent shard can't leave behind layers
|
||||
# that will look like unexpected garbage to the scrubber
|
||||
for pre_split_shard in env.storage_controller.locate(tenant_id):
|
||||
env.get_pageserver(pre_split_shard["node_id"]).http_client().deletion_queue_flush(
|
||||
execute=True
|
||||
)
|
||||
|
||||
new_shard_count = 4
|
||||
assert shard_count is None or new_shard_count > shard_count
|
||||
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
|
||||
@@ -328,10 +321,6 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
|
||||
workload.write_rows(100, upload=False)
|
||||
workload.stop()
|
||||
|
||||
# Issue a deletion queue flush so that the parent shard can't leave behind layers
|
||||
# that will look like unexpected garbage to the scrubber
|
||||
env.get_tenant_pageserver(tenant_id).http_client().deletion_queue_flush(execute=True)
|
||||
|
||||
new_shard_count = 4
|
||||
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
|
||||
for shard in shards:
|
||||
|
||||
@@ -733,7 +733,7 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# We will run with the limit set to 1, so that once we have one tenant stuck
|
||||
# in a pausable failpoint, the rest are prevented from proceeding through warmup.
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = 1"
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
@@ -984,7 +984,7 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
|
||||
def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = 1"
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -1062,7 +1062,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("activation_method", ["endpoint", "branch", "delete"])
|
||||
def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_method: str):
|
||||
# env.initial_tenant will take up this permit when attaching with lazy because of a failpoint activated after restart
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = 1"
|
||||
neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: a317b9b5b9...48388a5b59
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 49d5e576a5...8aa1ded772
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 6e9a4ff624...95132feffe
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,14 +1,14 @@
|
||||
{
|
||||
"v16": [
|
||||
"16.4",
|
||||
"6e9a4ff6249ac02b8175054b7b3f7dfb198be48b"
|
||||
"95132feffe277ce84309d93a42e9aadfd2cb0437"
|
||||
],
|
||||
"v15": [
|
||||
"15.8",
|
||||
"49d5e576a56e4cc59cd6a6a0791b2324b9fa675e"
|
||||
"8aa1ded7726d416ac8e02600aad387a353478fc7"
|
||||
],
|
||||
"v14": [
|
||||
"14.13",
|
||||
"a317b9b5b96978b49e78986697f3dd80d06f99a7"
|
||||
"48388a5b597c81c09e28c016650a7156b48717a1"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -47,8 +47,7 @@ hex = { version = "0.4", features = ["serde"] }
|
||||
hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12", default-features = false, features = ["use_std"] }
|
||||
itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" }
|
||||
itertools = { version = "0.10" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
@@ -102,8 +101,7 @@ either = { version = "1" }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown = { version = "0.14", features = ["raw"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12", default-features = false, features = ["use_std"] }
|
||||
itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" }
|
||||
itertools = { version = "0.10" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
|
||||
Reference in New Issue
Block a user