mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
10 Commits
thesuhas/p
...
hackathon-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bea1580af2 | ||
|
|
136eaeb74a | ||
|
|
211b824d62 | ||
|
|
f9fdbc9618 | ||
|
|
95a5f749c8 | ||
|
|
5db20af8a7 | ||
|
|
136cf1979b | ||
|
|
08bb72e516 | ||
|
|
6f4f3691a5 | ||
|
|
a2b756843e |
40
Cargo.lock
generated
40
Cargo.lock
generated
@@ -3898,6 +3898,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.1"
|
||||
@@ -4182,6 +4192,12 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "p256"
|
||||
version = "0.11.1"
|
||||
@@ -5239,6 +5255,7 @@ dependencies = [
|
||||
"tracing-log",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
"tracing-test",
|
||||
"tracing-utils",
|
||||
"try-lock",
|
||||
"typed-json",
|
||||
@@ -7689,6 +7706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -7702,6 +7720,27 @@ dependencies = [
|
||||
"tracing-serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-test"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
|
||||
dependencies = [
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
"tracing-test-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-test-macro"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-utils"
|
||||
version = "0.1.0"
|
||||
@@ -8554,6 +8593,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
"zeroize",
|
||||
|
||||
@@ -1097,7 +1097,7 @@ USER root
|
||||
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN cargo install --locked --git https://github.com/thesuhas/pgrx.git --branch expose_guc_assign_hook cargo-pgrx && \
|
||||
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
|
||||
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
@@ -1355,9 +1355,7 @@ RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/2.1.0/postgre
|
||||
echo "48e7f5ae2f1ca516df3da86c5c739d48dd780a4e885705704ccaad0faa89d6c0 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
|
||||
sed -i 's/pgrx = "0.14.1"/pgrx = { git = "https:\/\/github.com\/thesuhas\/pgrx.git", branch = "expose_guc_assign_hook", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx-tests = "0.14.1"/pgrx-tests = { git = "https:\/\/github.com\/thesuhas\/pgrx.git", branch = "expose_guc_assign_hook" }/g' Cargo.toml && \
|
||||
sed -i '/\[dependencies\]/a libc = "0.2.172"' Cargo.toml && \
|
||||
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
patch -p1 < /ext-src/anon_v2.patch
|
||||
|
||||
FROM rust-extensions-build-pgrx14 AS pg-anon-pg-build
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
diff --git a/sql/anon.sql b/sql/anon.sql
|
||||
index 0cdc769..85a58a6 100644
|
||||
index 0cdc769..f6cc950 100644
|
||||
--- a/sql/anon.sql
|
||||
+++ b/sql/anon.sql
|
||||
@@ -1141,3 +1141,9 @@ $$
|
||||
@@ -1141,3 +1141,8 @@ $$
|
||||
-- TODO : https://en.wikipedia.org/wiki/L-diversity
|
||||
|
||||
-- TODO : https://en.wikipedia.org/wiki/T-closeness
|
||||
@@ -11,9 +11,8 @@ index 0cdc769..85a58a6 100644
|
||||
+
|
||||
+GRANT ALL ON SCHEMA anon to neon_superuser;
|
||||
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
|
||||
+-- GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO neon_superuser;
|
||||
diff --git a/sql/init.sql b/sql/init.sql
|
||||
index 7da6553..7961984 100644
|
||||
index 7da6553..9b6164b 100644
|
||||
--- a/sql/init.sql
|
||||
+++ b/sql/init.sql
|
||||
@@ -74,50 +74,49 @@ $$
|
||||
@@ -128,267 +127,3 @@ index 7da6553..7961984 100644
|
||||
VOLATILE
|
||||
PARALLEL UNSAFE -- because init is unsafe
|
||||
SECURITY INVOKER
|
||||
@@ -264,3 +263,22 @@ $$
|
||||
;
|
||||
|
||||
SECURITY LABEL FOR anon ON FUNCTION anon.unload IS 'UNTRUSTED';
|
||||
+
|
||||
+
|
||||
+CREATE OR REPLACE FUNCTION anon.toggle_transparent_dynamic_masking(
|
||||
+ dbname TEXT,
|
||||
+ toggle BOOLEAN DEFAULT TRUE
|
||||
+)
|
||||
+RETURNS VOID AS
|
||||
+$$
|
||||
+BEGIN
|
||||
+ EXECUTE format('ALTER DATABASE %I SET anon.transparent_dynamic_masking TO %s', dbname, toggle::TEXT);
|
||||
+END;
|
||||
+$$
|
||||
+ LANGUAGE plpgsql
|
||||
+ VOLATILE
|
||||
+ SECURITY DEFINER
|
||||
+ SET search_path=''
|
||||
+;
|
||||
+
|
||||
+SECURITY LABEL FOR anon ON FUNCTION anon.toggle_transparent_dynamic_masking IS 'UNTRUSTED';
|
||||
diff --git a/src/guc.rs b/src/guc.rs
|
||||
index 74d3822..d4121ae 100644
|
||||
--- a/src/guc.rs
|
||||
+++ b/src/guc.rs
|
||||
@@ -3,7 +3,7 @@
|
||||
//----------------------------------------------------------------------------
|
||||
|
||||
use pgrx::*;
|
||||
-use std::ffi::CStr;
|
||||
+use std::ffi::{CStr, c_void};
|
||||
|
||||
pub static ANON_DUMMY_LOCALE: GucSetting<Option<&'static CStr>> =
|
||||
GucSetting::<Option<&'static CStr>>::new(Some(unsafe {
|
||||
@@ -51,25 +51,97 @@ static ANON_MASK_SCHEMA: GucSetting<Option<&'static CStr>> =
|
||||
CStr::from_bytes_with_nul_unchecked(b"mask\0")
|
||||
}));
|
||||
|
||||
+
|
||||
+unsafe extern "C-unwind" fn check_bool_guc_hook(
|
||||
+ _newval: *mut bool,
|
||||
+ _extra: *mut *mut c_void,
|
||||
+ source: u32
|
||||
+) -> bool {
|
||||
+ unsafe {
|
||||
+ // The sources that we allow are:
|
||||
+ // 1. PGC_S_DEFAULT (0) -> for default boot up source, likely new session or server.
|
||||
+ // 2. PGC_S_DATABASE (6) -> a GUC set for a particular database
|
||||
+ // 3. PGC_S_USER (7) -> a GUC set for a particular role
|
||||
+ // 4. PGC_S_DATABASE_USER (8) -> a GUC set for a particular role in a particular database
|
||||
+ // This check only allows sources that load a variable, not ones that try to alter it.
|
||||
+ // Sources that try to alter it are:
|
||||
+ // 1. PGC_S_FILE (3) -> ALTER SYSTEM
|
||||
+ // 2. PGC_S_TEST (12) -> ALTER ROLE/DATABASE
|
||||
+ // 3. PGC_S_SESSION (13) -> SET ...
|
||||
+ // TODO (thesuhas): Does PGC_S_GLOBAL need to be added to whitelisted sources?
|
||||
+ pg_sys::info!("Source: {}", source);
|
||||
+ if source == 0 || source == 6 || source == 7 || source == 8 {
|
||||
+ return true;
|
||||
+ }
|
||||
+ let oid = pg_sys::GetUserId();
|
||||
+ let user_name = CStr::from_ptr(pg_sys::GetUserNameFromId(oid, true));
|
||||
+ let user_str = user_name.to_str().unwrap();
|
||||
+ pg_sys::info!("user: {} trying to change boolean guc", user_str);
|
||||
+ if pg_sys::superuser() || user_str == "neon_superuser" || user_str == "neondb_owner" {
|
||||
+ return true;
|
||||
+ }
|
||||
+ pg_sys::ereport!(PgLogLevel::ERROR, PgSqlErrorCode::ERRCODE_INSUFFICIENT_PRIVILEGE, "You are not authorized to change this GUC");
|
||||
+ false
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
+unsafe extern "C-unwind" fn check_string_guc_hook(
|
||||
+_newval: *mut *mut libc::c_char,
|
||||
+_extra: *mut *mut c_void,
|
||||
+source: u32
|
||||
+) -> bool {
|
||||
+ unsafe {
|
||||
+ // The sources that we allow are:
|
||||
+ // 1. PGC_S_DEFAULT (0) -> for default boot up source, likely new session or server.
|
||||
+ // 2. PGC_S_DATABASE (6) -> a GUC set for a particular database
|
||||
+ // 3. PGC_S_USER (7) -> a GUC set for a particular role
|
||||
+ // 4. PGC_S_DATABASE_USER (8) -> a GUC set for a particular role in a particular database
|
||||
+ // This check only allows sources that load a variable, not ones that try to alter it.
|
||||
+ // Sources that try to alter it are:
|
||||
+ // 1. PGC_S_FILE (3) -> ALTER SYSTEM
|
||||
+ // 2. PGC_S_TEST (12) -> ALTER ROLE/DATABASE
|
||||
+ // 3. PGC_S_SESSION (13) -> SET ...
|
||||
+ pg_sys::info!("Source: {}", source);
|
||||
+ if source == 0 || source == 6 || source == 7 || source == 8 {
|
||||
+ return true;
|
||||
+ }
|
||||
+ let oid = pg_sys::GetUserId();
|
||||
+ let user_name = CStr::from_ptr(pg_sys::GetUserNameFromId(oid, true));
|
||||
+ let user_str = user_name.to_str().unwrap();
|
||||
+ pg_sys::info!("user: {} trying to change string guc", user_str);
|
||||
+ if pg_sys::superuser() || user_str == "neon_superuser" || user_str == "neondb_owner" {
|
||||
+ return true;
|
||||
+ }
|
||||
+ pg_sys::ereport!(PgLogLevel::ERROR, PgSqlErrorCode::ERRCODE_INSUFFICIENT_PRIVILEGE, "You are not authorized to change this GUC");
|
||||
+ false
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
// Register the GUC parameters for the extension
|
||||
//
|
||||
pub fn register_gucs() {
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.dummy_locale",
|
||||
"The default locale for the dummy data functions",
|
||||
"",
|
||||
&ANON_DUMMY_LOCALE,
|
||||
GucContext::Suset,
|
||||
GucFlags::SUPERUSER_ONLY,
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.k_anonymity_provider",
|
||||
"The security label provider used for k-anonymity",
|
||||
"",
|
||||
&ANON_K_ANONYMITY_PROVIDER,
|
||||
GucContext::Suset,
|
||||
GucFlags::SUPERUSER_ONLY,
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
//
|
||||
@@ -80,86 +152,113 @@ pub fn register_gucs() {
|
||||
//
|
||||
// https://github.com/pgcentralfoundation/pgrx/commit/d096efe6fb2d86e87d117b520b9ccd2f90b2e0d1
|
||||
//
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.masking_policies",
|
||||
"Define additional masking policies (the 'anon' policy is already defined)",
|
||||
"",
|
||||
&ANON_MASKING_POLICIES,
|
||||
GucContext::Suset,
|
||||
GucFlags::SUPERUSER_ONLY, /* | GucFlags::LIST_INPUT */
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_bool_guc(
|
||||
+ GucRegistry::define_bool_guc_with_hooks(
|
||||
"anon.privacy_by_default",
|
||||
"Mask all columns with NULL (or the default value for NOT NULL columns)",
|
||||
"",
|
||||
&ANON_PRIVACY_BY_DEFAULT,
|
||||
- GucContext::Suset,
|
||||
+ GucContext::Userset,
|
||||
GucFlags::default(),
|
||||
+ Some(check_bool_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
- GucRegistry::define_bool_guc(
|
||||
+ GucRegistry::define_bool_guc_with_hooks(
|
||||
"anon.transparent_dynamic_masking",
|
||||
"New masking engine (EXPERIMENTAL)",
|
||||
"",
|
||||
&ANON_TRANSPARENT_DYNAMIC_MASKING,
|
||||
- GucContext::Suset,
|
||||
+ GucContext::Userset,
|
||||
GucFlags::default(),
|
||||
+ Some(check_bool_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_bool_guc(
|
||||
+ GucRegistry::define_bool_guc_with_hooks(
|
||||
"anon.restrict_to_trusted_schemas",
|
||||
"Masking filters must be in a trusted schema",
|
||||
"Activate this option to prevent non-superuser from using their own masking filters",
|
||||
&ANON_RESTRICT_TO_TRUSTED_SCHEMAS,
|
||||
GucContext::Suset,
|
||||
GucFlags::SUPERUSER_ONLY,
|
||||
+ Some(check_bool_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_bool_guc(
|
||||
+ GucRegistry::define_bool_guc_with_hooks(
|
||||
"anon.strict_mode",
|
||||
"A masking rule cannot change a column data type, unless you disable this",
|
||||
"Disabling the mode is not recommended",
|
||||
&ANON_STRICT_MODE,
|
||||
- GucContext::Suset,
|
||||
+ GucContext::Userset,
|
||||
GucFlags::default(),
|
||||
+ Some(check_bool_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
// The GUC vars below are not used in the Rust code
|
||||
// but they are used in the plpgsql code
|
||||
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.algorithm",
|
||||
"The hash method used for pseudonymizing functions",
|
||||
"",
|
||||
&ANON_ALGORITHM,
|
||||
GucContext::Suset,
|
||||
GucFlags::SUPERUSER_ONLY,
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.maskschema",
|
||||
"The schema where the dynamic masking views are stored",
|
||||
"",
|
||||
&ANON_MASK_SCHEMA,
|
||||
- GucContext::Suset,
|
||||
+ GucContext::Userset,
|
||||
GucFlags::default(),
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.salt",
|
||||
"The salt value used for the pseudonymizing functions",
|
||||
"",
|
||||
&ANON_SALT,
|
||||
GucContext::Suset,
|
||||
GucFlags::SUPERUSER_ONLY,
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
|
||||
- GucRegistry::define_string_guc(
|
||||
+ GucRegistry::define_string_guc_with_hooks(
|
||||
"anon.sourceschema",
|
||||
"The schema where the table are masked by the dynamic masking engine",
|
||||
"",
|
||||
&ANON_SOURCE_SCHEMA,
|
||||
- GucContext::Suset,
|
||||
+ GucContext::Userset,
|
||||
GucFlags::default(),
|
||||
+ Some(check_string_guc_hook),
|
||||
+ None,
|
||||
+ None,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -551,6 +551,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
|
||||
basebackup_cache_enabled: settings
|
||||
.remove("basebackup_cache_enabled")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'basebackup_cache_enabled' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -183,6 +183,8 @@ pub struct ConfigToml {
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -308,6 +310,26 @@ pub struct TimelineImportConfig {
|
||||
pub import_job_checkpoint_threshold: NonZeroUsize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct BasebackupCacheConfig {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub cleanup_period: Duration,
|
||||
// FIXME: Support max_size_bytes.
|
||||
// pub max_size_bytes: usize,
|
||||
pub max_size_entries: i64,
|
||||
}
|
||||
|
||||
impl Default for BasebackupCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cleanup_period: Duration::from_secs(60),
|
||||
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
|
||||
max_size_entries: 1000,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -491,8 +513,14 @@ pub struct TenantConfigToml {
|
||||
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
|
||||
/// that will get perf sampling for the tenant.
|
||||
pub sampling_ratio: Option<Ratio>,
|
||||
|
||||
/// Capacity of relsize snapshot cache (used by replicas).
|
||||
pub relsize_snapshot_cache_capacity: usize,
|
||||
|
||||
/// Enable preparing basebackup on XLOG_CHECKPOINT_SHUTDOWN and using it in basebackup requests.
|
||||
// FIXME: Remove skip_serializing_if when the feature is stable.
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub basebackup_cache_enabled: bool,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -666,6 +694,7 @@ impl Default for ConfigToml {
|
||||
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
|
||||
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
|
||||
},
|
||||
basebackup_cache_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -791,6 +820,7 @@ impl Default for TenantConfigToml {
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
sampling_ratio: None,
|
||||
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
|
||||
basebackup_cache_enabled: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -632,6 +632,8 @@ pub struct TenantConfigPatch {
|
||||
pub sampling_ratio: FieldPatch<Option<Ratio>>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub basebackup_cache_enabled: FieldPatch<bool>,
|
||||
}
|
||||
|
||||
/// Like [`crate::config::TenantConfigToml`], but preserves the information
|
||||
@@ -764,6 +766,9 @@ pub struct TenantConfig {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub relsize_snapshot_cache_capacity: Option<usize>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
@@ -810,6 +815,7 @@ impl TenantConfig {
|
||||
mut gc_compaction_ratio_percent,
|
||||
mut sampling_ratio,
|
||||
mut relsize_snapshot_cache_capacity,
|
||||
mut basebackup_cache_enabled,
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
@@ -914,6 +920,9 @@ impl TenantConfig {
|
||||
patch
|
||||
.relsize_snapshot_cache_capacity
|
||||
.apply(&mut relsize_snapshot_cache_capacity);
|
||||
patch
|
||||
.basebackup_cache_enabled
|
||||
.apply(&mut basebackup_cache_enabled);
|
||||
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
@@ -954,6 +963,7 @@ impl TenantConfig {
|
||||
gc_compaction_ratio_percent,
|
||||
sampling_ratio,
|
||||
relsize_snapshot_cache_capacity,
|
||||
basebackup_cache_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1065,6 +1075,9 @@ impl TenantConfig {
|
||||
relsize_snapshot_cache_capacity: self
|
||||
.relsize_snapshot_cache_capacity
|
||||
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
|
||||
basebackup_cache_enabled: self
|
||||
.basebackup_cache_enabled
|
||||
.unwrap_or(global_conf.basebackup_cache_enabled),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,6 +86,27 @@ pub struct DbError {
|
||||
}
|
||||
|
||||
impl DbError {
|
||||
pub fn new_test_error(code: SqlState, message: String) -> Self {
|
||||
DbError {
|
||||
severity: "ERROR".to_string(),
|
||||
parsed_severity: Some(Severity::Error),
|
||||
code,
|
||||
message,
|
||||
detail: None,
|
||||
hint: None,
|
||||
position: None,
|
||||
where_: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
column: None,
|
||||
datatype: None,
|
||||
constraint: None,
|
||||
file: None,
|
||||
line: None,
|
||||
routine: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result<DbError> {
|
||||
let mut severity = None;
|
||||
let mut parsed_severity = None;
|
||||
|
||||
518
pageserver/src/basebackup_cache.rs
Normal file
518
pageserver/src/basebackup_cache.rs
Normal file
@@ -0,0 +1,518 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use metrics::core::{AtomicU64, GenericCounter};
|
||||
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
|
||||
use tokio::{
|
||||
io::{AsyncWriteExt, BufWriter},
|
||||
sync::mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
shard::TenantShardId,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
basebackup::send_basebackup_tarball,
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
|
||||
task_mgr::TaskKind,
|
||||
tenant::{
|
||||
Timeline,
|
||||
mgr::{TenantManager, TenantSlot},
|
||||
},
|
||||
};
|
||||
|
||||
pub struct BasebackupPrepareRequest {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
|
||||
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
|
||||
|
||||
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
|
||||
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
|
||||
|
||||
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
|
||||
///
|
||||
/// The main purpose of this cache is to speed up the startup process of compute nodes
|
||||
/// after scaling to zero.
|
||||
/// Thus, the basebackup is stored only for the latest LSN of the timeline and with
|
||||
/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
|
||||
///
|
||||
/// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
|
||||
/// generates a basebackup from the timeline in the background, and stores it on disk.
|
||||
///
|
||||
/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
|
||||
/// and ~1 RPS for get requests.
|
||||
pub struct BasebackupCache {
|
||||
data_dir: Utf8PathBuf,
|
||||
config: BasebackupCacheConfig,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remove_entry_sender: BasebackupRemoveEntrySender,
|
||||
|
||||
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
|
||||
read_hit_count: GenericCounter<AtomicU64>,
|
||||
read_miss_count: GenericCounter<AtomicU64>,
|
||||
read_err_count: GenericCounter<AtomicU64>,
|
||||
|
||||
prepare_ok_count: GenericCounter<AtomicU64>,
|
||||
prepare_skip_count: GenericCounter<AtomicU64>,
|
||||
prepare_err_count: GenericCounter<AtomicU64>,
|
||||
}
|
||||
|
||||
impl BasebackupCache {
|
||||
/// Creates a BasebackupCache and spawns the background task.
|
||||
/// The initialization of the cache is performed in the background and does not
|
||||
/// block the caller. The cache will return `None` for any get requests until
|
||||
/// initialization is complete.
|
||||
pub fn spawn(
|
||||
runtime_handle: &tokio::runtime::Handle,
|
||||
data_dir: Utf8PathBuf,
|
||||
config: Option<BasebackupCacheConfig>,
|
||||
prepare_receiver: BasebackupPrepareReceiver,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
cancel: CancellationToken,
|
||||
) -> Arc<Self> {
|
||||
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let enabled = config.is_some();
|
||||
|
||||
let cache = Arc::new(BasebackupCache {
|
||||
data_dir,
|
||||
config: config.unwrap_or_default(),
|
||||
tenant_manager,
|
||||
remove_entry_sender,
|
||||
|
||||
entries: std::sync::Mutex::new(HashMap::new()),
|
||||
|
||||
cancel,
|
||||
|
||||
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
|
||||
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
|
||||
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
|
||||
|
||||
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
|
||||
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
|
||||
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
|
||||
});
|
||||
|
||||
if enabled {
|
||||
runtime_handle.spawn(
|
||||
cache
|
||||
.clone()
|
||||
.background(prepare_receiver, remove_entry_receiver),
|
||||
);
|
||||
}
|
||||
|
||||
cache
|
||||
}
|
||||
|
||||
/// Gets a basebackup entry from the cache.
|
||||
/// If the entry is found, opens a file with the basebackup archive and returns it.
|
||||
/// The open file descriptor will prevent the file system from deleting the file
|
||||
/// even if the entry is removed from the cache in the background.
|
||||
pub async fn get(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Option<tokio::fs::File> {
|
||||
// Fast path. Check if the entry exists using the in-memory state.
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
|
||||
self.read_miss_count.inc();
|
||||
return None;
|
||||
}
|
||||
|
||||
let path = self.entry_path(tenant_id, timeline_id, lsn);
|
||||
|
||||
match tokio::fs::File::open(path).await {
|
||||
Ok(file) => {
|
||||
self.read_hit_count.inc();
|
||||
Some(file)
|
||||
}
|
||||
Err(e) => {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
// We may end up here if the basebackup was concurrently removed by the cleanup task.
|
||||
self.read_miss_count.inc();
|
||||
} else {
|
||||
self.read_err_count.inc();
|
||||
tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Private methods.
|
||||
|
||||
fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
|
||||
// The default format for LSN is 0/ABCDEF.
|
||||
// The backslash is not filename friendly, so serialize it as plain hex.
|
||||
let lsn = lsn.0;
|
||||
format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
|
||||
}
|
||||
|
||||
fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
|
||||
self.data_dir
|
||||
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
|
||||
}
|
||||
|
||||
fn entry_tmp_path(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Utf8PathBuf {
|
||||
self.data_dir
|
||||
.join("tmp")
|
||||
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
|
||||
}
|
||||
|
||||
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
|
||||
let parts: Vec<&str> = filename
|
||||
.strip_prefix("basebackup_")?
|
||||
.strip_suffix(".tar.gz")?
|
||||
.split('_')
|
||||
.collect();
|
||||
if parts.len() != 3 {
|
||||
return None;
|
||||
}
|
||||
let tenant_id = parts[0].parse::<TenantId>().ok()?;
|
||||
let timeline_id = parts[1].parse::<TimelineId>().ok()?;
|
||||
let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
|
||||
|
||||
Some((tenant_id, timeline_id, lsn))
|
||||
}
|
||||
|
||||
async fn cleanup(&self) -> anyhow::Result<()> {
|
||||
// Cleanup tmp directory.
|
||||
let tmp_dir = self.data_dir.join("tmp");
|
||||
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
|
||||
while let Some(dir_entry) = tmp_dir.next_entry().await? {
|
||||
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
|
||||
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove outdated entries.
|
||||
let entries_old = self.entries.lock().unwrap().clone();
|
||||
let mut entries_new = HashMap::new();
|
||||
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
|
||||
if !tenant_shard_id.is_shard_zero() {
|
||||
continue;
|
||||
}
|
||||
let TenantSlot::Attached(tenant) = tenant_slot else {
|
||||
continue;
|
||||
};
|
||||
let tenant_id = tenant_shard_id.tenant_id;
|
||||
|
||||
for timeline in tenant.list_timelines() {
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
|
||||
if let Some(&entry_lsn) = entries_old.get(&tti) {
|
||||
if timeline.get_last_record_lsn() <= entry_lsn {
|
||||
entries_new.insert(tti, entry_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (&tti, &lsn) in entries_old.iter() {
|
||||
if !entries_new.contains_key(&tti) {
|
||||
self.remove_entry_sender
|
||||
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
|
||||
*self.entries.lock().unwrap() = entries_new;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_startup(&self) -> anyhow::Result<()> {
|
||||
// Create data_dir and tmp directory if they do not exist.
|
||||
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
|
||||
.await
|
||||
.map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to create basebackup cache data_dir {:?}: {:?}",
|
||||
self.data_dir,
|
||||
e
|
||||
)
|
||||
})?;
|
||||
|
||||
// Read existing entries from the data_dir and add them to in-memory state.
|
||||
let mut entries = HashMap::new();
|
||||
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
|
||||
while let Some(dir_entry) = dir.next_entry().await? {
|
||||
let filename = dir_entry.file_name();
|
||||
|
||||
if filename == "tmp" {
|
||||
// Skip the tmp directory.
|
||||
continue;
|
||||
}
|
||||
|
||||
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
|
||||
let Some((tenant_id, timeline_id, lsn)) = parsed else {
|
||||
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
|
||||
continue;
|
||||
};
|
||||
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
|
||||
use std::collections::hash_map::Entry::*;
|
||||
|
||||
match entries.entry(tti) {
|
||||
Occupied(mut entry) => {
|
||||
let entry_lsn = *entry.get();
|
||||
// Leave only the latest entry, remove the old one.
|
||||
if lsn < entry_lsn {
|
||||
self.remove_entry_sender.send(self.entry_path(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
))?;
|
||||
} else if lsn > entry_lsn {
|
||||
self.remove_entry_sender.send(self.entry_path(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
entry_lsn,
|
||||
))?;
|
||||
entry.insert(lsn);
|
||||
} else {
|
||||
// Two different filenames parsed to the same timline_id and LSN.
|
||||
// Should never happen.
|
||||
return Err(anyhow::anyhow!(
|
||||
"Duplicate basebackup cache entry with the same LSN: {:?}",
|
||||
filename
|
||||
));
|
||||
}
|
||||
}
|
||||
Vacant(entry) => {
|
||||
entry.insert(lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
|
||||
*self.entries.lock().unwrap() = entries;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn background(
|
||||
self: Arc<Self>,
|
||||
mut prepare_receiver: BasebackupPrepareReceiver,
|
||||
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
|
||||
) {
|
||||
// Panic in the background is a safe fallback.
|
||||
// It will drop receivers and the cache will be effectively disabled.
|
||||
self.on_startup()
|
||||
.await
|
||||
.expect("Failed to initialize basebackup cache");
|
||||
|
||||
let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
|
||||
cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(req) = prepare_receiver.recv() => {
|
||||
if let Err(err) = self.prepare_basebackup(
|
||||
req.tenant_shard_id,
|
||||
req.timeline_id,
|
||||
req.lsn,
|
||||
).await {
|
||||
tracing::info!("Failed to prepare basebackup: {:#}", err);
|
||||
self.prepare_err_count.inc();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Some(req) = remove_entry_receiver.recv() => {
|
||||
if let Err(e) = tokio::fs::remove_file(req).await {
|
||||
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
|
||||
}
|
||||
}
|
||||
_ = cleanup_ticker.tick() => {
|
||||
self.cleanup().await.unwrap_or_else(|e| {
|
||||
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
|
||||
});
|
||||
}
|
||||
_ = self.cancel.cancelled() => {
|
||||
tracing::info!("BasebackupCache background task cancelled");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare a basebackup for the given timeline.
|
||||
///
|
||||
/// If the basebackup already exists with a higher LSN or the timeline already
|
||||
/// has a higher last_record_lsn, skip the preparation.
|
||||
///
|
||||
/// The basebackup is prepared in a temporary directory and then moved to the final
|
||||
/// location to make the operation atomic.
|
||||
async fn prepare_basebackup(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
req_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
"Preparing basebackup for timeline",
|
||||
);
|
||||
|
||||
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
|
||||
|
||||
{
|
||||
let entries = self.entries.lock().unwrap();
|
||||
if let Some(&entry_lsn) = entries.get(&tti) {
|
||||
if entry_lsn >= req_lsn {
|
||||
tracing::info!(
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
%entry_lsn,
|
||||
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if entries.len() as i64 >= self.config.max_size_entries {
|
||||
tracing::info!(
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
"Basebackup cache is full, skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
|
||||
let tenant_state = tenant.current_state();
|
||||
if tenant_state != TenantState::Active {
|
||||
anyhow::bail!(
|
||||
"Tenant {} is not active, current state: {:?}",
|
||||
tenant_shard_id.tenant_id,
|
||||
tenant_state
|
||||
)
|
||||
}
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
if last_record_lsn > req_lsn {
|
||||
tracing::info!(
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
%last_record_lsn,
|
||||
"Timeline has a higher LSN than the requested one, skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
|
||||
|
||||
let res = self
|
||||
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
|
||||
.await;
|
||||
|
||||
if let Err(err) = res {
|
||||
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
|
||||
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
|
||||
match tokio::fs::remove_file(&entry_tmp_path).await {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
|
||||
}
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
// Move the tmp file to the final location atomically.
|
||||
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
|
||||
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
|
||||
|
||||
let mut entries = self.entries.lock().unwrap();
|
||||
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
|
||||
// Remove the old entry if it exists.
|
||||
self.remove_entry_sender
|
||||
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
|
||||
.unwrap();
|
||||
}
|
||||
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
|
||||
|
||||
self.prepare_ok_count.inc();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepares a basebackup in a temporary file.
|
||||
async fn prepare_basebackup_tmp(
|
||||
&self,
|
||||
emptry_tmp_path: &Utf8Path,
|
||||
timeline: &Arc<Timeline>,
|
||||
req_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
|
||||
let ctx = ctx.with_scope_timeline(timeline);
|
||||
|
||||
let file = tokio::fs::File::create(emptry_tmp_path).await?;
|
||||
let mut writer = BufWriter::new(file);
|
||||
|
||||
let mut encoder = GzipEncoder::with_quality(
|
||||
&mut writer,
|
||||
// Level::Best because compression is not on the hot path of basebackup requests.
|
||||
// The decompression is almost not affected by the compression level.
|
||||
async_compression::Level::Best,
|
||||
);
|
||||
|
||||
// We may receive a request before the WAL record is applied to the timeline.
|
||||
// Wait for the requested LSN to be applied.
|
||||
timeline
|
||||
.wait_lsn(
|
||||
req_lsn,
|
||||
crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
|
||||
crate::tenant::timeline::WaitLsnTimeout::Default,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
send_basebackup_tarball(
|
||||
&mut encoder,
|
||||
timeline,
|
||||
Some(req_lsn),
|
||||
None,
|
||||
false,
|
||||
false,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
encoder.shutdown().await?;
|
||||
writer.flush().await?;
|
||||
writer.into_inner().sync_all().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
|
||||
use metrics::set_build_info_metric;
|
||||
use nix::sys::socket::{setsockopt, sockopt};
|
||||
use pageserver::basebackup_cache::BasebackupCache;
|
||||
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
|
||||
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
|
||||
use pageserver::deletion_queue::DeletionQueue;
|
||||
@@ -541,6 +542,8 @@ fn start_pageserver(
|
||||
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
|
||||
|
||||
// Scan the local 'tenants/' directory and start loading the tenants
|
||||
let (basebackup_prepare_sender, basebackup_prepare_receiver) =
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let deletion_queue_client = deletion_queue.new_client();
|
||||
let background_purges = mgr::BackgroundPurges::default();
|
||||
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
|
||||
@@ -551,12 +554,22 @@ fn start_pageserver(
|
||||
remote_storage: remote_storage.clone(),
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
},
|
||||
order,
|
||||
shutdown_pageserver.clone(),
|
||||
))?;
|
||||
let tenant_manager = Arc::new(tenant_manager);
|
||||
|
||||
let basebackup_cache = BasebackupCache::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
conf.basebackup_cache_dir(),
|
||||
conf.basebackup_cache_config.clone(),
|
||||
basebackup_prepare_receiver,
|
||||
Arc::clone(&tenant_manager),
|
||||
shutdown_pageserver.child_token(),
|
||||
);
|
||||
|
||||
BACKGROUND_RUNTIME.spawn({
|
||||
let shutdown_pageserver = shutdown_pageserver.clone();
|
||||
let drive_init = async move {
|
||||
@@ -763,6 +776,7 @@ fn start_pageserver(
|
||||
} else {
|
||||
None
|
||||
},
|
||||
basebackup_cache,
|
||||
);
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
|
||||
@@ -232,6 +232,8 @@ pub struct PageServerConf {
|
||||
pub dev_mode: bool,
|
||||
|
||||
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
|
||||
|
||||
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -261,6 +263,10 @@ impl PageServerConf {
|
||||
self.workdir.join("metadata.json")
|
||||
}
|
||||
|
||||
pub fn basebackup_cache_dir(&self) -> Utf8PathBuf {
|
||||
self.workdir.join("basebackup_cache")
|
||||
}
|
||||
|
||||
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
|
||||
// Encode a version in the filename, so that if we ever switch away from JSON we can
|
||||
// increment this.
|
||||
@@ -407,6 +413,7 @@ impl PageServerConf {
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
basebackup_cache_config,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -461,6 +468,7 @@ impl PageServerConf {
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
basebackup_cache_config,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
@@ -544,6 +552,23 @@ impl PageServerConf {
|
||||
ratio.numerator, ratio.denominator
|
||||
)
|
||||
);
|
||||
|
||||
let url = Url::parse(&tracing_config.export_config.endpoint)
|
||||
.map_err(anyhow::Error::msg)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"tracing endpoint URL is invalid : {}",
|
||||
tracing_config.export_config.endpoint
|
||||
)
|
||||
})?;
|
||||
|
||||
ensure!(
|
||||
url.scheme() == "http" || url.scheme() == "https",
|
||||
format!(
|
||||
"tracing endpoint URL must start with http:// or https://: {}",
|
||||
tracing_config.export_config.endpoint
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
|
||||
@@ -660,4 +685,25 @@ mod tests {
|
||||
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
|
||||
.expect("parse_and_validate");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_config_tracing_endpoint_is_invalid() {
|
||||
let input = r#"
|
||||
control_plane_api = "http://localhost:6666"
|
||||
|
||||
[tracing]
|
||||
|
||||
sampling_ratio = { numerator = 1, denominator = 0 }
|
||||
|
||||
[tracing.export_config]
|
||||
endpoint = "localhost:4317"
|
||||
protocol = "http-binary"
|
||||
timeout = "1ms"
|
||||
"#;
|
||||
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
|
||||
.expect("config has valid fields");
|
||||
let workdir = Utf8PathBuf::from("/nonexistent");
|
||||
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
|
||||
.expect_err("parse_and_validate should fail for endpoint without scheme");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,12 +18,25 @@ use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
// management.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||
pub(super) enum Name {
|
||||
/// Timeline last_record_lsn, absolute
|
||||
/// Timeline last_record_lsn, absolute.
|
||||
#[serde(rename = "written_size")]
|
||||
WrittenSize,
|
||||
/// Timeline last_record_lsn, incremental
|
||||
#[serde(rename = "written_data_bytes_delta")]
|
||||
WrittenSizeDelta,
|
||||
/// Written bytes only on this timeline (not including ancestors):
|
||||
/// written_size - ancestor_lsn
|
||||
///
|
||||
/// On the root branch, this is equivalent to `written_size`.
|
||||
#[serde(rename = "written_size_since_parent")]
|
||||
WrittenSizeSinceParent,
|
||||
/// PITR history size only on this timeline (not including ancestors):
|
||||
/// last_record_lsn - max(pitr_cutoff, ancestor_lsn).
|
||||
///
|
||||
/// On the root branch, this is its entire PITR history size. Not emitted if GC hasn't computed
|
||||
/// the PITR cutoff yet. 0 if PITR is disabled.
|
||||
#[serde(rename = "pitr_history_size_since_parent")]
|
||||
PitrHistorySizeSinceParent,
|
||||
/// Timeline logical size
|
||||
#[serde(rename = "timeline_logical_size")]
|
||||
LogicalSize,
|
||||
@@ -157,6 +170,32 @@ impl MetricsKey {
|
||||
.incremental_values()
|
||||
}
|
||||
|
||||
/// `written_size` - `ancestor_lsn`.
|
||||
const fn written_size_since_parent(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> AbsoluteValueFactory {
|
||||
MetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: Name::WrittenSizeSinceParent,
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
/// `written_size` - max(`pitr_cutoff`, `ancestor_lsn`).
|
||||
const fn pitr_history_size_since_parent(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> AbsoluteValueFactory {
|
||||
MetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: Name::PitrHistorySizeSinceParent,
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
/// Exact [`Timeline::get_current_logical_size`].
|
||||
///
|
||||
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
|
||||
@@ -334,7 +373,13 @@ impl TenantSnapshot {
|
||||
struct TimelineSnapshot {
|
||||
loaded_at: (Lsn, SystemTime),
|
||||
last_record_lsn: Lsn,
|
||||
ancestor_lsn: Lsn,
|
||||
current_exact_logical_size: Option<u64>,
|
||||
/// Whether PITR is enabled (pitr_interval > 0).
|
||||
pitr_enabled: bool,
|
||||
/// The PITR cutoff LSN. None if not yet initialized. If PITR is disabled, this is approximately
|
||||
/// Some(last_record_lsn), but may lag behind it since it's computed periodically.
|
||||
pitr_cutoff: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl TimelineSnapshot {
|
||||
@@ -354,6 +399,9 @@ impl TimelineSnapshot {
|
||||
} else {
|
||||
let loaded_at = t.loaded_at;
|
||||
let last_record_lsn = t.get_last_record_lsn();
|
||||
let ancestor_lsn = t.get_ancestor_lsn();
|
||||
let pitr_enabled = !t.get_pitr_interval().is_zero();
|
||||
let pitr_cutoff = t.gc_info.read().unwrap().cutoffs.time;
|
||||
|
||||
let current_exact_logical_size = {
|
||||
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
|
||||
@@ -373,7 +421,10 @@ impl TimelineSnapshot {
|
||||
Ok(Some(TimelineSnapshot {
|
||||
loaded_at,
|
||||
last_record_lsn,
|
||||
ancestor_lsn,
|
||||
current_exact_logical_size,
|
||||
pitr_enabled,
|
||||
pitr_cutoff,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -424,6 +475,8 @@ impl TimelineSnapshot {
|
||||
|
||||
let up_to = now;
|
||||
|
||||
let written_size_last = written_size_now.value.max(prev.1); // don't regress
|
||||
|
||||
if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
|
||||
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
|
||||
// written_size_delta
|
||||
@@ -441,6 +494,27 @@ impl TimelineSnapshot {
|
||||
});
|
||||
}
|
||||
|
||||
// Compute the branch-local written size.
|
||||
let written_size_since_parent_key =
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id);
|
||||
metrics.push(
|
||||
written_size_since_parent_key
|
||||
.at(now, written_size_last.saturating_sub(self.ancestor_lsn.0)),
|
||||
);
|
||||
|
||||
// Compute the branch-local PITR history size. Not emitted if GC hasn't yet computed the
|
||||
// PITR cutoff. 0 if PITR is disabled.
|
||||
let pitr_history_size_since_parent_key =
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id);
|
||||
if !self.pitr_enabled {
|
||||
metrics.push(pitr_history_size_since_parent_key.at(now, 0));
|
||||
} else if let Some(pitr_cutoff) = self.pitr_cutoff {
|
||||
metrics.push(pitr_history_size_since_parent_key.at(
|
||||
now,
|
||||
written_size_last.saturating_sub(pitr_cutoff.max(self.ancestor_lsn).0),
|
||||
));
|
||||
}
|
||||
|
||||
{
|
||||
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
|
||||
let current_or_previous = self
|
||||
|
||||
@@ -12,12 +12,17 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
let cache = HashMap::new();
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
let logical_size = 0x42000;
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: Some(logical_size),
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
};
|
||||
|
||||
let now = DateTime::<Utc>::from(SystemTime::now());
|
||||
@@ -33,7 +38,11 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -49,7 +58,9 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
let before = DateTime::<Utc>::from(before);
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
let logical_size = 0x42000;
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
|
||||
@@ -59,7 +70,10 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: Some(logical_size),
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
};
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
@@ -69,7 +83,11 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -86,7 +104,9 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
let before = DateTime::<Utc>::from(before);
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
let logical_size = 0x42000;
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([
|
||||
@@ -103,7 +123,10 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: Some(logical_size),
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
};
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
@@ -113,16 +136,18 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests that written sizes do not regress across restarts.
|
||||
#[test]
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
|
||||
// should never go backwards
|
||||
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
@@ -140,7 +165,10 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: None,
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: Some(Lsn(20)),
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
@@ -169,6 +197,8 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 100),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 80),
|
||||
]
|
||||
);
|
||||
|
||||
@@ -183,6 +213,157 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 100),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 80),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests that written sizes do not regress across restarts, even on child branches.
|
||||
#[test]
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_ancestor_lsn() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let [later, now, at_restart] = time_backwards();
|
||||
|
||||
// FIXME: tests would be so much easier if we did not need to juggle back and forth
|
||||
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
|
||||
let now = DateTime::<Utc>::from(now);
|
||||
let later = DateTime::<Utc>::from(later);
|
||||
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
|
||||
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
|
||||
let before_restart = DateTime::<Utc>::from(before_restart);
|
||||
let way_before = DateTime::<Utc>::from(way_before);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
ancestor_lsn: Lsn(40),
|
||||
current_exact_logical_size: None,
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: Some(Lsn(20)),
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before_restart, 100)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
999_999_999,
|
||||
)
|
||||
.to_kv_pair(),
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
before_restart,
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 60),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 60),
|
||||
]
|
||||
);
|
||||
|
||||
// now if we cache these metrics, and re-run while "still in recovery"
|
||||
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
|
||||
|
||||
// "still in recovery", because our snapshot did not change
|
||||
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 60),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 60),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests that written sizes do not regress across restarts, even on child branches and
|
||||
/// with a PITR cutoff after the branch point.
|
||||
#[test]
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_ancestor_lsn_and_pitr_cutoff() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let [later, now, at_restart] = time_backwards();
|
||||
|
||||
// FIXME: tests would be so much easier if we did not need to juggle back and forth
|
||||
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
|
||||
let now = DateTime::<Utc>::from(now);
|
||||
let later = DateTime::<Utc>::from(later);
|
||||
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
|
||||
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
|
||||
let before_restart = DateTime::<Utc>::from(before_restart);
|
||||
let way_before = DateTime::<Utc>::from(way_before);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
ancestor_lsn: Lsn(30),
|
||||
current_exact_logical_size: None,
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: Some(Lsn(40)),
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before_restart, 100)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
999_999_999,
|
||||
)
|
||||
.to_kv_pair(),
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
before_restart,
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 70),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 60),
|
||||
]
|
||||
);
|
||||
|
||||
// now if we cache these metrics, and re-run while "still in recovery"
|
||||
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
|
||||
|
||||
// "still in recovery", because our snapshot did not change
|
||||
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 70),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 60),
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -201,7 +382,10 @@ fn post_restart_current_exact_logical_size_uses_cached() {
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: None,
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: None,
|
||||
};
|
||||
|
||||
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
|
||||
@@ -286,16 +470,101 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
times
|
||||
}
|
||||
|
||||
/// Tests that disabled PITR history does not yield any history size, even when the PITR cutoff
|
||||
/// indicates otherwise.
|
||||
#[test]
|
||||
fn pitr_disabled_yields_no_history_size() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::new();
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let pitr_cutoff = Lsn(0x11000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: None,
|
||||
pitr_enabled: false,
|
||||
pitr_cutoff: Some(pitr_cutoff),
|
||||
};
|
||||
|
||||
let now = DateTime::<Utc>::from(SystemTime::now());
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
snap.loaded_at.1.into(),
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 0),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests that uninitialized PITR cutoff does not emit any history size metric at all.
|
||||
#[test]
|
||||
fn pitr_uninitialized_does_not_emit_history_size() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::new();
|
||||
|
||||
let initdb_lsn = Lsn(0x10000);
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
ancestor_lsn: Lsn(0),
|
||||
current_exact_logical_size: None,
|
||||
pitr_enabled: true,
|
||||
pitr_cutoff: None,
|
||||
};
|
||||
|
||||
let now = DateTime::<Utc>::from(SystemTime::now());
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
snap.loaded_at.1.into(),
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
|
||||
.at(now, disk_consistent_lsn.0),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples_old(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [RawMetric; 5] {
|
||||
) -> [RawMetric; 7] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until_old_format(before, now, 0),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
|
||||
@@ -307,10 +576,12 @@ pub(crate) const fn metric_examples(
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [NewRawMetric; 5] {
|
||||
) -> [NewRawMetric; 7] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
|
||||
@@ -513,6 +513,14 @@ mod tests {
|
||||
line!(),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"pitr_history_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
@@ -560,7 +568,7 @@ mod tests {
|
||||
assert_eq!(upgraded_samples, new_samples);
|
||||
}
|
||||
|
||||
fn metric_samples_old() -> [RawMetric; 5] {
|
||||
fn metric_samples_old() -> [RawMetric; 7] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
@@ -572,7 +580,7 @@ mod tests {
|
||||
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
|
||||
}
|
||||
|
||||
fn metric_samples() -> [NewRawMetric; 5] {
|
||||
fn metric_samples() -> [NewRawMetric; 7] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
|
||||
@@ -449,7 +449,7 @@ async fn build_timeline_info_common(
|
||||
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
|
||||
// actually trimmed data to), which can pass each other when PITR is changed.
|
||||
let min_readable_lsn = std::cmp::max(
|
||||
timeline.get_gc_cutoff_lsn(),
|
||||
timeline.get_gc_cutoff_lsn().unwrap_or_default(),
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
);
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
mod auth;
|
||||
pub mod basebackup;
|
||||
pub mod basebackup_cache;
|
||||
pub mod config;
|
||||
pub mod consumption_metrics;
|
||||
pub mod context;
|
||||
|
||||
@@ -1066,6 +1066,15 @@ pub(crate) static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|
|
||||
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
|
||||
});
|
||||
|
||||
pub(crate) static TENANT_OFFLOADED_TIMELINES: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_offloaded_timelines",
|
||||
"Number of offloaded timelines of a tenant",
|
||||
&["tenant_id", "shard_id"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_offloaded_timelines metric")
|
||||
});
|
||||
|
||||
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_eviction_iteration_duration_seconds_global",
|
||||
@@ -3551,11 +3560,14 @@ impl TimelineMetrics {
|
||||
}
|
||||
|
||||
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
|
||||
let tid = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = tenant_shard_id.shard_slug().to_string();
|
||||
|
||||
// Only shard zero deals in synthetic sizes
|
||||
if tenant_shard_id.is_shard_zero() {
|
||||
let tid = tenant_shard_id.tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
}
|
||||
let _ = TENANT_OFFLOADED_TIMELINES.remove_label_values(&[&tid, &shard_id]);
|
||||
|
||||
tenant_throttling::remove_tenant_metrics(tenant_shard_id);
|
||||
|
||||
@@ -4347,6 +4359,42 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
|
||||
.set(u64::try_from(num_threads.get()).unwrap());
|
||||
}
|
||||
|
||||
pub(crate) static BASEBACKUP_CACHE_READ: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_basebackup_cache_read_total",
|
||||
"Number of read accesses to the basebackup cache grouped by hit/miss/error",
|
||||
&["result"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_basebackup_cache_prepare_total",
|
||||
"Number of prepare requests processed by the basebackup cache grouped by ok/skip/error",
|
||||
&["result"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_basebackup_cache_entries_total",
|
||||
"Number of entries in the basebackup cache"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// FIXME: Support basebackup cache size metrics.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_basebackup_cache_size_bytes",
|
||||
"Total size of all basebackup cache entries on disk in bytes"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_config_ignored_items",
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{io, str};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
@@ -52,8 +51,10 @@ use utils::simple_rcu::RcuReadGuard;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::spsc_fold;
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::basebackup_cache::BasebackupCache;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
@@ -107,6 +108,7 @@ pub fn spawn(
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
tcp_listener: tokio::net::TcpListener,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
) -> Listener {
|
||||
let cancel = CancellationToken::new();
|
||||
let libpq_ctx = RequestContext::todo_child(
|
||||
@@ -128,6 +130,7 @@ pub fn spawn(
|
||||
conf.pg_auth_type,
|
||||
tls_config,
|
||||
conf.page_service_pipelining.clone(),
|
||||
basebackup_cache,
|
||||
libpq_ctx,
|
||||
cancel.clone(),
|
||||
)
|
||||
@@ -186,6 +189,7 @@ pub async fn libpq_listener_main(
|
||||
auth_type: AuthType,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
listener_ctx: RequestContext,
|
||||
listener_cancel: CancellationToken,
|
||||
) -> Connections {
|
||||
@@ -229,6 +233,7 @@ pub async fn libpq_listener_main(
|
||||
auth_type,
|
||||
tls_config.clone(),
|
||||
pipelining_config.clone(),
|
||||
Arc::clone(&basebackup_cache),
|
||||
connection_ctx,
|
||||
connections_cancel.child_token(),
|
||||
gate_guard,
|
||||
@@ -271,6 +276,7 @@ async fn page_service_conn_main(
|
||||
auth_type: AuthType,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
@@ -336,6 +342,7 @@ async fn page_service_conn_main(
|
||||
pipelining_config,
|
||||
conf.get_vectored_concurrent_io,
|
||||
perf_span_fields,
|
||||
basebackup_cache,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
gate_guard,
|
||||
@@ -390,6 +397,8 @@ struct PageServerHandler {
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
|
||||
gate_guard: GateGuard,
|
||||
}
|
||||
|
||||
@@ -849,6 +858,7 @@ impl PageServerHandler {
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
@@ -862,6 +872,7 @@ impl PageServerHandler {
|
||||
cancel,
|
||||
pipelining_config,
|
||||
get_vectored_concurrent_io,
|
||||
basebackup_cache,
|
||||
gate_guard,
|
||||
}
|
||||
}
|
||||
@@ -2493,6 +2504,8 @@ impl PageServerHandler {
|
||||
.map_err(QueryError::Disconnected)?;
|
||||
self.flush_cancellable(pgb, &self.cancel).await?;
|
||||
|
||||
let mut from_cache = false;
|
||||
|
||||
// Send a tarball of the latest layer on the timeline. Compress if not
|
||||
// fullbackup. TODO Compress in that case too (tests need to be updated)
|
||||
if full_backup {
|
||||
@@ -2510,7 +2523,33 @@ impl PageServerHandler {
|
||||
.map_err(map_basebackup_error)?;
|
||||
} else {
|
||||
let mut writer = BufWriter::new(pgb.copyout_writer());
|
||||
if gzip {
|
||||
|
||||
let cached = {
|
||||
// Basebackup is cached only for this combination of parameters.
|
||||
if timeline.is_basebackup_cache_enabled()
|
||||
&& gzip
|
||||
&& lsn.is_some()
|
||||
&& prev_lsn.is_none()
|
||||
{
|
||||
self.basebackup_cache
|
||||
.get(tenant_id, timeline_id, lsn.unwrap())
|
||||
.await
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(mut cached) = cached {
|
||||
from_cache = true;
|
||||
tokio::io::copy(&mut cached, &mut writer)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
map_basebackup_error(BasebackupError::Client(
|
||||
e,
|
||||
"handle_basebackup_request,cached,copy",
|
||||
))
|
||||
})?;
|
||||
} else if gzip {
|
||||
let mut encoder = GzipEncoder::with_quality(
|
||||
&mut writer,
|
||||
// NOTE using fast compression because it's on the critical path
|
||||
@@ -2569,6 +2608,7 @@ impl PageServerHandler {
|
||||
info!(
|
||||
lsn_await_millis = lsn_awaited_after.as_millis(),
|
||||
basebackup_millis = basebackup_after.as_millis(),
|
||||
%from_cache,
|
||||
"basebackup complete"
|
||||
);
|
||||
|
||||
|
||||
@@ -380,6 +380,10 @@ pub enum TaskKind {
|
||||
DetachAncestor,
|
||||
|
||||
ImportPgdata,
|
||||
|
||||
/// Background task of [`crate::basebackup_cache::BasebackupCache`].
|
||||
/// Prepares basebackups and clears outdated entries.
|
||||
BasebackupCache,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
@@ -78,6 +78,7 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit
|
||||
use self::timeline::{
|
||||
EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError,
|
||||
};
|
||||
use crate::basebackup_cache::BasebackupPrepareSender;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context;
|
||||
use crate::context::RequestContextBuilder;
|
||||
@@ -86,8 +87,8 @@ use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::{
|
||||
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
|
||||
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_STATE_METRIC,
|
||||
TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
|
||||
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_OFFLOADED_TIMELINES,
|
||||
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::LocationMode;
|
||||
@@ -157,6 +158,7 @@ pub struct TenantSharedResources {
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub l0_flush_global_state: L0FlushGlobalState,
|
||||
pub basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
}
|
||||
|
||||
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
|
||||
@@ -317,12 +319,15 @@ pub struct TenantShard {
|
||||
gc_cs: tokio::sync::Mutex<()>,
|
||||
walredo_mgr: Option<Arc<WalRedoManager>>,
|
||||
|
||||
// provides access to timeline data sitting in the remote storage
|
||||
/// Provides access to timeline data sitting in the remote storage.
|
||||
pub(crate) remote_storage: GenericRemoteStorage,
|
||||
|
||||
// Access to global deletion queue for when this tenant wants to schedule a deletion
|
||||
/// Access to global deletion queue for when this tenant wants to schedule a deletion.
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
|
||||
/// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`].
|
||||
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
|
||||
cached_synthetic_tenant_size: Arc<AtomicU64>,
|
||||
@@ -1286,6 +1291,7 @@ impl TenantShard {
|
||||
remote_storage,
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
} = resources;
|
||||
|
||||
let attach_mode = attached_conf.location.attach_mode;
|
||||
@@ -1301,6 +1307,7 @@ impl TenantShard {
|
||||
remote_storage.clone(),
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
));
|
||||
|
||||
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
|
||||
@@ -3348,6 +3355,13 @@ impl TenantShard {
|
||||
activated_timelines += 1;
|
||||
}
|
||||
|
||||
let tid = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = self.tenant_shard_id.shard_slug().to_string();
|
||||
let offloaded_timeline_count = timelines_offloaded_accessor.len();
|
||||
TENANT_OFFLOADED_TIMELINES
|
||||
.with_label_values(&[&tid, &shard_id])
|
||||
.set(offloaded_timeline_count as u64);
|
||||
|
||||
self.state.send_modify(move |current_state| {
|
||||
assert!(
|
||||
matches!(current_state, TenantState::Activating(_)),
|
||||
@@ -4232,6 +4246,7 @@ impl TenantShard {
|
||||
remote_storage: GenericRemoteStorage,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
) -> TenantShard {
|
||||
assert!(!attached_conf.location.generation.is_none());
|
||||
|
||||
@@ -4335,6 +4350,7 @@ impl TenantShard {
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
gc_block: Default::default(),
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4587,7 +4603,7 @@ impl TenantShard {
|
||||
|
||||
target.cutoffs = GcCutoffs {
|
||||
space: space_cutoff,
|
||||
time: Lsn::INVALID,
|
||||
time: None,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -4671,7 +4687,7 @@ impl TenantShard {
|
||||
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
|
||||
if let Some(ancestor_gc_cutoffs) = gc_cutoffs.get(&ancestor_id) {
|
||||
target.within_ancestor_pitr =
|
||||
timeline.get_ancestor_lsn() >= ancestor_gc_cutoffs.time;
|
||||
Some(timeline.get_ancestor_lsn()) >= ancestor_gc_cutoffs.time;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4684,13 +4700,15 @@ impl TenantShard {
|
||||
} else {
|
||||
0
|
||||
});
|
||||
timeline.metrics.pitr_history_size.set(
|
||||
timeline
|
||||
.get_last_record_lsn()
|
||||
.checked_sub(target.cutoffs.time)
|
||||
.unwrap_or(Lsn(0))
|
||||
.0,
|
||||
);
|
||||
if let Some(time_cutoff) = target.cutoffs.time {
|
||||
timeline.metrics.pitr_history_size.set(
|
||||
timeline
|
||||
.get_last_record_lsn()
|
||||
.checked_sub(time_cutoff)
|
||||
.unwrap_or_default()
|
||||
.0,
|
||||
);
|
||||
}
|
||||
|
||||
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
|
||||
// - this timeline was created while we were finding cutoffs
|
||||
@@ -4699,8 +4717,8 @@ impl TenantShard {
|
||||
let original_cutoffs = target.cutoffs.clone();
|
||||
// GC cutoffs should never go back
|
||||
target.cutoffs = GcCutoffs {
|
||||
space: Lsn(cutoffs.space.0.max(original_cutoffs.space.0)),
|
||||
time: Lsn(cutoffs.time.0.max(original_cutoffs.time.0)),
|
||||
space: cutoffs.space.max(original_cutoffs.space),
|
||||
time: cutoffs.time.max(original_cutoffs.time),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5252,6 +5270,7 @@ impl TenantShard {
|
||||
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
|
||||
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
|
||||
l0_flush_global_state: self.l0_flush_global_state.clone(),
|
||||
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5560,6 +5579,14 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
let tid = self.tenant_shard_id.to_string();
|
||||
let shard_id = self.tenant_shard_id.shard_slug().to_string();
|
||||
let set_key = &[tid.as_str(), shard_id.as_str()][..];
|
||||
TENANT_OFFLOADED_TIMELINES
|
||||
.with_label_values(set_key)
|
||||
.set(manifest.offloaded_timelines.len() as u64);
|
||||
|
||||
// Upload the manifest. Remote storage does no retries internally, so retry here.
|
||||
match backoff::retry(
|
||||
|| async {
|
||||
@@ -5826,6 +5853,8 @@ pub(crate) mod harness {
|
||||
) -> anyhow::Result<Arc<TenantShard>> {
|
||||
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
|
||||
|
||||
let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let tenant = Arc::new(TenantShard::new(
|
||||
TenantState::Attaching,
|
||||
self.conf,
|
||||
@@ -5843,6 +5872,7 @@ pub(crate) mod harness {
|
||||
self.deletion_queue.new_client(),
|
||||
// TODO: ideally we should run all unit tests with both configs
|
||||
L0FlushGlobalState::new(L0FlushConfig::default()),
|
||||
basebackup_requst_sender,
|
||||
));
|
||||
|
||||
let preload = tenant
|
||||
@@ -8937,7 +8967,7 @@ mod tests {
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x30);
|
||||
guard.cutoffs.time = Some(Lsn(0x30));
|
||||
guard.cutoffs.space = Lsn(0x30);
|
||||
}
|
||||
|
||||
@@ -9045,7 +9075,7 @@ mod tests {
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x40);
|
||||
guard.cutoffs.time = Some(Lsn(0x40));
|
||||
guard.cutoffs.space = Lsn(0x40);
|
||||
}
|
||||
tline
|
||||
@@ -9463,7 +9493,7 @@ mod tests {
|
||||
*guard = GcInfo {
|
||||
retain_lsns: vec![],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -9547,7 +9577,7 @@ mod tests {
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x40);
|
||||
guard.cutoffs.time = Some(Lsn(0x40));
|
||||
guard.cutoffs.space = Lsn(0x40);
|
||||
}
|
||||
tline
|
||||
@@ -10018,7 +10048,7 @@ mod tests {
|
||||
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
|
||||
],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -10081,7 +10111,7 @@ mod tests {
|
||||
let verify_result = || async {
|
||||
let gc_horizon = {
|
||||
let gc_info = tline.gc_info.read().unwrap();
|
||||
gc_info.cutoffs.time
|
||||
gc_info.cutoffs.time.unwrap_or_default()
|
||||
};
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
@@ -10159,7 +10189,7 @@ mod tests {
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x38);
|
||||
guard.cutoffs.time = Some(Lsn(0x38));
|
||||
guard.cutoffs.space = Lsn(0x38);
|
||||
}
|
||||
tline
|
||||
@@ -10267,7 +10297,7 @@ mod tests {
|
||||
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
|
||||
],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -10330,7 +10360,7 @@ mod tests {
|
||||
let verify_result = || async {
|
||||
let gc_horizon = {
|
||||
let gc_info = tline.gc_info.read().unwrap();
|
||||
gc_info.cutoffs.time
|
||||
gc_info.cutoffs.time.unwrap_or_default()
|
||||
};
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
@@ -10516,7 +10546,7 @@ mod tests {
|
||||
*guard = GcInfo {
|
||||
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x10),
|
||||
time: Some(Lsn(0x10)),
|
||||
space: Lsn(0x10),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -10536,7 +10566,7 @@ mod tests {
|
||||
*guard = GcInfo {
|
||||
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x50),
|
||||
time: Some(Lsn(0x50)),
|
||||
space: Lsn(0x50),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -11257,7 +11287,7 @@ mod tests {
|
||||
*guard = GcInfo {
|
||||
retain_lsns: vec![(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No)],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -11646,7 +11676,7 @@ mod tests {
|
||||
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
|
||||
],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -11709,7 +11739,7 @@ mod tests {
|
||||
let verify_result = || async {
|
||||
let gc_horizon = {
|
||||
let gc_info = tline.gc_info.read().unwrap();
|
||||
gc_info.cutoffs.time
|
||||
gc_info.cutoffs.time.unwrap_or_default()
|
||||
};
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
@@ -11898,7 +11928,7 @@ mod tests {
|
||||
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
|
||||
],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
@@ -11961,7 +11991,7 @@ mod tests {
|
||||
let verify_result = || async {
|
||||
let gc_horizon = {
|
||||
let gc_info = tline.gc_info.read().unwrap();
|
||||
gc_info.cutoffs.time
|
||||
gc_info.cutoffs.time.unwrap_or_default()
|
||||
};
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
@@ -12224,7 +12254,7 @@ mod tests {
|
||||
*guard = GcInfo {
|
||||
retain_lsns: vec![],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
time: Some(Lsn(0x30)),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
|
||||
@@ -235,7 +235,7 @@ pub(super) async fn gather_inputs(
|
||||
// than our internal space cutoff. This means that if someone drops a database and waits for their
|
||||
// PITR interval, they will see synthetic size decrease, even if we are still storing data inside
|
||||
// the space cutoff.
|
||||
let mut next_pitr_cutoff = gc_info.cutoffs.time;
|
||||
let mut next_pitr_cutoff = gc_info.cutoffs.time.unwrap_or_default(); // TODO: handle None
|
||||
|
||||
// If the caller provided a shorter retention period, use that instead of the GC cutoff.
|
||||
let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
|
||||
|
||||
@@ -63,7 +63,28 @@ pub struct InMemoryLayer {
|
||||
|
||||
opened_at: Instant,
|
||||
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once.
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// by block number and LSN. The [`IndexEntry`] is an offset into the
|
||||
/// ephemeral file where the page version is stored.
|
||||
///
|
||||
/// We use a separate lock for the index to reduce the critical section
|
||||
/// during which reads cannot be planned.
|
||||
///
|
||||
/// If you need access to both the index and the underlying file at the same time,
|
||||
/// respect the following locking order to avoid deadlocks:
|
||||
/// 1. [`InMemoryLayer::inner`]
|
||||
/// 2. [`InMemoryLayer::index`]
|
||||
///
|
||||
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
|
||||
/// so it is not necessary to hold simultaneous locks on index.
|
||||
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
|
||||
/// In particular:
|
||||
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
|
||||
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
|
||||
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
|
||||
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once,
|
||||
/// and `index` (see rationale there).
|
||||
/// All other changing parts are in `inner`, and protected by a mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
|
||||
@@ -81,11 +102,6 @@ impl std::fmt::Debug for InMemoryLayer {
|
||||
}
|
||||
|
||||
pub struct InMemoryLayerInner {
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// by block number and LSN. The [`IndexEntry`] is an offset into the
|
||||
/// ephemeral file where the page version is stored.
|
||||
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
|
||||
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
@@ -105,7 +121,7 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
|
||||
trailing_ones
|
||||
};
|
||||
|
||||
/// See [`InMemoryLayerInner::index`].
|
||||
/// See [`InMemoryLayer::index`].
|
||||
///
|
||||
/// For memory efficiency, the data is packed into a u64.
|
||||
///
|
||||
@@ -425,7 +441,7 @@ impl InMemoryLayer {
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.attached_child();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
let index = self.index.read().await;
|
||||
|
||||
struct ValueRead {
|
||||
entry_lsn: Lsn,
|
||||
@@ -435,10 +451,7 @@ impl InMemoryLayer {
|
||||
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
for (key, vec_map) in inner
|
||||
.index
|
||||
.range(range.start.to_compact()..range.end.to_compact())
|
||||
{
|
||||
for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
|
||||
let key = Key::from_compact(*key);
|
||||
let slice = vec_map.slice_range(lsn_range.clone());
|
||||
|
||||
@@ -466,7 +479,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
|
||||
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
|
||||
let read_from = Arc::clone(self);
|
||||
let read_ctx = ctx.attached_child();
|
||||
reconstruct_state
|
||||
@@ -573,8 +586,8 @@ impl InMemoryLayer {
|
||||
start_lsn,
|
||||
end_lsn: OnceLock::new(),
|
||||
opened_at: Instant::now(),
|
||||
index: RwLock::new(BTreeMap::new()),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
index: BTreeMap::new(),
|
||||
file,
|
||||
resource_units: GlobalResourceUnits::new(),
|
||||
}),
|
||||
@@ -592,31 +605,39 @@ impl InMemoryLayer {
|
||||
serialized_batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
let (base_offset, metadata) = {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
let base_offset = inner.file.len();
|
||||
let base_offset = inner.file.len();
|
||||
|
||||
let SerializedValueBatch {
|
||||
raw,
|
||||
metadata,
|
||||
max_lsn: _,
|
||||
len: _,
|
||||
} = serialized_batch;
|
||||
let SerializedValueBatch {
|
||||
raw,
|
||||
metadata,
|
||||
max_lsn: _,
|
||||
len: _,
|
||||
} = serialized_batch;
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
// also IndexEntry and higher levels in
|
||||
//the code don't allow the file to grow that large
|
||||
.unwrap();
|
||||
assert_eq!(new_size, expected_new_len);
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
// also IndexEntry and higher levels in
|
||||
//the code don't allow the file to grow that large
|
||||
.unwrap();
|
||||
assert_eq!(new_size, expected_new_len);
|
||||
|
||||
inner.resource_units.maybe_publish_size(new_size);
|
||||
|
||||
(base_offset, metadata)
|
||||
};
|
||||
|
||||
// Update the index with the new entries
|
||||
let mut index = self.index.write().await;
|
||||
|
||||
for meta in metadata {
|
||||
let SerializedValueMeta {
|
||||
key,
|
||||
@@ -639,7 +660,7 @@ impl InMemoryLayer {
|
||||
will_init,
|
||||
})?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let vec_map = index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
|
||||
if old.is_some() {
|
||||
// This should not break anything, but is unexpected: ingestion code aims to filter out
|
||||
@@ -658,8 +679,6 @@ impl InMemoryLayer {
|
||||
);
|
||||
}
|
||||
|
||||
inner.resource_units.maybe_publish_size(new_size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -680,6 +699,18 @@ impl InMemoryLayer {
|
||||
|
||||
/// Records the end_lsn for non-dropped layers.
|
||||
/// `end_lsn` is exclusive
|
||||
///
|
||||
/// A note on locking:
|
||||
/// The current API of [`InMemoryLayer`] does not ensure that there's no ongoing
|
||||
/// writes while freezing the layer. This is enforced at a higher level via
|
||||
/// [`crate::tenant::Timeline::write_lock`]. Freeze might be called via two code paths:
|
||||
/// 1. Via the active [`crate::tenant::timeline::TimelineWriter`]. This holds the
|
||||
/// Timeline::write_lock for its lifetime. The rolling is handled in
|
||||
/// [`crate::tenant::timeline::TimelineWriter::put_batch`]. It's a &mut self function
|
||||
/// so can't be called from different threads.
|
||||
/// 2. In the background via [`crate::tenant::Timeline::maybe_freeze_ephemeral_layer`].
|
||||
/// This only proceeds if try_lock on Timeline::write_lock succeeds (i.e. there's no active writer),
|
||||
/// hence there can be no concurrent writes
|
||||
pub async fn freeze(&self, end_lsn: Lsn) {
|
||||
assert!(
|
||||
self.start_lsn < end_lsn,
|
||||
@@ -700,8 +731,8 @@ impl InMemoryLayer {
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let inner = self.inner.write().await;
|
||||
for vec_map in inner.index.values() {
|
||||
let index = self.index.read().await;
|
||||
for vec_map in index.values() {
|
||||
for (lsn, _) in vec_map.as_slice() {
|
||||
assert!(*lsn < end_lsn);
|
||||
}
|
||||
@@ -724,14 +755,11 @@ impl InMemoryLayer {
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
// write lock on it, so we shouldn't block anyone. There's one exception
|
||||
// though: another thread might have grabbed a reference to this layer
|
||||
// in `get_layer_for_write' just before the checkpointer called
|
||||
// `freeze`, and then `write_to_disk` on it. When the thread gets the
|
||||
// lock, it will see that it's not writeable anymore and retry, but it
|
||||
// would have to wait until we release it. That race condition is very
|
||||
// rare though, so we just accept the potential latency hit for now.
|
||||
// write lock on it, so we shouldn't block anyone. See the comment on
|
||||
// [`InMemoryLayer::freeze`] to understand how locking between the append path
|
||||
// and layer flushing works.
|
||||
let inner = self.inner.read().await;
|
||||
let index = self.index.read().await;
|
||||
|
||||
use l0_flush::Inner;
|
||||
let _concurrency_permit = match l0_flush_global_state {
|
||||
@@ -743,13 +771,9 @@ impl InMemoryLayer {
|
||||
let key_count = if let Some(key_range) = key_range {
|
||||
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
|
||||
|
||||
inner
|
||||
.index
|
||||
.iter()
|
||||
.filter(|(k, _)| key_range.contains(k))
|
||||
.count()
|
||||
index.iter().filter(|(k, _)| key_range.contains(k)).count()
|
||||
} else {
|
||||
inner.index.len()
|
||||
index.len()
|
||||
};
|
||||
if key_count == 0 {
|
||||
return Ok(None);
|
||||
@@ -772,7 +796,7 @@ impl InMemoryLayer {
|
||||
let file_contents = inner.file.load_to_io_buf(ctx).await?;
|
||||
let file_contents = file_contents.freeze();
|
||||
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
for (key, vec_map) in index.iter() {
|
||||
// Write all page versions
|
||||
for (lsn, entry) in vec_map
|
||||
.as_slice()
|
||||
|
||||
@@ -24,8 +24,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -94,10 +92,12 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
|
||||
use super::tasks::log_compaction_error;
|
||||
use super::upload_queue::NotInitialized;
|
||||
use super::{
|
||||
AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded,
|
||||
AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
};
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use crate::aux_file::AuxFileSizeEstimator;
|
||||
use crate::basebackup_cache::BasebackupPrepareRequest;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
@@ -131,6 +131,7 @@ use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
use crate::walingest::WalLagCooldown;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use crate::{ZERO_PAGE, task_mgr, walredo};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -196,6 +197,7 @@ pub struct TimelineResources {
|
||||
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
pub basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -439,6 +441,9 @@ pub struct Timeline {
|
||||
pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
|
||||
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore,
|
||||
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_prepare_sender: BasebackupPrepareSender,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -529,29 +534,24 @@ impl GcInfo {
|
||||
/// The `GcInfo` component describing which Lsns need to be retained. Functionally, this
|
||||
/// is a single number (the oldest LSN which we must retain), but it internally distinguishes
|
||||
/// between time-based and space-based retention for observability and consumption metrics purposes.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub(crate) struct GcCutoffs {
|
||||
/// Calculated from the [`pageserver_api::models::TenantConfig::gc_horizon`], this LSN indicates how much
|
||||
/// history we must keep to retain a specified number of bytes of WAL.
|
||||
pub(crate) space: Lsn,
|
||||
|
||||
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates how much
|
||||
/// history we must keep to enable reading back at least the PITR interval duration.
|
||||
pub(crate) time: Lsn,
|
||||
}
|
||||
|
||||
impl Default for GcCutoffs {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
space: Lsn::INVALID,
|
||||
time: Lsn::INVALID,
|
||||
}
|
||||
}
|
||||
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates
|
||||
/// how much history we must keep to enable reading back at least the PITR interval duration.
|
||||
///
|
||||
/// None indicates that the PITR cutoff has not been computed. A PITR interval of 0 will yield
|
||||
/// Some(last_record_lsn).
|
||||
pub(crate) time: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl GcCutoffs {
|
||||
fn select_min(&self) -> Lsn {
|
||||
std::cmp::min(self.space, self.time)
|
||||
// NB: if we haven't computed the PITR cutoff yet, we can't GC anything.
|
||||
self.space.min(self.time.unwrap_or_default())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1033,6 +1033,7 @@ pub(crate) enum WaitLsnWaiter<'a> {
|
||||
Tenant,
|
||||
PageService,
|
||||
HttpEndpoint,
|
||||
BaseBackupCache,
|
||||
}
|
||||
|
||||
/// Argument to [`Timeline::shutdown`].
|
||||
@@ -1088,11 +1089,14 @@ impl Timeline {
|
||||
/// Get the bytes written since the PITR cutoff on this branch, and
|
||||
/// whether this branch's ancestor_lsn is within its parent's PITR.
|
||||
pub(crate) fn get_pitr_history_stats(&self) -> (u64, bool) {
|
||||
// TODO: for backwards compatibility, we return the full history back to 0 when the PITR
|
||||
// cutoff has not yet been initialized. This should return None instead, but this is exposed
|
||||
// in external HTTP APIs and callers may not handle a null value.
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
let history = self
|
||||
.get_last_record_lsn()
|
||||
.checked_sub(gc_info.cutoffs.time)
|
||||
.unwrap_or(Lsn(0))
|
||||
.checked_sub(gc_info.cutoffs.time.unwrap_or_default())
|
||||
.unwrap_or_default()
|
||||
.0;
|
||||
(history, gc_info.within_ancestor_pitr)
|
||||
}
|
||||
@@ -1102,9 +1106,10 @@ impl Timeline {
|
||||
self.applied_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Read timeline's planned GC cutoff: this is the logical end of history that users
|
||||
/// are allowed to read (based on configured PITR), even if physically we have more history.
|
||||
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
|
||||
/// Read timeline's planned GC cutoff: this is the logical end of history that users are allowed
|
||||
/// to read (based on configured PITR), even if physically we have more history. Returns None
|
||||
/// if the PITR cutoff has not yet been initialized.
|
||||
pub(crate) fn get_gc_cutoff_lsn(&self) -> Option<Lsn> {
|
||||
self.gc_info.read().unwrap().cutoffs.time
|
||||
}
|
||||
|
||||
@@ -1555,7 +1560,8 @@ impl Timeline {
|
||||
}
|
||||
WaitLsnWaiter::Tenant
|
||||
| WaitLsnWaiter::PageService
|
||||
| WaitLsnWaiter::HttpEndpoint => unreachable!(
|
||||
| WaitLsnWaiter::HttpEndpoint
|
||||
| WaitLsnWaiter::BaseBackupCache => unreachable!(
|
||||
"tenant or page_service context are not expected to have task kind {:?}",
|
||||
ctx.task_kind()
|
||||
),
|
||||
@@ -2460,6 +2466,41 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_basebackup_cache_enabled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.basebackup_cache_enabled
|
||||
.unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled)
|
||||
}
|
||||
|
||||
/// Prepare basebackup for the given LSN and store it in the basebackup cache.
|
||||
/// The method is asynchronous and returns immediately.
|
||||
/// The actual basebackup preparation is performed in the background
|
||||
/// by the basebackup cache on a best-effort basis.
|
||||
pub(crate) fn prepare_basebackup(&self, lsn: Lsn) {
|
||||
if !self.is_basebackup_cache_enabled() {
|
||||
return;
|
||||
}
|
||||
if !self.tenant_shard_id.is_shard_zero() {
|
||||
// In theory we should never get here, but just in case check it.
|
||||
// Preparing basebackup doesn't make sense for shards other than shard zero.
|
||||
return;
|
||||
}
|
||||
|
||||
let res = self
|
||||
.basebackup_prepare_sender
|
||||
.send(BasebackupPrepareRequest {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
timeline_id: self.timeline_id,
|
||||
lsn,
|
||||
});
|
||||
if let Err(e) = res {
|
||||
// May happen during shutdown, it's not critical.
|
||||
info!("Failed to send shutdown checkpoint: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of times we will compute partition within a checkpoint distance.
|
||||
@@ -2537,6 +2578,13 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
pub(crate) fn get_pitr_interval(&self) -> Duration {
|
||||
let tenant_conf = &self.tenant_conf.load().tenant_conf;
|
||||
tenant_conf
|
||||
.pitr_interval
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
fn get_compaction_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
@@ -3022,6 +3070,8 @@ impl Timeline {
|
||||
rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
|
||||
|
||||
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
|
||||
|
||||
basebackup_prepare_sender: resources.basebackup_prepare_sender,
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -6235,14 +6285,12 @@ impl Timeline {
|
||||
|
||||
pausable_failpoint!("Timeline::find_gc_cutoffs-pausable");
|
||||
|
||||
if cfg!(test) {
|
||||
if cfg!(test) && pitr == Duration::ZERO {
|
||||
// Unit tests which specify zero PITR interval expect to avoid doing any I/O for timestamp lookup
|
||||
if pitr == Duration::ZERO {
|
||||
return Ok(GcCutoffs {
|
||||
time: self.get_last_record_lsn(),
|
||||
space: space_cutoff,
|
||||
});
|
||||
}
|
||||
return Ok(GcCutoffs {
|
||||
time: Some(self.get_last_record_lsn()),
|
||||
space: space_cutoff,
|
||||
});
|
||||
}
|
||||
|
||||
// Calculate a time-based limit on how much to retain:
|
||||
@@ -6256,14 +6304,14 @@ impl Timeline {
|
||||
// PITR is not set. Retain the size-based limit, or the default time retention,
|
||||
// whichever requires less data.
|
||||
GcCutoffs {
|
||||
time: self.get_last_record_lsn(),
|
||||
time: Some(self.get_last_record_lsn()),
|
||||
space: std::cmp::max(time_cutoff, space_cutoff),
|
||||
}
|
||||
}
|
||||
(Duration::ZERO, None) => {
|
||||
// PITR is not set, and time lookup failed
|
||||
GcCutoffs {
|
||||
time: self.get_last_record_lsn(),
|
||||
time: Some(self.get_last_record_lsn()),
|
||||
space: space_cutoff,
|
||||
}
|
||||
}
|
||||
@@ -6271,7 +6319,7 @@ impl Timeline {
|
||||
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
|
||||
// cannot advance beyond what was already GC'd, and respect space-based retention
|
||||
GcCutoffs {
|
||||
time: *self.get_applied_gc_cutoff_lsn(),
|
||||
time: Some(*self.get_applied_gc_cutoff_lsn()),
|
||||
space: space_cutoff,
|
||||
}
|
||||
}
|
||||
@@ -6279,7 +6327,7 @@ impl Timeline {
|
||||
// PITR interval is set and we looked up timestamp successfully. Ignore
|
||||
// size based retention and make time cutoff authoritative
|
||||
GcCutoffs {
|
||||
time: time_cutoff,
|
||||
time: Some(time_cutoff),
|
||||
space: time_cutoff,
|
||||
}
|
||||
}
|
||||
@@ -6332,7 +6380,7 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
let mut new_gc_cutoff = Lsn::min(space_cutoff, time_cutoff);
|
||||
let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
|
||||
let standby_horizon = self.standby_horizon.load();
|
||||
// Hold GC for the standby, but as a safety guard do it only within some
|
||||
// reasonable lag.
|
||||
@@ -6381,7 +6429,7 @@ impl Timeline {
|
||||
async fn gc_timeline(
|
||||
&self,
|
||||
space_cutoff: Lsn,
|
||||
time_cutoff: Lsn,
|
||||
time_cutoff: Option<Lsn>, // None if uninitialized
|
||||
retain_lsns: Vec<Lsn>,
|
||||
max_lsn_with_valid_lease: Option<Lsn>,
|
||||
new_gc_cutoff: Lsn,
|
||||
@@ -6400,6 +6448,12 @@ impl Timeline {
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
let Some(time_cutoff) = time_cutoff else {
|
||||
// The GC cutoff should have been computed by now, but let's be defensive.
|
||||
info!("Nothing to GC: time_cutoff not yet computed");
|
||||
return Ok(result);
|
||||
};
|
||||
|
||||
// We need to ensure that no one tries to read page versions or create
|
||||
// branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
|
||||
// for details. This will block until the old value is no longer in use.
|
||||
|
||||
@@ -1526,7 +1526,7 @@ impl Timeline {
|
||||
info!(
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
|
||||
checked {layers_checked}/{layers_total} layers \
|
||||
(latest_gc_cutoff={} pitr_cutoff={})",
|
||||
(latest_gc_cutoff={} pitr_cutoff={:?})",
|
||||
layers_to_rewrite.len(),
|
||||
drop_layers.len(),
|
||||
*latest_gc_cutoff,
|
||||
|
||||
@@ -1316,6 +1316,10 @@ impl WalIngest {
|
||||
}
|
||||
});
|
||||
|
||||
if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
|
||||
modification.tline.prepare_basebackup(lsn);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
17
poetry.lock
generated
17
poetry.lock
generated
@@ -3170,19 +3170,24 @@ pbr = "*"
|
||||
|
||||
[[package]]
|
||||
name = "setuptools"
|
||||
version = "70.0.0"
|
||||
version = "78.1.1"
|
||||
description = "Easily download, build, install, upgrade, and uninstall Python packages"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
|
||||
{file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
|
||||
{file = "setuptools-78.1.1-py3-none-any.whl", hash = "sha256:c3a9c4211ff4c309edb8b8c4f1cbfa7ae324c4ba9f91ff254e3d305b9fd54561"},
|
||||
{file = "setuptools-78.1.1.tar.gz", hash = "sha256:fcc17fd9cd898242f6b4adfaca46137a9edef687f43e6f78469692a5e70d851d"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
|
||||
testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov ; platform_python_implementation != \"PyPy\"", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
|
||||
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""]
|
||||
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
|
||||
cover = ["pytest-cov"]
|
||||
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
|
||||
enabler = ["pytest-enabler (>=2.2)"]
|
||||
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
|
||||
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"]
|
||||
|
||||
[[package]]
|
||||
name = "six"
|
||||
|
||||
@@ -127,3 +127,4 @@ rstest.workspace = true
|
||||
walkdir.workspace = true
|
||||
rand_distr = "0.4"
|
||||
tokio-postgres.workspace = true
|
||||
tracing-test = "0.2"
|
||||
@@ -80,10 +80,22 @@ impl std::fmt::Display for Backend<'_, ()> {
|
||||
.field(&endpoint.url())
|
||||
.finish(),
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ControlPlaneClient::PostgresMock(endpoint) => fmt
|
||||
.debug_tuple("ControlPlane::PostgresMock")
|
||||
.field(&endpoint.url())
|
||||
.finish(),
|
||||
ControlPlaneClient::PostgresMock(endpoint) => {
|
||||
let url = endpoint.url();
|
||||
match url::Url::parse(url) {
|
||||
Ok(mut url) => {
|
||||
let _ = url.set_password(Some("_redacted_"));
|
||||
let url = url.as_str();
|
||||
fmt.debug_tuple("ControlPlane::PostgresMock")
|
||||
.field(&url)
|
||||
.finish()
|
||||
}
|
||||
Err(_) => fmt
|
||||
.debug_tuple("ControlPlane::PostgresMock")
|
||||
.field(&url)
|
||||
.finish(),
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
ControlPlaneClient::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
|
||||
},
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
use anyhow::Context;
|
||||
use anyhow::{bail, ensure};
|
||||
use arc_swap::ArcSwapOption;
|
||||
use futures::future::Either;
|
||||
@@ -23,6 +27,7 @@ use crate::config::{
|
||||
ProxyConfig, ProxyProtocolV2, remote_storage_from_toml,
|
||||
};
|
||||
use crate::context::parquet::ParquetUploadArgs;
|
||||
use crate::control_plane::client::cplane_proxy_v1::{GeoProximity, RegionProximityMap};
|
||||
use crate::http::health_server::AppMetrics;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::rate_limiter::{
|
||||
@@ -35,6 +40,8 @@ use crate::scram::threadpool::ThreadPool;
|
||||
use crate::serverless::GlobalConnPoolOptions;
|
||||
use crate::serverless::cancel_set::CancelSet;
|
||||
use crate::tls::client_config::compute_client_config_with_root_certs;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
use crate::url::ApiUrl;
|
||||
use crate::{auth, control_plane, http, serverless, usage_metrics};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
@@ -760,12 +767,21 @@ fn build_auth_backend(
|
||||
let wake_compute_endpoint_rate_limiter =
|
||||
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
|
||||
|
||||
let geo_map = Box::leak(Box::new(RegionProximityMap::from([(
|
||||
args.region.clone(),
|
||||
GeoProximity {
|
||||
_weight: 1,
|
||||
_distance: 0,
|
||||
},
|
||||
)])));
|
||||
|
||||
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
|
||||
endpoint,
|
||||
args.control_plane_token.clone(),
|
||||
caches,
|
||||
locks,
|
||||
wake_compute_endpoint_rate_limiter,
|
||||
geo_map,
|
||||
);
|
||||
|
||||
let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
|
||||
@@ -777,7 +793,13 @@ fn build_auth_backend(
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
AuthBackendType::Postgres => {
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
let mut url: ApiUrl = args.auth_endpoint.parse()?;
|
||||
if url.password().is_none() {
|
||||
let password = env::var("PGPASSWORD")
|
||||
.with_context(|| "auth-endpoint does not contain a password and environment variable `PGPASSWORD` is not set")?;
|
||||
url.set_password(Some(&password))
|
||||
.expect("Failed to set password");
|
||||
}
|
||||
let api = control_plane::client::mock::MockControlPlane::new(
|
||||
url,
|
||||
!args.is_private_access_proxy,
|
||||
@@ -833,6 +855,14 @@ fn build_auth_backend(
|
||||
let wake_compute_endpoint_rate_limiter =
|
||||
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
|
||||
|
||||
let geo_map = Box::leak(Box::new(RegionProximityMap::from([(
|
||||
args.region.clone(),
|
||||
GeoProximity {
|
||||
_weight: 1,
|
||||
_distance: 0,
|
||||
},
|
||||
)])));
|
||||
|
||||
// Since we use only get_allowed_ips_and_secret() wake_compute_endpoint_rate_limiter
|
||||
// and locks are not used in ConsoleRedirectBackend,
|
||||
// but they are required by the NeonControlPlaneClient
|
||||
@@ -842,6 +872,7 @@ fn build_auth_backend(
|
||||
caches,
|
||||
locks,
|
||||
wake_compute_endpoint_rate_limiter,
|
||||
geo_map,
|
||||
);
|
||||
|
||||
let backend = ConsoleRedirectBackend::new(url, api);
|
||||
|
||||
@@ -296,6 +296,10 @@ impl RequestContext {
|
||||
.has_private_peer_addr()
|
||||
}
|
||||
|
||||
pub fn is_global(&self) -> bool {
|
||||
self.0.try_lock().expect("should not deadlock").region == "global"
|
||||
}
|
||||
|
||||
pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
|
||||
let mut this = self.0.try_lock().expect("should not deadlock");
|
||||
// Do not record errors from the private address to metrics.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Production console backend.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -12,7 +13,10 @@ use postgres_client::config::SslMode;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{Instrument, debug, info, info_span, warn};
|
||||
|
||||
use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
|
||||
use super::super::messages::{
|
||||
ControlPlaneErrorMessage, GetEndpointAccessControl, GetEndpointAccessControlReplicated,
|
||||
WakeCompute,
|
||||
};
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::cache::Cached;
|
||||
@@ -34,6 +38,15 @@ use crate::{compute, http, scram};
|
||||
|
||||
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
|
||||
|
||||
pub type RegionId = String;
|
||||
pub type RegionProximityMap = HashMap<RegionId, GeoProximity>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GeoProximity {
|
||||
pub _weight: u64, // load or preference-based parameter
|
||||
pub _distance: u64, // approximate distance from the region to the current proxy
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NeonControlPlaneClient {
|
||||
endpoint: http::Endpoint,
|
||||
@@ -42,6 +55,7 @@ pub struct NeonControlPlaneClient {
|
||||
pub(crate) wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
|
||||
// put in a shared ref so we don't copy secrets all over in memory
|
||||
jwt: Arc<str>,
|
||||
geo_map: &'static RegionProximityMap,
|
||||
}
|
||||
|
||||
impl NeonControlPlaneClient {
|
||||
@@ -52,6 +66,7 @@ impl NeonControlPlaneClient {
|
||||
caches: &'static ApiCaches,
|
||||
locks: &'static ApiLocks<EndpointCacheKey>,
|
||||
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
|
||||
geo_map: &'static RegionProximityMap,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint,
|
||||
@@ -59,6 +74,7 @@ impl NeonControlPlaneClient {
|
||||
locks,
|
||||
wake_compute_endpoint_rate_limiter,
|
||||
jwt,
|
||||
geo_map,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,10 +97,126 @@ impl NeonControlPlaneClient {
|
||||
info!("endpoint is not valid, skipping the request");
|
||||
return Ok(AuthInfo::default());
|
||||
}
|
||||
|
||||
if ctx.is_global() {
|
||||
return self
|
||||
.do_get_auth_req_replicated(user_info, &ctx.session_id(), Some(ctx))
|
||||
.await;
|
||||
}
|
||||
|
||||
self.do_get_auth_req(user_info, &ctx.session_id(), Some(ctx))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_get_auth_req_replicated(
|
||||
&self,
|
||||
user_info: &ComputeUserInfo,
|
||||
session_id: &uuid::Uuid,
|
||||
ctx: Option<&RequestContext>,
|
||||
) -> Result<AuthInfo, GetAuthInfoError> {
|
||||
let request_id: String = session_id.to_string();
|
||||
let application_name = if let Some(ctx) = ctx {
|
||||
ctx.console_application_name()
|
||||
} else {
|
||||
"auth_cancellation".to_string()
|
||||
};
|
||||
|
||||
async {
|
||||
let request = self
|
||||
.endpoint
|
||||
.get_path("get_endpoint_access_control_replicated")
|
||||
.header(X_REQUEST_ID, &request_id)
|
||||
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", session_id)])
|
||||
.query(&[
|
||||
("application_name", application_name.as_str()),
|
||||
("endpointish", user_info.endpoint.as_str()),
|
||||
("role", user_info.user.as_str()),
|
||||
])
|
||||
.build()?;
|
||||
|
||||
debug!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let response = match ctx {
|
||||
Some(ctx) => {
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
|
||||
let rsp = self.endpoint.execute(request).await;
|
||||
drop(pause);
|
||||
rsp?
|
||||
}
|
||||
None => self.endpoint.execute(request).await?,
|
||||
};
|
||||
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = match parse_body::<GetEndpointAccessControlReplicated>(response).await {
|
||||
Ok(body) => body,
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
// TODO(anna): retry
|
||||
Err(e) => {
|
||||
return if e.get_reason().is_not_found() {
|
||||
// TODO: refactor this because it's weird
|
||||
// this is a failure to authenticate but we return Ok.
|
||||
Ok(AuthInfo::default())
|
||||
} else {
|
||||
Err(e.into())
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
for endpoint in body.endpoints {
|
||||
if let Some(region_id) = &endpoint.region_id {
|
||||
if let Some(_proximity) = self.geo_map.get(region_id) {
|
||||
// TODO:: calculate proximity and reroute
|
||||
|
||||
let secret = if endpoint.role_secret.is_empty() {
|
||||
None
|
||||
} else {
|
||||
let secret = scram::ServerSecret::parse(&endpoint.role_secret)
|
||||
.map(AuthSecret::Scram)
|
||||
.ok_or(GetAuthInfoError::BadSecret)?;
|
||||
Some(secret)
|
||||
};
|
||||
let allowed_ips = endpoint.allowed_ips.unwrap_or_default();
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.allowed_ips_number
|
||||
.observe(allowed_ips.len() as f64);
|
||||
let allowed_vpc_endpoint_ids =
|
||||
endpoint.allowed_vpc_endpoint_ids.unwrap_or_default();
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.allowed_vpc_endpoint_ids
|
||||
.observe(allowed_vpc_endpoint_ids.len() as f64);
|
||||
let block_public_connections =
|
||||
endpoint.block_public_connections.unwrap_or_default();
|
||||
let block_vpc_connections =
|
||||
endpoint.block_vpc_connections.unwrap_or_default();
|
||||
|
||||
// return the closest replica
|
||||
return Ok(AuthInfo {
|
||||
secret,
|
||||
allowed_ips,
|
||||
allowed_vpc_endpoint_ids,
|
||||
project_id: endpoint.project_id,
|
||||
account_id: endpoint.account_id,
|
||||
access_blocker_flags: AccessBlockerFlags {
|
||||
public_access_blocked: block_public_connections,
|
||||
vpc_access_blocked: block_vpc_connections,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return Err(GetAuthInfoError::RegionNotFound);
|
||||
}
|
||||
}
|
||||
|
||||
Err(GetAuthInfoError::RegionNotFound)
|
||||
}
|
||||
.inspect_err(|e| tracing::debug!(error = ?e))
|
||||
.instrument(info_span!("do_get_auth_info"))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_get_auth_req(
|
||||
&self,
|
||||
user_info: &ComputeUserInfo,
|
||||
|
||||
@@ -99,6 +99,9 @@ pub(crate) enum GetAuthInfoError {
|
||||
|
||||
#[error(transparent)]
|
||||
ApiError(ControlPlaneError),
|
||||
|
||||
#[error("No endpoint found in this region")]
|
||||
RegionNotFound,
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
@@ -115,6 +118,7 @@ impl UserFacingError for GetAuthInfoError {
|
||||
Self::BadSecret => REQUEST_FAILED.to_owned(),
|
||||
// However, API might return a meaningful error.
|
||||
Self::ApiError(e) => e.to_string_client(),
|
||||
Self::RegionNotFound => "No endpoint found in this region".to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -124,6 +128,7 @@ impl ReportableError for GetAuthInfoError {
|
||||
match self {
|
||||
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
|
||||
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
|
||||
Self::RegionNotFound => crate::error::ErrorKind::User,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,6 +222,25 @@ pub(crate) struct UserFacingMessage {
|
||||
pub(crate) message: Box<str>,
|
||||
}
|
||||
|
||||
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
|
||||
/// Returned by the `/get_endpoint_access_control_replicated` API method.
|
||||
#[derive(Deserialize)]
|
||||
pub(crate) struct GetEndpointAccessControlReplicated {
|
||||
pub(crate) endpoints: Vec<EndpointAccessControlReplicated>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub(crate) struct EndpointAccessControlReplicated {
|
||||
pub(crate) role_secret: Box<str>,
|
||||
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
|
||||
pub(crate) project_id: Option<ProjectIdInt>,
|
||||
pub(crate) account_id: Option<AccountIdInt>,
|
||||
pub(crate) block_public_connections: Option<bool>,
|
||||
pub(crate) block_vpc_connections: Option<bool>,
|
||||
pub(crate) region_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
|
||||
/// Returned by the `/get_endpoint_access_control` API method.
|
||||
#[derive(Deserialize)]
|
||||
|
||||
@@ -48,7 +48,7 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
|
||||
use postgres_client::error::SqlState;
|
||||
// Here are errors that happens after the user successfully authenticated to the database.
|
||||
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
|
||||
!matches!(
|
||||
let non_retriable_pg_errors = matches!(
|
||||
self.code(),
|
||||
&SqlState::TOO_MANY_CONNECTIONS
|
||||
| &SqlState::OUT_OF_MEMORY
|
||||
@@ -56,8 +56,20 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
|
||||
| &SqlState::T_R_SERIALIZATION_FAILURE
|
||||
| &SqlState::INVALID_CATALOG_NAME
|
||||
| &SqlState::INVALID_SCHEMA_NAME
|
||||
| &SqlState::INVALID_PARAMETER_VALUE
|
||||
)
|
||||
| &SqlState::INVALID_PARAMETER_VALUE,
|
||||
);
|
||||
if non_retriable_pg_errors {
|
||||
return false;
|
||||
}
|
||||
// PGBouncer errors that should not trigger a wake_compute retry.
|
||||
if self.code() == &SqlState::PROTOCOL_VIOLATION {
|
||||
// Source for the error message:
|
||||
// https://github.com/pgbouncer/pgbouncer/blob/f15997fe3effe3a94ba8bcc1ea562e6117d1a131/src/client.c#L1070
|
||||
return !self
|
||||
.message()
|
||||
.contains("no more connections allowed (max_client_conn)");
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,3 +122,55 @@ pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Durati
|
||||
.base_delay
|
||||
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ShouldRetryWakeCompute;
|
||||
use postgres_client::error::{DbError, SqlState};
|
||||
|
||||
#[test]
|
||||
fn should_retry_wake_compute_for_db_error() {
|
||||
// These SQLStates should NOT trigger a wake_compute retry.
|
||||
let non_retry_states = [
|
||||
SqlState::TOO_MANY_CONNECTIONS,
|
||||
SqlState::OUT_OF_MEMORY,
|
||||
SqlState::SYNTAX_ERROR,
|
||||
SqlState::T_R_SERIALIZATION_FAILURE,
|
||||
SqlState::INVALID_CATALOG_NAME,
|
||||
SqlState::INVALID_SCHEMA_NAME,
|
||||
SqlState::INVALID_PARAMETER_VALUE,
|
||||
];
|
||||
for state in non_retry_states {
|
||||
let err = DbError::new_test_error(state.clone(), "oops".to_string());
|
||||
assert!(
|
||||
!err.should_retry_wake_compute(),
|
||||
"State {state:?} unexpectedly retried"
|
||||
);
|
||||
}
|
||||
|
||||
// Errors coming from pgbouncer should not trigger a wake_compute retry
|
||||
let non_retry_pgbouncer_errors = ["no more connections allowed (max_client_conn)"];
|
||||
for error in non_retry_pgbouncer_errors {
|
||||
let err = DbError::new_test_error(SqlState::PROTOCOL_VIOLATION, error.to_string());
|
||||
assert!(
|
||||
!err.should_retry_wake_compute(),
|
||||
"PGBouncer error {error:?} unexpectedly retried"
|
||||
);
|
||||
}
|
||||
|
||||
// These SQLStates should trigger a wake_compute retry.
|
||||
let retry_states = [
|
||||
SqlState::CONNECTION_FAILURE,
|
||||
SqlState::CONNECTION_EXCEPTION,
|
||||
SqlState::CONNECTION_DOES_NOT_EXIST,
|
||||
SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
||||
];
|
||||
for state in retry_states {
|
||||
let err = DbError::new_test_error(state.clone(), "oops".to_string());
|
||||
assert!(
|
||||
err.should_retry_wake_compute(),
|
||||
"State {state:?} unexpectedly skipped retry"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ use rstest::rstest;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::pki_types;
|
||||
use tokio::io::DuplexStream;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
use super::connect_compute::ConnectMechanism;
|
||||
use super::retry::CouldRetry;
|
||||
@@ -381,8 +382,14 @@ enum ConnectAction {
|
||||
WakeFail,
|
||||
WakeRetry,
|
||||
Connect,
|
||||
// connect_once -> Err, could_retry = true, should_retry_wake_compute = true
|
||||
Retry,
|
||||
// connect_once -> Err, could_retry = true, should_retry_wake_compute = false
|
||||
RetryNoWake,
|
||||
// connect_once -> Err, could_retry = false, should_retry_wake_compute = true
|
||||
Fail,
|
||||
// connect_once -> Err, could_retry = false, should_retry_wake_compute = false
|
||||
FailNoWake,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -424,6 +431,7 @@ struct TestConnection;
|
||||
#[derive(Debug)]
|
||||
struct TestConnectError {
|
||||
retryable: bool,
|
||||
wakeable: bool,
|
||||
kind: crate::error::ErrorKind,
|
||||
}
|
||||
|
||||
@@ -448,7 +456,7 @@ impl CouldRetry for TestConnectError {
|
||||
}
|
||||
impl ShouldRetryWakeCompute for TestConnectError {
|
||||
fn should_retry_wake_compute(&self) -> bool {
|
||||
true
|
||||
self.wakeable
|
||||
}
|
||||
}
|
||||
|
||||
@@ -471,10 +479,22 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
ConnectAction::Connect => Ok(TestConnection),
|
||||
ConnectAction::Retry => Err(TestConnectError {
|
||||
retryable: true,
|
||||
wakeable: true,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
ConnectAction::RetryNoWake => Err(TestConnectError {
|
||||
retryable: true,
|
||||
wakeable: false,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
ConnectAction::Fail => Err(TestConnectError {
|
||||
retryable: false,
|
||||
wakeable: true,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
ConnectAction::FailNoWake => Err(TestConnectError {
|
||||
retryable: false,
|
||||
wakeable: false,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
x => panic!("expecting action {x:?}, connect is called instead"),
|
||||
@@ -709,3 +729,92 @@ async fn wake_non_retry() {
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn fail_but_wake_invalidates_cache() {
|
||||
let ctx = RequestContext::test();
|
||||
let mech = TestConnectMechanism::new(vec![
|
||||
ConnectAction::Wake,
|
||||
ConnectAction::Fail,
|
||||
ConnectAction::Wake,
|
||||
ConnectAction::Connect,
|
||||
]);
|
||||
let user = helper_create_connect_info(&mech);
|
||||
let cfg = config();
|
||||
|
||||
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(logs_contain(
|
||||
"invalidating stalled compute node info cache entry"
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn fail_no_wake_skips_cache_invalidation() {
|
||||
let ctx = RequestContext::test();
|
||||
let mech = TestConnectMechanism::new(vec![
|
||||
ConnectAction::Wake,
|
||||
ConnectAction::FailNoWake,
|
||||
ConnectAction::Connect,
|
||||
]);
|
||||
let user = helper_create_connect_info(&mech);
|
||||
let cfg = config();
|
||||
|
||||
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!logs_contain(
|
||||
"invalidating stalled compute node info cache entry"
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn retry_but_wake_invalidates_cache() {
|
||||
let _ = env_logger::try_init();
|
||||
use ConnectAction::*;
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
// Wake → Retry (retryable + wakeable) → Wake → Connect
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let cfg = config();
|
||||
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
|
||||
// Because Retry has wakeable=true, we should see invalidate_cache
|
||||
assert!(logs_contain(
|
||||
"invalidating stalled compute node info cache entry"
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn retry_no_wake_skips_invalidation() {
|
||||
let _ = env_logger::try_init();
|
||||
use ConnectAction::*;
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
// Wake → RetryNoWake (retryable + NOT wakeable)
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
let cfg = config();
|
||||
|
||||
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
|
||||
// Because RetryNoWake has wakeable=false, we must NOT see invalidate_cache
|
||||
assert!(!logs_contain(
|
||||
"invalidating stalled compute node info cache entry"
|
||||
));
|
||||
}
|
||||
|
||||
@@ -43,6 +43,12 @@ impl std::ops::Deref for ApiUrl {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for ApiUrl {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ApiUrl {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
|
||||
@@ -184,6 +184,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
"pageserver_aux_file_estimated_size",
|
||||
"pageserver_valid_lsn_lease_count",
|
||||
"pageserver_tenant_offloaded_timelines",
|
||||
counter("pageserver_tenant_throttling_count_accounted_start"),
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||
|
||||
77
test_runner/regress/test_basebackup.py
Normal file
77
test_runner/regress/test_basebackup.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Simple test for basebackup cache.
|
||||
1. Check that we always hit the cache after compute restart.
|
||||
2. Check that we eventually delete old basebackup files, but not the latest one.
|
||||
3. Check that we delete basebackup file for timeline with active compute.
|
||||
"""
|
||||
|
||||
neon_env_builder.pageserver_config_override = """
|
||||
tenant_config = { basebackup_cache_enabled = true }
|
||||
basebackup_cache_config = { cleanup_period = '1s' }
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ep = env.endpoints.create("main")
|
||||
ps = env.pageserver
|
||||
ps_http = ps.http_client()
|
||||
|
||||
# 1. Check that we always hit the cache after compute restart.
|
||||
for i in range(3):
|
||||
ep.start()
|
||||
ep.stop()
|
||||
|
||||
def check_metrics(i=i):
|
||||
metrics = ps_http.get_metrics()
|
||||
# Never miss.
|
||||
# The first time compute_ctl sends `get_basebackup` with lsn=None, we do not cache such requests.
|
||||
# All other requests should be a hit
|
||||
assert (
|
||||
metrics.query_one(
|
||||
"pageserver_basebackup_cache_read_total", {"result": "miss"}
|
||||
).value
|
||||
== 0
|
||||
)
|
||||
# All but the first requests are hits.
|
||||
assert (
|
||||
metrics.query_one("pageserver_basebackup_cache_read_total", {"result": "hit"}).value
|
||||
== i
|
||||
)
|
||||
# Every compute shut down should trigger a prepare reuest.
|
||||
assert (
|
||||
metrics.query_one(
|
||||
"pageserver_basebackup_cache_prepare_total", {"result": "ok"}
|
||||
).value
|
||||
== i + 1
|
||||
)
|
||||
|
||||
wait_until(check_metrics)
|
||||
|
||||
# 2. Check that we eventually delete old basebackup files, but not the latest one.
|
||||
def check_bb_file_count():
|
||||
bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir())
|
||||
# tmp dir + 1 basebackup file.
|
||||
assert len(bb_files) == 2
|
||||
|
||||
wait_until(check_bb_file_count)
|
||||
|
||||
# 3. Check that we delete basebackup file for timeline with active compute.
|
||||
ep.start()
|
||||
ep.safe_psql("create table t1 as select generate_series(1, 10) as n")
|
||||
|
||||
def check_bb_dir_empty():
|
||||
bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir())
|
||||
# only tmp dir.
|
||||
assert len(bb_files) == 1
|
||||
|
||||
wait_until(check_bb_dir_empty)
|
||||
@@ -508,6 +508,9 @@ PER_METRIC_VERIFIERS = {
|
||||
"remote_storage_size": CannotVerifyAnything,
|
||||
"written_size": WrittenDataVerifier,
|
||||
"written_data_bytes_delta": WrittenDataDeltaVerifier,
|
||||
"written_size_since_parent": WrittenDataVerifier, # same as written_size on root
|
||||
"pitr_cutoff": CannotVerifyAnything,
|
||||
"pitr_history_size_since_parent": WrittenDataVerifier, # same as written_size on root w/o GC
|
||||
"timeline_logical_size": CannotVerifyAnything,
|
||||
"synthetic_storage_size": SyntheticSizeVerifier,
|
||||
}
|
||||
|
||||
@@ -193,6 +193,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
|
||||
"test_ancestor_branch_archive_branch1", tenant_id, "test_ancestor_branch_archive_parent"
|
||||
)
|
||||
|
||||
offloaded_count = ps_http.get_metric_value(
|
||||
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
|
||||
)
|
||||
assert offloaded_count == 0
|
||||
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id,
|
||||
leaf_timeline_id,
|
||||
@@ -244,6 +249,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
|
||||
wait_until(leaf_offloaded)
|
||||
wait_until(parent_offloaded)
|
||||
|
||||
offloaded_count = ps_http.get_metric_value(
|
||||
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
|
||||
)
|
||||
assert offloaded_count == 2
|
||||
|
||||
# Offloaded child timelines should still prevent deletion
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
|
||||
@@ -107,6 +107,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
tracing-log = { version = "0.2" }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
uuid = { version = "1", features = ["serde", "v4", "v7"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
|
||||
Reference in New Issue
Block a user