Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-17 05:00:38 +00:00)

Compare commits: release-79 ... erik/jemal (4 commits)
Commits:

- 5e5053eaff
- ff3819efc7
- f927ae6e15
- 61d385caea
Cargo.lock (generated) — 28 changed lines
@@ -1874,12 +1874,6 @@ dependencies = [
  "syn 2.0.90",
 ]
 
-[[package]]
-name = "difflib"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
-
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -3269,8 +3263,7 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
 [[package]]
 name = "jemalloc_pprof"
 version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a883828bd6a4b957cd9f618886ff19e5f3ebd34e06ba0e855849e049fef32fb"
+source = "git+https://github.com/erikgrinaker/rust-jemalloc-pprof?branch=symbolize#0b146a1e2013bbc7fc8dc45f208f868c0b8ed193"
 dependencies = [
  "anyhow",
  "libc",
@@ -3337,17 +3330,6 @@ dependencies = [
  "wasm-bindgen",
 ]
 
-[[package]]
-name = "json-structural-diff"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
-dependencies = [
- "difflib",
- "regex",
- "serde_json",
-]
-
 [[package]]
 name = "jsonwebtoken"
 version = "9.2.0"
@@ -3470,8 +3452,7 @@ dependencies = [
 [[package]]
 name = "mappings"
 version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce9229c438fbf1c333926e2053c4c091feabbd40a1b590ec62710fea2384af9e"
+source = "git+https://github.com/erikgrinaker/rust-jemalloc-pprof?branch=symbolize#0b146a1e2013bbc7fc8dc45f208f868c0b8ed193"
 dependencies = [
  "anyhow",
  "libc",
@@ -4746,10 +4727,10 @@ dependencies = [
 [[package]]
 name = "pprof_util"
 version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "65c568b3f8c1c37886ae07459b1946249e725c315306b03be5632f84c239f781"
+source = "git+https://github.com/erikgrinaker/rust-jemalloc-pprof?branch=symbolize#0b146a1e2013bbc7fc8dc45f208f868c0b8ed193"
 dependencies = [
  "anyhow",
+ "backtrace",
  "flate2",
  "num",
  "paste",
@@ -6460,7 +6441,6 @@ dependencies = [
  "humantime",
  "hyper 0.14.30",
  "itertools 0.10.5",
- "json-structural-diff",
  "lasso",
  "measured",
  "metrics",
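Two details worth noting in the lockfile changes above: moving a crate from the crates.io registry to a git source drops its `checksum` line, because the lockfile instead pins the exact commit in the `source` URL (the fragment after `#`), so builds stay reproducible even though the branch can move; and `difflib` and `json-structural-diff` disappear entirely because the last crate depending on them is removed later in this diff.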
@@ -116,7 +116,7 @@ inferno = "0.12.0"
 ipnet = "2.10.0"
 itertools = "0.10"
 itoa = "1.0.11"
-jemalloc_pprof = "0.6"
+jemalloc_pprof = { git = "https://github.com/erikgrinaker/rust-jemalloc-pprof", branch = "symbolize", version = "0.6", features = ["symbolize"] }
 jsonwebtoken = "9"
 lasso = "0.7"
 libc = "0.2"
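For context, here is a minimal sketch of how `jemalloc_pprof` is typically wired up, based on the crate's documented usage — the `tikv-jemallocator`, `tokio`, and `anyhow` dependencies and the exact profiler settings are illustrative assumptions, not taken from this diff. With the fork's `symbolize` feature, `dump_pprof()` presumably emits profiles with function names already resolved, which is what lets the HTTP handler code later in this diff shrink:

```rust
// Minimal sketch of dumping a heap profile with jemalloc_pprof (documented
// usage of the crate; not Neon's exact handler code).
use tikv_jemallocator::Jemalloc;

#[global_allocator]
static ALLOC: Jemalloc = Jemalloc;

// Enable jemalloc heap profiling at startup (one sample per ~512 KiB allocated).
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let prof_ctl = jemalloc_pprof::PROF_CTL
        .as_ref()
        .expect("jemalloc profiling not enabled");
    let mut guard = prof_ctl.lock().await;
    // With the fork's "symbolize" feature, the gzipped pprof output should
    // already contain resolved function names.
    let pprof_gz: Vec<u8> = guard.dump_pprof()?;
    std::fs::write("heap.pb.gz", pprof_gz)?;
    Ok(())
}
```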
@@ -210,7 +210,6 @@ rustls-native-certs = "0.8"
 x509-parser = "0.16"
 whoami = "1.5.1"
 zerocopy = { version = "0.7", features = ["derive"] }
-json-structural-diff = { version = "0.2.0" }
 
 ## TODO replace this with tracing
 env_logger = "0.10"
@@ -395,15 +395,22 @@ RUN case "${PG_VERSION:?}" in \
     cd plv8-src && \
     if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
 
-FROM pg-build AS plv8-build
+# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
+# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
+# (The V8 engine takes a very long time to build)
+FROM build-deps AS plv8-build
 ARG PG_VERSION
-WORKDIR /ext-src/plv8-src
 RUN apt update && \
     apt install --no-install-recommends --no-install-suggests -y \
     ninja-build python3-dev libncurses5 binutils clang \
     && apt clean && rm -rf /var/lib/apt/lists/*
 COPY --from=plv8-src /ext-src/ /ext-src/
 WORKDIR /ext-src/plv8-src
+RUN make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) v8
+
+# Step 2: Build the PostgreSQL-dependent parts
+COPY --from=pg-build /usr/local/pgsql /usr/local/pgsql
+ENV PATH="/usr/local/pgsql/bin:$PATH"
 RUN \
     # generate and copy upgrade scripts
     make generate_upgrades && \
@@ -335,21 +335,13 @@ impl PageServerNode {
                 .map(|x| x.parse::<u64>())
                 .transpose()
                 .context("Failed to parse 'checkpoint_distance' as an integer")?,
-            checkpoint_timeout: settings
-                .remove("checkpoint_timeout")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'checkpoint_timeout' as duration")?,
+            checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
             compaction_target_size: settings
                 .remove("compaction_target_size")
                 .map(|x| x.parse::<u64>())
                 .transpose()
                 .context("Failed to parse 'compaction_target_size' as an integer")?,
-            compaction_period: settings
-                .remove("compaction_period")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'compaction_period' as duration")?,
+            compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
             compaction_threshold: settings
                 .remove("compaction_threshold")
                 .map(|x| x.parse::<usize>())
@@ -395,10 +387,7 @@ impl PageServerNode {
                 .map(|x| x.parse::<u64>())
                 .transpose()
                 .context("Failed to parse 'gc_horizon' as an integer")?,
-            gc_period: settings.remove("gc_period")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'gc_period' as duration")?,
+            gc_period: settings.remove("gc_period").map(|x| x.to_string()),
             image_creation_threshold: settings
                 .remove("image_creation_threshold")
                 .map(|x| x.parse::<usize>())
@@ -414,20 +403,13 @@ impl PageServerNode {
                 .map(|x| x.parse::<usize>())
                 .transpose()
                 .context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
-            pitr_interval: settings.remove("pitr_interval")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'pitr_interval' as duration")?,
+            pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
             walreceiver_connect_timeout: settings
                 .remove("walreceiver_connect_timeout")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
+                .map(|x| x.to_string()),
             lagging_wal_timeout: settings
                 .remove("lagging_wal_timeout")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'lagging_wal_timeout' as duration")?,
+                .map(|x| x.to_string()),
             max_lsn_wal_lag: settings
                 .remove("max_lsn_wal_lag")
                 .map(|x| x.parse::<NonZeroU64>())
@@ -445,14 +427,8 @@ impl PageServerNode {
                 .context("Failed to parse 'min_resident_size_override' as integer")?,
             evictions_low_residence_duration_metric_threshold: settings
                 .remove("evictions_low_residence_duration_metric_threshold")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
-            heatmap_period: settings
-                .remove("heatmap_period")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'heatmap_period' as duration")?,
+                .map(|x| x.to_string()),
+            heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
             lazy_slru_download: settings
                 .remove("lazy_slru_download")
                 .map(|x| x.parse::<bool>())
@@ -463,15 +439,10 @@ impl PageServerNode {
                 .map(serde_json::from_str)
                 .transpose()
                 .context("parse `timeline_get_throttle` from json")?,
-            lsn_lease_length: settings.remove("lsn_lease_length")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'lsn_lease_length' as duration")?,
+            lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
             lsn_lease_length_for_ts: settings
                 .remove("lsn_lease_length_for_ts")
-                .map(humantime::parse_duration)
-                .transpose()
-                .context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
+                .map(|x| x.to_string()),
             timeline_offloading: settings
                 .remove("timeline_offloading")
                 .map(|x| x.parse::<bool>())
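All five hunks above apply the same transformation: instead of eagerly parsing human-readable durations with `humantime`, the settings are now forwarded verbatim as strings, and the receiving pageserver parses and validates them. The removed idiom is worth spelling out once, since `transpose()` does the heavy lifting, turning `Option<Result<_, _>>` into `Result<Option<_>, _>`; a self-contained sketch:

```rust
use std::collections::HashMap;
use std::time::Duration;
use anyhow::Context;

// Sketch of the eager-parsing idiom the diff removes: Option<String>
// -> Option<Result<Duration, _>> -> (transpose) -> Result<Option<Duration>, _>.
fn parse_period(settings: &mut HashMap<String, String>) -> anyhow::Result<Option<Duration>> {
    settings
        .remove("compaction_period")
        .map(|x| humantime::parse_duration(&x))
        .transpose()
        .context("Failed to parse 'compaction_period' as duration")
}

fn main() -> anyhow::Result<()> {
    let mut settings = HashMap::from([("compaction_period".to_string(), "20s".to_string())]);
    assert_eq!(parse_period(&mut settings)?, Some(Duration::from_secs(20)));
    // After this change, the raw string (e.g. "20s") is passed through instead,
    // and parsing happens on the pageserver side.
    Ok(())
}
```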
@@ -959,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
                     threshold: threshold.into(),
                 },
             )),
-            heatmap_period: Some(Duration::from_secs(300)),
+            heatmap_period: Some("300s".to_string()),
             ..Default::default()
         },
     })
@@ -495,19 +495,10 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
         }
 
         Format::Pprof => {
-            let data = tokio::task::spawn_blocking(move || {
-                let bytes = prof_ctl.dump_pprof()?;
-                // Symbolize the profile.
-                // TODO: consider moving this upstream to jemalloc_pprof and avoiding the
-                // serialization roundtrip.
-                let profile = pprof::decode(&bytes)?;
-                let profile = pprof::symbolize(profile)?;
-                let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
-                pprof::encode(&profile)
-            })
-            .await
-            .map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
-            .map_err(ApiError::InternalServerError)?;
+            let data = tokio::task::spawn_blocking(move || prof_ctl.dump_pprof())
+                .await
+                .map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
+                .map_err(ApiError::InternalServerError)?;
             Response::builder()
                 .status(200)
                 .header(CONTENT_TYPE, "application/octet-stream")
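This hunk resolves the old `TODO` in place: the decode/symbolize/strip roundtrip moves upstream into the forked `jemalloc_pprof`, leaving only the dump call. Both sides offload to the blocking thread pool; the double `map_err`/`?` at the end is the standard two-layer error plumbing around `spawn_blocking`, sketched here in isolation (illustrative types, not Neon's exact ones):

```rust
use tokio::task::spawn_blocking;

// Sketch of the error plumbing used above: the outer Result is the task's
// JoinError (panic/abort), the inner one is the work itself.
async fn dump_offloaded() -> anyhow::Result<Vec<u8>> {
    let data = spawn_blocking(|| -> anyhow::Result<Vec<u8>> {
        // stand-in for prof_ctl.dump_pprof(); CPU-bound, so it must not run
        // directly on the async runtime's worker threads
        Ok(vec![0u8; 16])
    })
    .await
    .map_err(anyhow::Error::new)??; // first `?` unwraps JoinError, second the inner error
    Ok(data)
}
```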
@@ -520,8 +511,6 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
         let body = tokio::task::spawn_blocking(move || {
             let bytes = prof_ctl.dump_pprof()?;
             let profile = pprof::decode(&bytes)?;
-            let profile = pprof::symbolize(profile)?;
-            let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
             let mut opts = inferno::flamegraph::Options::default();
             opts.title = "Heap inuse".to_string();
             opts.count_name = "bytes".to_string();
@@ -526,13 +526,9 @@ pub struct TenantConfigPatch {
 #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
 pub struct TenantConfig {
     pub checkpoint_distance: Option<u64>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub checkpoint_timeout: Option<Duration>,
+    pub checkpoint_timeout: Option<String>,
     pub compaction_target_size: Option<u64>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub compaction_period: Option<Duration>,
+    pub compaction_period: Option<String>,
     pub compaction_threshold: Option<usize>,
     pub compaction_upper_limit: Option<usize>,
     // defer parsing compaction_algorithm, like eviction_policy
@@ -543,38 +539,22 @@ pub struct TenantConfig {
     pub l0_flush_stall_threshold: Option<usize>,
     pub l0_flush_wait_upload: Option<bool>,
     pub gc_horizon: Option<u64>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub gc_period: Option<Duration>,
+    pub gc_period: Option<String>,
     pub image_creation_threshold: Option<usize>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub pitr_interval: Option<Duration>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub walreceiver_connect_timeout: Option<Duration>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub lagging_wal_timeout: Option<Duration>,
+    pub pitr_interval: Option<String>,
+    pub walreceiver_connect_timeout: Option<String>,
+    pub lagging_wal_timeout: Option<String>,
     pub max_lsn_wal_lag: Option<NonZeroU64>,
     pub eviction_policy: Option<EvictionPolicy>,
     pub min_resident_size_override: Option<u64>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub heatmap_period: Option<Duration>,
+    pub evictions_low_residence_duration_metric_threshold: Option<String>,
+    pub heatmap_period: Option<String>,
     pub lazy_slru_download: Option<bool>,
     pub timeline_get_throttle: Option<ThrottleConfig>,
     pub image_layer_creation_check_threshold: Option<u8>,
     pub image_creation_preempt_threshold: Option<usize>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub lsn_lease_length: Option<Duration>,
-    #[serde(default)]
-    #[serde(with = "humantime_serde")]
-    pub lsn_lease_length_for_ts: Option<Duration>,
+    pub lsn_lease_length: Option<String>,
+    pub lsn_lease_length_for_ts: Option<String>,
    pub timeline_offloading: Option<bool>,
    pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
    pub rel_size_v2_enabled: Option<bool>,
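For reference, this is what the removed attribute pair did — (de)serialize `Option<Duration>` as a human-readable string via the `humantime-serde` crate — so switching the fields to `Option<String>` keeps the same wire format but defers parsing to the consumer. A small self-contained illustration (assumes the `humantime-serde`, `serde`, and `serde_json` crates):

```rust
use std::time::Duration;
use serde::{Deserialize, Serialize};

// Sketch of the old wire format: Option<Duration> rendered as e.g. "20m".
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Conf {
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    checkpoint_timeout: Option<Duration>,
}

fn main() {
    let conf: Conf = serde_json::from_str(r#"{"checkpoint_timeout": "20m"}"#).unwrap();
    assert_eq!(conf.checkpoint_timeout, Some(Duration::from_secs(1200)));
    // The String-typed replacement would instead keep "20m" verbatim and let
    // whoever consumes the config parse (and validate) it.
    assert_eq!(
        serde_json::to_string(&conf).unwrap(),
        r#"{"checkpoint_timeout":"20m"}"#
    );
}
```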
@@ -584,10 +564,7 @@ pub struct TenantConfig {
 }
 
 impl TenantConfig {
-    pub fn apply_patch(
-        self,
-        patch: TenantConfigPatch,
-    ) -> Result<TenantConfig, humantime::DurationError> {
+    pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
         let Self {
             mut checkpoint_distance,
             mut checkpoint_timeout,
@@ -627,17 +604,11 @@ impl TenantConfig {
         } = self;
 
         patch.checkpoint_distance.apply(&mut checkpoint_distance);
-        patch
-            .checkpoint_timeout
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut checkpoint_timeout);
+        patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
         patch
             .compaction_target_size
             .apply(&mut compaction_target_size);
-        patch
-            .compaction_period
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut compaction_period);
+        patch.compaction_period.apply(&mut compaction_period);
         patch.compaction_threshold.apply(&mut compaction_threshold);
         patch
             .compaction_upper_limit
@@ -655,25 +626,15 @@ impl TenantConfig {
             .apply(&mut l0_flush_stall_threshold);
         patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
         patch.gc_horizon.apply(&mut gc_horizon);
-        patch
-            .gc_period
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut gc_period);
+        patch.gc_period.apply(&mut gc_period);
         patch
             .image_creation_threshold
             .apply(&mut image_creation_threshold);
-        patch
-            .pitr_interval
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut pitr_interval);
+        patch.pitr_interval.apply(&mut pitr_interval);
         patch
             .walreceiver_connect_timeout
-            .map(|v| humantime::parse_duration(&v))?
             .apply(&mut walreceiver_connect_timeout);
-        patch
-            .lagging_wal_timeout
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut lagging_wal_timeout);
+        patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
         patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
         patch.eviction_policy.apply(&mut eviction_policy);
         patch
@@ -681,12 +642,8 @@ impl TenantConfig {
             .apply(&mut min_resident_size_override);
         patch
             .evictions_low_residence_duration_metric_threshold
-            .map(|v| humantime::parse_duration(&v))?
             .apply(&mut evictions_low_residence_duration_metric_threshold);
-        patch
-            .heatmap_period
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut heatmap_period);
+        patch.heatmap_period.apply(&mut heatmap_period);
         patch.lazy_slru_download.apply(&mut lazy_slru_download);
         patch
             .timeline_get_throttle
@@ -697,13 +654,9 @@ impl TenantConfig {
         patch
             .image_creation_preempt_threshold
             .apply(&mut image_creation_preempt_threshold);
-        patch
-            .lsn_lease_length
-            .map(|v| humantime::parse_duration(&v))?
-            .apply(&mut lsn_lease_length);
+        patch.lsn_lease_length.apply(&mut lsn_lease_length);
         patch
             .lsn_lease_length_for_ts
-            .map(|v| humantime::parse_duration(&v))?
             .apply(&mut lsn_lease_length_for_ts);
         patch.timeline_offloading.apply(&mut timeline_offloading);
         patch
@@ -720,7 +673,7 @@ impl TenantConfig {
             .gc_compaction_ratio_percent
             .apply(&mut gc_compaction_ratio_percent);
 
-        Ok(Self {
+        Self {
             checkpoint_distance,
             checkpoint_timeout,
             compaction_target_size,
@@ -756,7 +709,7 @@ impl TenantConfig {
             gc_compaction_enabled,
             gc_compaction_initial_threshold_kb,
             gc_compaction_ratio_percent,
-        })
+        }
     }
 }
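All of these hunks reduce to the same mechanical change: with `String`-typed fields, each patch entry applies directly, the fallible `humantime::parse_duration` step disappears, and `apply_patch` stops returning a `Result`. A minimal sketch of the apply pattern — the `FieldPatch` variant names and semantics here are assumptions for illustration; the real type lives in `pageserver_api`:

```rust
// Minimal sketch of the patch-application pattern used above.
#[derive(Debug, Clone)]
enum FieldPatch<T> {
    Upsert(T), // set the field to a new value
    Remove,    // clear the field back to None
    Keep,      // leave the field untouched
}

impl<T> FieldPatch<T> {
    fn apply(self, target: &mut Option<T>) {
        match self {
            FieldPatch::Upsert(v) => *target = Some(v),
            FieldPatch::Remove => *target = None,
            FieldPatch::Keep => {}
        }
    }
}

fn main() {
    let mut heatmap_period: Option<String> = Some("60s".to_string());
    FieldPatch::Upsert("300s".to_string()).apply(&mut heatmap_period);
    assert_eq!(heatmap_period.as_deref(), Some("300s"));
    FieldPatch::Remove.apply(&mut heatmap_period);
    assert_eq!(heatmap_period, None);
}
```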
@@ -2550,7 +2503,7 @@ mod tests {
             ..base.clone()
         };
 
-        let patched = base.apply_patch(decoded.config).unwrap();
+        let patched = base.apply_patch(decoded.config);
 
         assert_eq!(patched, expected);
     }
@@ -10,14 +10,14 @@ cargo bench --package utils
 cargo bench --package utils --bench benchmarks
 
 # Specific benchmark.
-cargo bench --package utils --bench benchmarks log_slow/enabled=true
+cargo bench --package utils --bench benchmarks warn_slow/enabled=true
 
 # List available benchmarks.
 cargo bench --package utils --benches -- --list
 
 # Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
 # Output in target/criterion/*/profile/flamegraph.svg.
-cargo bench --package utils --bench benchmarks log_slow/enabled=true --profile-time 10
+cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10
 ```
 
 Additional charts and statistics are available in `target/criterion/report/index.html`.
@@ -3,14 +3,14 @@ use std::time::Duration;
 use criterion::{criterion_group, criterion_main, Bencher, Criterion};
 use pprof::criterion::{Output, PProfProfiler};
 use utils::id;
-use utils::logging::log_slow;
+use utils::logging::warn_slow;
 
 // Register benchmarks with Criterion.
 criterion_group!(
     name = benches;
     config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
     targets = bench_id_stringify,
-              bench_log_slow,
+              bench_warn_slow,
 );
 criterion_main!(benches);
@@ -29,9 +29,9 @@ pub fn bench_id_stringify(c: &mut Criterion) {
     });
 }
 
-pub fn bench_log_slow(c: &mut Criterion) {
+pub fn bench_warn_slow(c: &mut Criterion) {
     for enabled in [false, true] {
-        c.bench_function(&format!("log_slow/enabled={enabled}"), |b| {
+        c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| {
             run_bench(b, enabled).unwrap()
         });
     }
@@ -45,11 +45,11 @@ pub fn bench_log_slow(c: &mut Criterion) {
         .enable_all()
         .build()?;
 
-    // Test both with and without log_slow, since we're essentially measuring Tokio scheduling
+    // Test both with and without warn_slow, since we're essentially measuring Tokio scheduling
     // performance too. Use a simple noop future that yields once, to avoid any scheduler fast
     // paths for a ready future.
     if enabled {
-        b.iter(|| runtime.block_on(log_slow("ready", THRESHOLD, tokio::task::yield_now())));
+        b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now())));
     } else {
         b.iter(|| runtime.block_on(tokio::task::yield_now()));
     }
@@ -7,7 +7,7 @@ use metrics::{IntCounter, IntCounterVec};
 use once_cell::sync::Lazy;
 use strum_macros::{EnumString, VariantNames};
 use tokio::time::Instant;
-use tracing::info;
+use tracing::warn;
 
 /// Logs a critical error, similarly to `tracing::error!`. This will:
 ///
@@ -322,13 +322,11 @@ impl std::fmt::Debug for SecretString {
     }
 }
 
-/// Logs a periodic message if a future is slow to complete.
+/// Logs a periodic warning if a future is slow to complete.
 ///
 /// This is performance-sensitive as it's used on the GetPage read path.
-///
-/// TODO: consider upgrading this to a warning, but currently it fires too often.
 #[inline]
-pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
+pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
     // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
     // won't fit on the stack.
     let mut f = Box::pin(f);
@@ -347,13 +345,13 @@ pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output
         // false negatives.
         let elapsed = started.elapsed();
         if elapsed >= threshold {
-            info!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
+            warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
         }
         return output;
     }
 
     let elapsed = started.elapsed().as_secs_f64();
-    info!("slow {name} still running after {elapsed:.3}s",);
+    warn!("slow {name} still running after {elapsed:.3}s",);
 
     attempt += 1;
 }
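The hunks above rename `log_slow` to `warn_slow` and raise its messages from `info!` to `warn!`. For readers unfamiliar with the helper, here is a self-contained sketch of the underlying pattern — my reconstruction under assumed details, not the exact Neon implementation: poll the wrapped future in threshold-sized slices via `tokio::time::timeout` and log each time a slice expires.

```rust
use std::future::Future;
use std::time::Duration;
use tokio::time::{timeout, Instant};

// Sketch of the warn_slow pattern (assumed shape): poll `f` in
// `threshold`-sized slices, warning whenever a slice expires.
async fn warn_slow_sketch<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
    let started = Instant::now();
    let mut f = Box::pin(f); // heap-pin so we can re-poll across timeout() calls
    loop {
        match timeout(threshold, &mut f).await {
            Ok(output) => {
                let elapsed = started.elapsed();
                if elapsed >= threshold {
                    tracing::warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
                }
                return output;
            }
            Err(_) => {
                tracing::warn!(
                    "slow {name} still running after {:.3}s",
                    started.elapsed().as_secs_f64()
                );
            }
        }
    }
}
```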
@@ -672,6 +672,10 @@ fn start_pageserver(
         }
     });
 
+    // Allocate a bunch of memory.
+    let alloc = allocate(256 * 1024 * 1024);
+    println!("allocated {}b", alloc.len());
+
     // Wait for cancellation signal and shut down the pageserver.
     //
     // This cancels the `shutdown_pageserver` cancellation tree. Right now that tree doesn't
@@ -695,6 +699,17 @@ fn start_pageserver(
     })
 }
 
+#[inline(never)]
+fn allocate(size: usize) -> Vec<u8> {
+    allocate_inline(size)
+}
+
+#[inline(always)]
+fn allocate_inline(size: usize) -> Vec<u8> {
+    println!("allocating {size}b");
+    vec![9; size]
+}
+
 async fn create_remote_storage_client(
     conf: &'static PageServerConf,
 ) -> anyhow::Result<GenericRemoteStorage> {
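The `allocate`/`allocate_inline` pair reads like temporary scaffolding for exercising the new symbolized heap profiles: allocating 256 MiB through an `#[inline(never)]` function gives the profiler a stable, named frame to attribute the allocation to, while the `#[inline(always)]` inner function presumably checks how inlined frames come out of symbolization.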
@@ -40,7 +40,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
-use utils::logging::log_slow;
+use utils::logging::warn_slow;
 use utils::sync::gate::{Gate, GateGuard};
 use utils::sync::spsc_fold;
 use utils::{
@@ -83,8 +83,8 @@ use std::os::fd::AsRawFd;
 /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
 const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
 
-/// Threshold at which to log slow GetPage requests.
-const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
+/// Threshold at which to log a warning about slow GetPage requests.
+const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -1086,145 +1086,11 @@ impl PageServerHandler {
             batch
         };
 
-        // Dispatch the batch to the appropriate request handler.
-        let (mut handler_results, span) = log_slow(
-            batch.as_static_str(),
-            LOG_SLOW_GETPAGE_THRESHOLD,
-            self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx),
-        )
-        .await?;
-
-        // We purposefully don't count flush time into the smgr operation timer.
-        //
-        // The reason is that current compute client will not perform protocol processing
-        // if the postgres backend process is doing things other than `->smgr_read()`.
-        // This is especially the case for prefetch.
-        //
-        // If the compute doesn't read from the connection, eventually TCP will backpressure
-        // all the way into our flush call below.
-        //
-        // The timer's underlying metric is used for a storage-internal latency SLO and
-        // we don't want to include latency in it that we can't control.
-        // And as pointed out above, in this case, we don't control the time that flush will take.
-        //
-        // We put each response in the batch onto the wire in a separate pgb_writer.flush()
-        // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
-        // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
-        // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
-        let flush_timers = {
-            let flushing_start_time = Instant::now();
-            let mut flush_timers = Vec::with_capacity(handler_results.len());
-            for handler_result in &mut handler_results {
-                let flush_timer = match handler_result {
-                    Ok((_, timer)) => Some(
-                        timer
-                            .observe_execution_end(flushing_start_time)
-                            .expect("we are the first caller"),
-                    ),
-                    Err(_) => {
-                        // TODO: measure errors
-                        None
-                    }
-                };
-                flush_timers.push(flush_timer);
-            }
-            assert_eq!(flush_timers.len(), handler_results.len());
-            flush_timers
-        };
-
-        // Map handler result to protocol behavior.
-        // Some handler errors cause exit from pagestream protocol.
-        // Other handler errors are sent back as an error message and we stay in pagestream protocol.
-        for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
-            let response_msg = match handler_result {
-                Err(e) => match &e.err {
-                    PageStreamError::Shutdown => {
-                        // If we fail to fulfil a request during shutdown, which may be _because_ of
-                        // shutdown, then do not send the error to the client. Instead just drop the
-                        // connection.
-                        span.in_scope(|| info!("dropping connection due to shutdown"));
-                        return Err(QueryError::Shutdown);
-                    }
-                    PageStreamError::Reconnect(reason) => {
-                        span.in_scope(|| info!("handler requested reconnect: {reason}"));
-                        return Err(QueryError::Reconnect);
-                    }
-                    PageStreamError::Read(_)
-                    | PageStreamError::LsnTimeout(_)
-                    | PageStreamError::NotFound(_)
-                    | PageStreamError::BadRequest(_) => {
-                        // print the all details to the log with {:#}, but for the client the
-                        // error message is enough. Do not log if shutting down, as the anyhow::Error
-                        // here includes cancellation which is not an error.
-                        let full = utils::error::report_compact_sources(&e.err);
-                        span.in_scope(|| {
-                            error!("error reading relation or page version: {full:#}")
-                        });
-
-                        PagestreamBeMessage::Error(PagestreamErrorResponse {
-                            req: e.req,
-                            message: e.err.to_string(),
-                        })
-                    }
-                },
-                Ok((response_msg, _op_timer_already_observed)) => response_msg,
-            };
-
-            //
-            // marshal & transmit response message
-            //
-
-            pgb_writer.write_message_noflush(&BeMessage::CopyData(
-                &response_msg.serialize(protocol_version),
-            ))?;
-
-            // what we want to do
-            let socket_fd = pgb_writer.socket_fd;
-            let flush_fut = pgb_writer.flush();
-            // metric for how long flushing takes
-            let flush_fut = match flushing_timer {
-                Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
-                    Instant::now(),
-                    flush_fut,
-                    socket_fd,
-                )),
-                None => futures::future::Either::Right(flush_fut),
-            };
-            // do it while respecting cancellation
-            let _: () = async move {
-                tokio::select! {
-                    biased;
-                    _ = cancel.cancelled() => {
-                        // We were requested to shut down.
-                        info!("shutdown request received in page handler");
-                        return Err(QueryError::Shutdown)
-                    }
-                    res = flush_fut => {
-                        res?;
-                    }
-                }
-                Ok(())
-            }
-            .await?;
-        }
-        Ok(())
-    }
-
-    /// Helper which dispatches a batched message to the appropriate handler.
-    /// Returns a vec of results, along with the extracted trace span.
-    async fn pagestream_dispatch_batched_message(
-        &mut self,
-        batch: BatchedFeMessage,
-        io_concurrency: IoConcurrency,
-        ctx: &RequestContext,
-    ) -> Result<
-        (
-            Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
-            Span,
-        ),
-        QueryError,
-    > {
-        Ok(match batch {
+        // invoke handler function
+        let (mut handler_results, span): (
+            Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
+            Span,
+        ) = match batch {
             BatchedFeMessage::Exists {
                 span,
                 timer,
@@ -1346,7 +1212,122 @@ impl PageServerHandler {
                 // call the handler.
                 (vec![Err(error)], span)
             }
-        })
+        };
+
+        // We purposefully don't count flush time into the smgr operation timer.
+        //
+        // The reason is that current compute client will not perform protocol processing
+        // if the postgres backend process is doing things other than `->smgr_read()`.
+        // This is especially the case for prefetch.
+        //
+        // If the compute doesn't read from the connection, eventually TCP will backpressure
+        // all the way into our flush call below.
+        //
+        // The timer's underlying metric is used for a storage-internal latency SLO and
+        // we don't want to include latency in it that we can't control.
+        // And as pointed out above, in this case, we don't control the time that flush will take.
+        //
+        // We put each response in the batch onto the wire in a separate pgb_writer.flush()
+        // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
+        // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
+        // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
+        let flush_timers = {
+            let flushing_start_time = Instant::now();
+            let mut flush_timers = Vec::with_capacity(handler_results.len());
+            for handler_result in &mut handler_results {
+                let flush_timer = match handler_result {
+                    Ok((_, timer)) => Some(
+                        timer
+                            .observe_execution_end(flushing_start_time)
+                            .expect("we are the first caller"),
+                    ),
+                    Err(_) => {
+                        // TODO: measure errors
+                        None
+                    }
+                };
+                flush_timers.push(flush_timer);
+            }
+            assert_eq!(flush_timers.len(), handler_results.len());
+            flush_timers
+        };
+
+        // Map handler result to protocol behavior.
+        // Some handler errors cause exit from pagestream protocol.
+        // Other handler errors are sent back as an error message and we stay in pagestream protocol.
+        for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
+            let response_msg = match handler_result {
+                Err(e) => match &e.err {
+                    PageStreamError::Shutdown => {
+                        // If we fail to fulfil a request during shutdown, which may be _because_ of
+                        // shutdown, then do not send the error to the client. Instead just drop the
+                        // connection.
+                        span.in_scope(|| info!("dropping connection due to shutdown"));
+                        return Err(QueryError::Shutdown);
+                    }
+                    PageStreamError::Reconnect(reason) => {
+                        span.in_scope(|| info!("handler requested reconnect: {reason}"));
+                        return Err(QueryError::Reconnect);
+                    }
+                    PageStreamError::Read(_)
+                    | PageStreamError::LsnTimeout(_)
+                    | PageStreamError::NotFound(_)
+                    | PageStreamError::BadRequest(_) => {
+                        // print the all details to the log with {:#}, but for the client the
+                        // error message is enough. Do not log if shutting down, as the anyhow::Error
+                        // here includes cancellation which is not an error.
+                        let full = utils::error::report_compact_sources(&e.err);
+                        span.in_scope(|| {
+                            error!("error reading relation or page version: {full:#}")
+                        });
+
+                        PagestreamBeMessage::Error(PagestreamErrorResponse {
+                            req: e.req,
+                            message: e.err.to_string(),
+                        })
+                    }
+                },
+                Ok((response_msg, _op_timer_already_observed)) => response_msg,
+            };
+
+            //
+            // marshal & transmit response message
+            //
+
+            pgb_writer.write_message_noflush(&BeMessage::CopyData(
+                &response_msg.serialize(protocol_version),
+            ))?;
+
+            // what we want to do
+            let socket_fd = pgb_writer.socket_fd;
+            let flush_fut = pgb_writer.flush();
+            // metric for how long flushing takes
+            let flush_fut = match flushing_timer {
+                Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
+                    Instant::now(),
+                    flush_fut,
+                    socket_fd,
+                )),
+                None => futures::future::Either::Right(flush_fut),
+            };
+            // do it while respecting cancellation
+            let _: () = async move {
+                tokio::select! {
+                    biased;
+                    _ = cancel.cancelled() => {
+                        // We were requested to shut down.
+                        info!("shutdown request received in page handler");
+                        return Err(QueryError::Shutdown)
+                    }
+                    res = flush_fut => {
+                        res?;
+                    }
+                }
+                Ok(())
+            }
+            .await?;
+        }
+        Ok(())
     }
 
     /// Pagestream sub-protocol handler.
@@ -1492,16 +1473,19 @@ impl PageServerHandler {
             }
         };
 
-        let result = self
-            .pagesteam_handle_batched_message(
+        let result = warn_slow(
+            msg.as_static_str(),
+            WARN_SLOW_GETPAGE_THRESHOLD,
+            self.pagesteam_handle_batched_message(
                 pgb_writer,
                 msg,
                 io_concurrency.clone(),
                 &cancel,
                 protocol_version,
                 ctx,
-            )
-            .await;
+            ),
+        )
+        .await;
         match result {
             Ok(()) => {}
             Err(e) => break e,
@@ -1665,13 +1649,17 @@ impl PageServerHandler {
                 return Err(e);
             }
         };
-        self.pagesteam_handle_batched_message(
-            pgb_writer,
-            batch,
-            io_concurrency.clone(),
-            &cancel,
-            protocol_version,
-            &ctx,
+        warn_slow(
+            batch.as_static_str(),
+            WARN_SLOW_GETPAGE_THRESHOLD,
+            self.pagesteam_handle_batched_message(
+                pgb_writer,
+                batch,
+                io_concurrency.clone(),
+                &cancel,
+                protocol_version,
+                &ctx,
+            ),
        )
        .await?;
     }
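Taken together with the removal of `log_slow` around `pagestream_dispatch_batched_message` earlier in this file, these two call-site changes move the slow-request instrumentation outward: `warn_slow` now wraps the whole `pagesteam_handle_batched_message` call, so time spent flushing responses to the socket counts toward the threshold, not just the dispatch step.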
@@ -693,15 +693,16 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
 /// This is a conversion from our internal tenant config object to the one used
 /// in external APIs.
 impl From<TenantConfOpt> for models::TenantConfig {
+    // TODO(vlad): These are now the same, but they have different serialization logic.
+    // Can we merge them?
     fn from(value: TenantConfOpt) -> Self {
+        fn humantime(d: Duration) -> String {
+            format!("{}s", d.as_secs())
+        }
         Self {
             checkpoint_distance: value.checkpoint_distance,
-            checkpoint_timeout: value.checkpoint_timeout,
+            checkpoint_timeout: value.checkpoint_timeout.map(humantime),
             compaction_algorithm: value.compaction_algorithm,
             compaction_target_size: value.compaction_target_size,
-            compaction_period: value.compaction_period,
+            compaction_period: value.compaction_period.map(humantime),
             compaction_threshold: value.compaction_threshold,
             compaction_upper_limit: value.compaction_upper_limit,
             compaction_l0_first: value.compaction_l0_first,
@@ -710,23 +711,24 @@ impl From<TenantConfOpt> for models::TenantConfig {
             l0_flush_stall_threshold: value.l0_flush_stall_threshold,
             l0_flush_wait_upload: value.l0_flush_wait_upload,
             gc_horizon: value.gc_horizon,
-            gc_period: value.gc_period,
+            gc_period: value.gc_period.map(humantime),
             image_creation_threshold: value.image_creation_threshold,
-            pitr_interval: value.pitr_interval,
-            walreceiver_connect_timeout: value.walreceiver_connect_timeout,
-            lagging_wal_timeout: value.lagging_wal_timeout,
+            pitr_interval: value.pitr_interval.map(humantime),
+            walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
+            lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
             max_lsn_wal_lag: value.max_lsn_wal_lag,
             eviction_policy: value.eviction_policy,
             min_resident_size_override: value.min_resident_size_override,
             evictions_low_residence_duration_metric_threshold: value
-                .evictions_low_residence_duration_metric_threshold,
-            heatmap_period: value.heatmap_period,
+                .evictions_low_residence_duration_metric_threshold
+                .map(humantime),
+            heatmap_period: value.heatmap_period.map(humantime),
             lazy_slru_download: value.lazy_slru_download,
             timeline_get_throttle: value.timeline_get_throttle,
             image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
             image_creation_preempt_threshold: value.image_creation_preempt_threshold,
-            lsn_lease_length: value.lsn_lease_length,
-            lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
+            lsn_lease_length: value.lsn_lease_length.map(humantime),
+            lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
             timeline_offloading: value.timeline_offloading,
             wal_receiver_protocol_override: value.wal_receiver_protocol_override,
             rel_size_v2_enabled: value.rel_size_v2_enabled,
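One observable consequence of the `humantime` helper above: it renders whole seconds only, so any sub-second component is silently dropped when a config round-trips through the external API. If lossless formatting were wanted, the `humantime` crate can provide it — a sketch; whether truncation matters here depends on which durations ever carry sub-second values:

```rust
use std::time::Duration;

// Lossless alternative to the whole-seconds helper above.
fn humantime_lossless(d: Duration) -> String {
    humantime::format_duration(d).to_string()
}

fn main() {
    assert_eq!(humantime_lossless(Duration::from_millis(1500)), "1s 500ms");
    // The helper in the diff would truncate the same value:
    assert_eq!(format!("{}s", Duration::from_millis(1500).as_secs()), "1s");
}
```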
@@ -758,10 +760,29 @@ mod tests {
         assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
     }
 
+    #[test]
+    fn test_try_from_models_tenant_config_err() {
+        let tenant_config = models::TenantConfig {
+            lagging_wal_timeout: Some("5a".to_string()),
+            ..TenantConfig::default()
+        };
+
+        let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);
+
+        assert!(
+            tenant_conf_opt.is_err(),
+            "Suceeded to convert TenantConfig to TenantConfOpt"
+        );
+
+        let expected_error_str =
+            "lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
+        assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
+    }
+
     #[test]
     fn test_try_from_models_tenant_config_success() {
         let tenant_config = models::TenantConfig {
-            lagging_wal_timeout: Some(Duration::from_secs(5)),
+            lagging_wal_timeout: Some("5s".to_string()),
             ..TenantConfig::default()
         };
@@ -24,7 +24,6 @@ hex.workspace = true
 hyper0.workspace = true
 humantime.workspace = true
 itertools.workspace = true
-json-structural-diff.workspace = true
 lasso.workspace = true
 once_cell.workspace = true
 pageserver_api.workspace = true
@@ -598,7 +598,10 @@ async fn handle_tenant_timeline_passthrough(
 
     let _timer = latency.start_timer(labels.clone());
 
-    let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
+    let client = mgmt_api::Client::new(
+        node.base_url(),
+        service.get_config().pageserver_jwt_token.as_deref(),
+    );
     let resp = client.get_raw(path).await.map_err(|e|
         // We return 503 here because if we can't successfully send a request to the pageserver,
         // either we aren't available or the pageserver is unavailable.
@@ -1354,10 +1357,7 @@ async fn handle_safekeeper_scheduling_policy(
         .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
         .await?;
 
-    Ok(Response::builder()
-        .status(StatusCode::NO_CONTENT)
-        .body(Body::empty())
-        .unwrap())
+    json_response(StatusCode::OK, ())
 }
 
 /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
@@ -53,6 +53,10 @@ struct Cli {
     #[arg(long)]
     jwt_token: Option<String>,
 
+    /// Token for authenticating this service with the safekeepers it controls
+    #[arg(long)]
+    safekeeper_jwt_token: Option<String>,
+
     /// Token for authenticating this service with the control plane, when calling
     /// the compute notification endpoint
     #[arg(long)]
@@ -153,7 +157,8 @@ impl Default for StrictMode {
 struct Secrets {
     database_url: String,
     public_key: Option<JwtAuth>,
-    jwt_token: Option<String>,
+    pageserver_jwt_token: Option<String>,
+    safekeeper_jwt_token: Option<String>,
     control_plane_jwt_token: Option<String>,
     peer_jwt_token: Option<String>,
 }
@@ -161,6 +166,7 @@ struct Secrets {
 impl Secrets {
     const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
     const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
+    const SAFEKEEPER_JWT_TOKEN_ENV: &'static str = "SAFEKEEPER_JWT_TOKEN";
     const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
     const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
     const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
@@ -184,7 +190,14 @@ impl Secrets {
         let this = Self {
             database_url,
             public_key,
-            jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
+            pageserver_jwt_token: Self::load_secret(
+                &args.jwt_token,
+                Self::PAGESERVER_JWT_TOKEN_ENV,
+            ),
+            safekeeper_jwt_token: Self::load_secret(
+                &args.safekeeper_jwt_token,
+                Self::SAFEKEEPER_JWT_TOKEN_ENV,
+            ),
             control_plane_jwt_token: Self::load_secret(
                 &args.control_plane_jwt_token,
                 Self::CONTROL_PLANE_JWT_TOKEN_ENV,
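The `load_secret` calls above follow a common argument-or-environment fallback. A minimal sketch of what such a helper plausibly looks like (assumed semantics: an explicit CLI argument wins over the environment variable):

```rust
use std::env;

// Sketch: prefer the CLI argument, fall back to the named environment variable.
fn load_secret(arg: &Option<String>, env_name: &str) -> Option<String> {
    arg.clone().or_else(|| env::var(env_name).ok())
}

fn main() {
    env::set_var("PAGESERVER_JWT_TOKEN", "from-env");
    assert_eq!(load_secret(&None, "PAGESERVER_JWT_TOKEN").as_deref(), Some("from-env"));
    assert_eq!(
        load_secret(&Some("from-arg".into()), "PAGESERVER_JWT_TOKEN").as_deref(),
        Some("from-arg")
    );
}
```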
@@ -264,11 +277,17 @@ async fn async_main() -> anyhow::Result<()> {
 
     let secrets = Secrets::load(&args).await?;
 
+    // TODO: once we've rolled out the safekeeper JWT token everywhere, put it into the validation code below
+    tracing::info!(
+        "safekeeper_jwt_token set: {:?}",
+        secrets.safekeeper_jwt_token.is_some()
+    );
+
     // Validate required secrets and arguments are provided in strict mode
     match strict_mode {
         StrictMode::Strict
             if (secrets.public_key.is_none()
-                || secrets.jwt_token.is_none()
+                || secrets.pageserver_jwt_token.is_none()
                 || secrets.control_plane_jwt_token.is_none()) =>
         {
             // Production systems should always have secrets configured: if public_key was not set
@@ -293,7 +312,8 @@ async fn async_main() -> anyhow::Result<()> {
     }
 
     let config = Config {
-        jwt_token: secrets.jwt_token,
+        pageserver_jwt_token: secrets.pageserver_jwt_token,
+        safekeeper_jwt_token: secrets.safekeeper_jwt_token,
         control_plane_jwt_token: secrets.control_plane_jwt_token,
         peer_jwt_token: secrets.peer_jwt_token,
         compute_hook_url: args.compute_hook_url,
@@ -1,7 +1,6 @@
 use crate::pageserver_client::PageserverClient;
 use crate::persistence::Persistence;
 use crate::{compute_hook, service};
-use json_structural_diff::JsonDiff;
 use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
 use pageserver_api::models::{
     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
@@ -25,7 +24,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
 use crate::node::Node;
 use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
 
-const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
+const DEFAULT_HEATMAP_PERIOD: &str = "60s";
 
 /// Object with the lifetime of the background reconcile task that is created
 /// for tenants which have a difference between their intent and observed states.
@@ -297,7 +296,7 @@ impl Reconciler {
                     .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
                     .await
             },
-            &self.service_config.jwt_token,
+            &self.service_config.pageserver_jwt_token,
             1,
             3,
             timeout,
@@ -418,7 +417,7 @@ impl Reconciler {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.service_config.jwt_token.as_deref(),
+            self.service_config.pageserver_jwt_token.as_deref(),
         );
 
         client
@@ -441,7 +440,7 @@ impl Reconciler {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.service_config.jwt_token.as_deref(),
+            self.service_config.pageserver_jwt_token.as_deref(),
         );
 
         let timelines = client.timeline_list(&tenant_shard_id).await?;
@@ -479,7 +478,7 @@ impl Reconciler {
                     )
                     .await
             },
-            &self.service_config.jwt_token,
+            &self.service_config.pageserver_jwt_token,
             1,
             3,
             request_download_timeout * 2,
@@ -772,7 +771,7 @@ impl Reconciler {
         let observed_conf = match attached_node
             .with_client_retries(
                 |client| async move { client.get_location_config(tenant_shard_id).await },
-                &self.service_config.jwt_token,
+                &self.service_config.pageserver_jwt_token,
                 1,
                 1,
                 Duration::from_secs(5),
@@ -881,27 +880,7 @@ impl Reconciler {
             self.generation = Some(generation);
             wanted_conf.generation = generation.into();
         }
-
-        let diff = match observed {
-            Some(ObservedStateLocation {
-                conf: Some(observed),
-            }) => {
-                let diff = JsonDiff::diff(
-                    &serde_json::to_value(observed.clone()).unwrap(),
-                    &serde_json::to_value(wanted_conf.clone()).unwrap(),
-                    false,
-                );
-
-                if let Some(json_diff) = diff.diff {
-                    serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
-                } else {
-                    "unknown".to_string()
-                }
-            }
-            _ => "full".to_string(),
-        };
-
-        tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
+        tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
 
         // Because `node` comes from a ref to &self, clone it before calling into a &mut self
         // function: this could be avoided by refactoring the state mutated by location_config into
@@ -1120,7 +1099,7 @@ impl Reconciler {
         match origin
             .with_client_retries(
                 |client| async move { client.get_location_config(tenant_shard_id).await },
-                &self.service_config.jwt_token,
+                &self.service_config.pageserver_jwt_token,
                 1,
                 3,
                 Duration::from_secs(5),
@@ -1201,7 +1180,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
     let mut config = config.clone();
     if has_secondaries {
         if config.heatmap_period.is_none() {
-            config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
+            config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
         }
     } else {
         config.heatmap_period = None;
@@ -348,7 +348,12 @@ pub struct Config {
     // All pageservers managed by one instance of this service must have
     // the same public key. This JWT token will be used to authenticate
     // this service to the pageservers it manages.
-    pub jwt_token: Option<String>,
+    pub pageserver_jwt_token: Option<String>,
+
+    // All safekeepers managed by one instance of this service must have
+    // the same public key. This JWT token will be used to authenticate
+    // this service to the safekeepers it manages.
+    pub safekeeper_jwt_token: Option<String>,
 
     // This JWT token will be used to authenticate this service to the control plane.
     pub control_plane_jwt_token: Option<String>,
@@ -882,7 +887,7 @@ impl Service {
         let response = node
             .with_client_retries(
                 |client| async move { client.list_location_config().await },
-                &self.config.jwt_token,
+                &self.config.pageserver_jwt_token,
                 1,
                 5,
                 timeout,
@@ -983,7 +988,7 @@ impl Service {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.config.jwt_token.as_deref(),
+            self.config.pageserver_jwt_token.as_deref(),
         );
         match client
             .location_config(
@@ -1553,14 +1558,14 @@ impl Service {
         let reconcilers_cancel = cancel.child_token();
 
         let heartbeater_ps = Heartbeater::new(
-            config.jwt_token.clone(),
+            config.pageserver_jwt_token.clone(),
             config.max_offline_interval,
             config.max_warming_up_interval,
             cancel.clone(),
         );
 
         let heartbeater_sk = Heartbeater::new(
-            config.jwt_token.clone(),
+            config.safekeeper_jwt_token.clone(),
             config.max_offline_interval,
             config.max_warming_up_interval,
             cancel.clone(),
@@ -1907,7 +1912,7 @@ impl Service {
         let configs = match node
             .with_client_retries(
                 |client| async move { client.list_location_config().await },
-                &self.config.jwt_token,
+                &self.config.pageserver_jwt_token,
                 1,
                 5,
                 SHORT_RECONCILE_TIMEOUT,
@@ -1965,7 +1970,7 @@ impl Service {
                     .location_config(tenant_shard_id, config, None, false)
                     .await
             },
-            &self.config.jwt_token,
+            &self.config.pageserver_jwt_token,
             1,
             5,
             SHORT_RECONCILE_TIMEOUT,
@@ -2921,9 +2926,7 @@ impl Service {
             first
         };
 
-        let updated_config = base
-            .apply_patch(patch)
-            .map_err(|err| ApiError::BadRequest(anyhow::anyhow!(err)))?;
+        let updated_config = base.apply_patch(patch);
         self.set_tenant_config_and_reconcile(tenant_id, updated_config)
             .await
     }
@@ -3102,7 +3105,7 @@ impl Service {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.config.jwt_token.as_deref(),
+            self.config.pageserver_jwt_token.as_deref(),
         );
 
         tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
@@ -3163,7 +3166,7 @@ impl Service {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.config.jwt_token.as_deref(),
+            self.config.pageserver_jwt_token.as_deref(),
        );
        futs.push(async move {
            let result = client
@@ -3286,7 +3289,7 @@ impl Service {
                     .tenant_delete(TenantShardId::unsharded(tenant_id))
                     .await
             },
-            &self.config.jwt_token,
+            &self.config.pageserver_jwt_token,
             1,
             3,
             RECONCILE_TIMEOUT,
@@ -3505,7 +3508,7 @@ impl Service {
         let timeline_info = create_one(
             shard_zero_tid,
             shard_zero_locations,
-            self.config.jwt_token.clone(),
+            self.config.pageserver_jwt_token.clone(),
             create_req.clone(),
         )
         .await?;
@@ -3521,7 +3524,7 @@ impl Service {
         // Create timeline on remaining shards with number >0
         if !targets.0.is_empty() {
             // If we had multiple shards, issue requests for the remainder now.
-            let jwt = &self.config.jwt_token;
+            let jwt = &self.config.pageserver_jwt_token;
             self.tenant_for_shards(
                 targets
                     .0
@@ -3604,7 +3607,7 @@ impl Service {
                     tenant_shard_id,
                     timeline_id,
                     node,
-                    self.config.jwt_token.clone(),
+                    self.config.pageserver_jwt_token.clone(),
                     req.clone(),
                 ))
             })
@@ -3685,7 +3688,7 @@ impl Service {
                     tenant_shard_id,
                     timeline_id,
                     node,
-                    self.config.jwt_token.clone(),
+                    self.config.pageserver_jwt_token.clone(),
                 ))
             })
             .await?;
@@ -3759,7 +3762,7 @@ impl Service {
                     tenant_shard_id,
                     timeline_id,
                     node,
-                    self.config.jwt_token.clone(),
+                    self.config.pageserver_jwt_token.clone(),
                     dir,
                 ))
             })
@@ -3874,7 +3877,7 @@ impl Service {
             futs.push(async move {
                 node.with_client_retries(
                     |client| op(tenant_shard_id, client),
-                    &self.config.jwt_token,
+                    &self.config.pageserver_jwt_token,
                     warn_threshold,
                     max_retries,
                     timeout,
@@ -4123,7 +4126,7 @@ impl Service {
                     tenant_shard_id,
                     timeline_id,
                     node,
-                    self.config.jwt_token.clone(),
+                    self.config.pageserver_jwt_token.clone(),
                 ))
             })
             .await?;
@@ -4145,7 +4148,7 @@ impl Service {
             shard_zero_tid,
             timeline_id,
             shard_zero_locations.latest.node,
-            self.config.jwt_token.clone(),
+            self.config.pageserver_jwt_token.clone(),
         )
         .await?;
         Ok(shard_zero_status)
@@ -4544,7 +4547,7 @@ impl Service {
 
                     client.location_config(child_id, config, None, false).await
                 },
-                &self.config.jwt_token,
+                &self.config.pageserver_jwt_token,
                 1,
                 10,
                 Duration::from_secs(5),
@@ -5144,7 +5147,7 @@ impl Service {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.config.jwt_token.as_deref(),
+            self.config.pageserver_jwt_token.as_deref(),
         );
         let response = client
             .tenant_shard_split(
@@ -5470,7 +5473,7 @@ impl Service {
         let client = PageserverClient::new(
             node.get_id(),
             node.base_url(),
-            self.config.jwt_token.as_deref(),
+            self.config.pageserver_jwt_token.as_deref(),
         );
 
         let scan_result = client
@@ -6651,12 +6654,11 @@ impl Service {
     ) -> Option<ReconcilerWaiter> {
         let reconcile_needed = shard.get_reconcile_needed(nodes);
 
-        let reconcile_reason = match reconcile_needed {
+        match reconcile_needed {
             ReconcileNeeded::No => return None,
             ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
-            ReconcileNeeded::Yes(reason) => {
+            ReconcileNeeded::Yes => {
                 // Fall through to try and acquire units for spawning reconciler
-                reason
             }
         };
@@ -6695,7 +6697,6 @@ impl Service {
         };
 
         shard.spawn_reconciler(
-            reconcile_reason,
             &self.result_tx,
             nodes,
             &self.compute_hook,
@@ -7098,7 +7099,7 @@ impl Service {
         match attached_node
             .with_client_retries(
                 |client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
-                &self.config.jwt_token,
+                &self.config.pageserver_jwt_token,
                 3,
                 10,
                 SHORT_RECONCILE_TIMEOUT,
@@ -7134,7 +7135,7 @@ impl Service {
                     )
                     .await
             },
-            &self.config.jwt_token,
+            &self.config.pageserver_jwt_token,
             3,
             10,
             SHORT_RECONCILE_TIMEOUT,
@@ -7189,7 +7190,7 @@ impl Service {
                 let request = request_ref.clone();
                 client.top_tenant_shards(request.clone()).await
             },
-            &self.config.jwt_token,
+            &self.config.pageserver_jwt_token,
             3,
             3,
             Duration::from_secs(5),
@@ -7362,7 +7363,7 @@ impl Service {
         match node
             .with_client_retries(
                 |client| async move { client.tenant_secondary_status(tenant_shard_id).await },
-                &self.config.jwt_token,
+                &self.config.pageserver_jwt_token,
                 1,
                 3,
                 Duration::from_millis(250),
@@ -481,14 +481,7 @@ pub(crate) enum ReconcileNeeded {
     /// spawned: wait for the existing reconciler rather than spawning a new one.
     WaitExisting(ReconcilerWaiter),
     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
-    Yes(ReconcileReason),
-}
-
-#[derive(Debug)]
-pub(crate) enum ReconcileReason {
-    ActiveNodesDirty,
-    UnknownLocation,
-    PendingComputeNotification,
+    Yes,
 }
 
 /// Pending modification to the observed state of a tenant shard.
@@ -1348,18 +1341,12 @@ impl TenantShard {
 
         let active_nodes_dirty = self.dirty(pageservers);
 
-        let reconcile_needed = match (
-            active_nodes_dirty,
-            dirty_observed,
-            self.pending_compute_notification,
-        ) {
-            (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
-            (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
-            (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
-            _ => ReconcileNeeded::No,
-        };
+        // Even if there is no pageserver work to be done, if we have a pending notification to computes,
+        // wake up a reconciler to send it.
+        let do_reconcile =
+            active_nodes_dirty || dirty_observed || self.pending_compute_notification;
 
-        if matches!(reconcile_needed, ReconcileNeeded::No) {
+        if !do_reconcile {
             tracing::debug!("Not dirty, no reconciliation needed.");
             return ReconcileNeeded::No;
         }
@@ -1402,7 +1389,7 @@ impl TenantShard {
             }
         }
 
-        reconcile_needed
+        ReconcileNeeded::Yes
     }
 
     /// Ensure the sequence number is set to a value where waiting for this value will make us wait
@@ -1492,7 +1479,6 @@ impl TenantShard {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) fn spawn_reconciler(
         &mut self,
-        reason: ReconcileReason,
         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
         pageservers: &Arc<HashMap<NodeId, Node>>,
         compute_hook: &Arc<ComputeHook>,
@@ -1552,7 +1538,7 @@ impl TenantShard {
         let reconcile_seq = self.sequence;
         let long_reconcile_threshold = service_config.long_reconcile_threshold;
 
-        tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
+        tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
         let must_notify = self.pending_compute_notification;
         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
             tenant_id=%reconciler.tenant_shard_id.tenant_id,
@@ -3817,43 +3817,3 @@ def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
     nodes = env.storage_controller.node_list()
     assert len(nodes) == 1
     assert nodes[0]["listen_https_port"] is None
-
-
-def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvBuilder):
-    """
-    Validate that a storage controller restart with no shards in a transient state
-    performs zero reconciliations at start-up. Implicitly, this means that the location
-    configs returned by the pageserver are identical to the persisted state in the
-    storage controller database.
-    """
-    neon_env_builder.num_pageservers = 1
-    neon_env_builder.storage_controller_config = {
-        "start_as_candidate": False,
-    }
-
-    env = neon_env_builder.init_configs()
-    env.start()
-
-    tenant_id = TenantId.generate()
-    env.storage_controller.tenant_create(
-        tenant_id, shard_count=2, tenant_config={"pitr_interval": "1h2m3s"}
-    )
-
-    env.storage_controller.reconcile_until_idle()
-
-    reconciles_before_restart = env.storage_controller.get_metric_value(
-        "storage_controller_reconcile_complete_total", filter={"status": "ok"}
-    )
-
-    assert reconciles_before_restart != 0
-
-    env.storage_controller.stop()
-    env.storage_controller.start()
-
-    env.storage_controller.reconcile_until_idle()
-
-    reconciles_after_restart = env.storage_controller.get_metric_value(
-        "storage_controller_reconcile_complete_total", filter={"status": "ok"}
-    )
-
-    assert reconciles_after_restart == 0
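The deleted test asserted that a freshly restarted storage controller performs zero reconciliations, i.e. that pageserver-reported location configs match the controller's persisted state exactly; presumably that guarantee no longer holds once durations travel as uninterpreted strings (the test's own "1h2m3s" may be rendered differently by the two sides), hence the removal.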