diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index da11ac2860..4796a07d92 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -118,16 +118,18 @@ struct Cli { #[arg(long)] pub set_disk_quota_for_fs: Option, - #[arg(short = 's', long = "spec", group = "spec")] - pub spec_json: Option, - #[arg(short = 'S', long, group = "spec-path")] pub spec_path: Option, #[arg(short = 'i', long, group = "compute-id")] pub compute_id: String, - #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")] + #[arg( + short = 'p', + long, + conflicts_with = "spec-path", + value_name = "CONTROL_PLANE_API_BASE_URL" + )] pub control_plane_uri: Option, } @@ -172,7 +174,6 @@ fn main() -> Result<()> { cgroup: cli.cgroup, #[cfg(target_os = "linux")] vm_monitor_addr: cli.vm_monitor_addr, - live_config_allowed: cli_spec.live_config_allowed, }, cli_spec.spec, cli_spec.compute_ctl_config, @@ -201,23 +202,12 @@ async fn init() -> Result<()> { } fn try_spec_from_cli(cli: &Cli) -> Result { - // First, try to get cluster spec from the cli argument - if let Some(ref spec_json) = cli.spec_json { - info!("got spec from cli argument {}", spec_json); - return Ok(CliSpecParams { - spec: Some(serde_json::from_str(spec_json)?), - compute_ctl_config: ComputeCtlConfig::default(), - live_config_allowed: false, - }); - } - - // Second, try to read it from the file if path is provided + // First, read spec from the path if provided if let Some(ref spec_path) = cli.spec_path { let file = File::open(Path::new(spec_path))?; return Ok(CliSpecParams { spec: Some(serde_json::from_reader(file)?), compute_ctl_config: ComputeCtlConfig::default(), - live_config_allowed: true, }); } @@ -225,11 +215,12 @@ fn try_spec_from_cli(cli: &Cli) -> Result { panic!("must specify --control-plane-uri"); }; + // If the spec wasn't provided in the CLI arguments, then retrieve it from + // the control plane match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) { Ok(resp) => Ok(CliSpecParams { spec: resp.0, compute_ctl_config: resp.1, - live_config_allowed: true, }), Err(e) => { error!( @@ -247,7 +238,6 @@ struct CliSpecParams { spec: Option, #[allow(dead_code)] compute_ctl_config: ComputeCtlConfig, - live_config_allowed: bool, } fn deinit_and_exit(exit_code: Option) -> ! { diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 9dfcde1dbc..ad8925e7ab 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -93,20 +93,6 @@ pub struct ComputeNodeParams { /// the address of extension storage proxy gateway pub ext_remote_storage: Option, - - /// We should only allow live re- / configuration of the compute node if - /// it uses 'pull model', i.e. it can go to control-plane and fetch - /// the latest configuration. Otherwise, there could be a case: - /// - we start compute with some spec provided as argument - /// - we push new spec and it does reconfiguration - /// - but then something happens and compute pod / VM is destroyed, - /// so k8s controller starts it again with the **old** spec - /// - /// and the same for empty computes: - /// - we started compute without any spec - /// - we push spec and it does configuration - /// - but then it is restarted without any spec again - pub live_config_allowed: bool, } /// Compute node info shared across several `compute_ctl` threads. diff --git a/compute_tools/src/http/middleware/authorize.rs b/compute_tools/src/http/middleware/authorize.rs index 89d55e1af3..ee3a5cb953 100644 --- a/compute_tools/src/http/middleware/authorize.rs +++ b/compute_tools/src/http/middleware/authorize.rs @@ -6,20 +6,15 @@ use axum_extra::{ TypedHeader, headers::{Authorization, authorization::Bearer}, }; +use compute_api::requests::ComputeClaims; use futures::future::BoxFuture; use http::{Request, Response, StatusCode}; use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet}; -use serde::Deserialize; use tower_http::auth::AsyncAuthorizeRequest; use tracing::warn; use crate::http::{JsonResponse, extract::RequestId}; -#[derive(Clone, Debug, Deserialize)] -pub(in crate::http) struct Claims { - compute_id: String, -} - #[derive(Clone, Debug)] pub(in crate::http) struct Authorize { compute_id: String, @@ -112,7 +107,11 @@ impl AsyncAuthorizeRequest for Authorize { impl Authorize { /// Verify the token using the JSON Web Key set and return the token data. - fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result> { + fn verify( + jwks: &JwkSet, + token: &str, + validation: &Validation, + ) -> Result> { for jwk in jwks.keys.iter() { let decoding_key = match DecodingKey::from_jwk(jwk) { Ok(key) => key, @@ -127,7 +126,7 @@ impl Authorize { } }; - match jsonwebtoken::decode::(token, &decoding_key, validation) { + match jsonwebtoken::decode::(token, &decoding_key, validation) { Ok(data) => return Ok(data), Err(e) => { warn!( diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs index 3c5a6a6d41..f7a19da611 100644 --- a/compute_tools/src/http/routes/configure.rs +++ b/compute_tools/src/http/routes/configure.rs @@ -22,13 +22,6 @@ pub(in crate::http) async fn configure( State(compute): State>, request: Json, ) -> Response { - if !compute.params.live_config_allowed { - return JsonResponse::error( - StatusCode::PRECONDITION_FAILED, - "live configuration is not allowed for this compute node".to_string(), - ); - } - let pspec = match ParsedSpec::try_from(request.spec.clone()) { Ok(p) => p, Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e), diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 2616afbb16..8e2a110366 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -15,7 +15,7 @@ use clap::ValueEnum; use postgres_backend::AuthType; use reqwest::Url; use serde::{Deserialize, Serialize}; -use utils::auth::{Claims, encode_from_key_file}; +use utils::auth::encode_from_key_file; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage}; @@ -757,7 +757,7 @@ impl LocalEnv { } // this function is used only for testing purposes in CLI e g generate tokens during init - pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result { + pub fn generate_auth_token(&self, claims: &S) -> anyhow::Result { let private_key_path = self.get_private_key_path(); let key_data = fs::read(private_key_path)?; encode_from_key_file(claims, &key_data) diff --git a/libs/compute_api/src/requests.rs b/libs/compute_api/src/requests.rs index 3fbdfcf83f..98f2fc297c 100644 --- a/libs/compute_api/src/requests.rs +++ b/libs/compute_api/src/requests.rs @@ -5,6 +5,14 @@ use crate::privilege::Privilege; use crate::responses::ComputeCtlConfig; use crate::spec::{ComputeSpec, ExtVersion, PgIdent}; +/// When making requests to the `compute_ctl` external HTTP server, the client +/// must specify a set of claims in `Authorization` header JWTs such that +/// `compute_ctl` can authorize the request. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ComputeClaims { + pub compute_id: String, +} + /// Request of the /configure API /// /// We now pass only `spec` in the configuration request, but later we can diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index cc5b0b1d13..db4fc5685c 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -173,7 +173,7 @@ impl std::fmt::Debug for JwtAuth { } // this function is used only for testing purposes in CLI e g generate tokens during init -pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result { +pub fn encode_from_key_file(claims: &S, key_data: &[u8]) -> Result { let key = EncodingKey::from_ed_pem(key_data)?; Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?) } diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 77b3f90b3e..215682d90c 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -65,7 +65,7 @@ use bytes::{Buf, Bytes}; use criterion::{BenchmarkId, Criterion}; use once_cell::sync::Lazy; use pageserver::config::PageServerConf; -use pageserver::walredo::PostgresRedoManager; +use pageserver::walredo::{PostgresRedoManager, RedoAttemptType}; use pageserver_api::key::Key; use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; @@ -223,7 +223,14 @@ impl Request { // TODO: avoid these clones manager - .request_redo(*key, *lsn, base_img.clone(), records.clone(), *pg_version) + .request_redo( + *key, + *lsn, + base_img.clone(), + records.clone(), + *pg_version, + RedoAttemptType::ReadPage, + ) .await .context("request_redo") } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 200b91fc82..9bb761dc48 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2274,6 +2274,7 @@ async fn timeline_compact_handler( if Some(true) == parse_query_param::<_, bool>(&request, "dry_run")? { flags |= CompactFlags::DryRun; } + // Manual compaction does not yield for L0. let wait_until_uploaded = parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2ac2fd0b81..d3623fc3b9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -100,7 +100,7 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow; use crate::tenant::timeline::uninit::cleanup_timeline_directory; use crate::virtual_file::VirtualFile; use crate::walingest::WalLagCooldown; -use crate::walredo::PostgresRedoManager; +use crate::walredo::{PostgresRedoManager, RedoAttemptType}; use crate::{InitializationOrder, TEMP_FILE_SUFFIX, import_datadir, span, task_mgr, walredo}; static INIT_DB_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(8)); @@ -473,15 +473,16 @@ impl WalRedoManager { base_img: Option<(Lsn, bytes::Bytes)>, records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>, pg_version: u32, + redo_attempt_type: RedoAttemptType, ) -> Result { match self { Self::Prod(_, mgr) => { - mgr.request_redo(key, lsn, base_img, records, pg_version) + mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type) .await } #[cfg(test)] Self::Test(mgr) => { - mgr.request_redo(key, lsn, base_img, records, pg_version) + mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type) .await } } @@ -5879,6 +5880,7 @@ pub(crate) mod harness { base_img: Option<(Lsn, Bytes)>, records: Vec<(Lsn, NeonWalRecord)>, _pg_version: u32, + _redo_attempt_type: RedoAttemptType, ) -> Result { let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1)); if records_neon { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 67b59d227b..c9ce1d6d8b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,6 +24,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::PERF_TRACE_TARGET; +use crate::walredo::RedoAttemptType; use anyhow::{Context, Result, anyhow, bail, ensure}; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; @@ -1293,6 +1294,12 @@ impl Timeline { }; reconstruct_state.read_path = read_path; + let redo_attempt_type = if ctx.task_kind() == TaskKind::Compaction { + RedoAttemptType::LegacyCompaction + } else { + RedoAttemptType::ReadPage + }; + let traversal_res: Result<(), _> = { let ctx = RequestContextBuilder::from(ctx) .perf_span(|crnt_perf_span| { @@ -1380,7 +1387,7 @@ impl Timeline { let walredo_deltas = converted.num_deltas(); let walredo_res = walredo_self - .reconstruct_value(key, lsn, converted) + .reconstruct_value(key, lsn, converted, redo_attempt_type) .maybe_perf_instrument(&ctx, |crnt_perf_span| { info_span!( target: PERF_TRACE_TARGET, @@ -6351,37 +6358,21 @@ impl Timeline { /// Reconstruct a value, using the given base image and WAL records in 'data'. async fn reconstruct_value( - &self, - key: Key, - request_lsn: Lsn, - data: ValueReconstructState, - ) -> Result { - self.reconstruct_value_inner(key, request_lsn, data, false) - .await - } - - /// Reconstruct a value, using the given base image and WAL records in 'data'. It does not fire critical errors because - /// sometimes it is expected to fail due to unreplayable history described in . - async fn reconstruct_value_wo_critical_error( - &self, - key: Key, - request_lsn: Lsn, - data: ValueReconstructState, - ) -> Result { - self.reconstruct_value_inner(key, request_lsn, data, true) - .await - } - - async fn reconstruct_value_inner( &self, key: Key, request_lsn: Lsn, mut data: ValueReconstructState, - no_critical_error: bool, + redo_attempt_type: RedoAttemptType, ) -> Result { // Perform WAL redo if needed data.records.reverse(); + let fire_critical_error = match redo_attempt_type { + RedoAttemptType::ReadPage => true, + RedoAttemptType::LegacyCompaction => true, + RedoAttemptType::GcCompaction => false, + }; + // If we have a page image, and no WAL, we're all set if data.records.is_empty() { if let Some((img_lsn, img)) = &data.img { @@ -6428,13 +6419,20 @@ impl Timeline { .as_ref() .context("timeline has no walredo manager") .map_err(PageReconstructError::WalRedo)? - .request_redo(key, request_lsn, data.img, data.records, self.pg_version) + .request_redo( + key, + request_lsn, + data.img, + data.records, + self.pg_version, + redo_attempt_type, + ) .await; let img = match res { Ok(img) => img, Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled), Err(walredo::Error::Other(err)) => { - if !no_critical_error { + if fire_critical_error { critical!("walredo failure during page reconstruction: {err:?}"); } return Err(PageReconstructError::WalRedo( diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 414256ccec..8e778e8674 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -7,7 +7,7 @@ use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use super::layer_manager::LayerManager; use super::{ @@ -16,6 +16,8 @@ use super::{ Timeline, }; +use crate::tenant::timeline::DeltaEntry; +use crate::walredo::RedoAttemptType; use anyhow::{Context, anyhow}; use bytes::Bytes; use enumset::EnumSet; @@ -315,6 +317,9 @@ impl GcCompactionQueue { flags: { let mut flags = EnumSet::new(); flags |= CompactFlags::EnhancedGcBottomMostCompaction; + if timeline.get_compaction_l0_first() { + flags |= CompactFlags::YieldForL0; + } flags }, sub_compaction: true, @@ -820,15 +825,16 @@ pub struct CompactionStatistics { time_acquire_lock_secs: f64, time_analyze_secs: f64, time_download_layer_secs: f64, + time_to_first_kv_pair_secs: f64, time_main_loop_secs: f64, time_final_phase_secs: f64, time_total_secs: f64, // Summary - /// Ratio of the key-value size before/after gc-compaction. - uncompressed_size_ratio: f64, - /// Ratio of the physical size before/after gc-compaction. - physical_size_ratio: f64, + /// Ratio of the key-value size after/before gc-compaction. + uncompressed_retention_ratio: f64, + /// Ratio of the physical size after/before gc-compaction. + compressed_retention_ratio: f64, } impl CompactionStatistics { @@ -897,15 +903,15 @@ impl CompactionStatistics { fn finalize(&mut self) { let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size; let produced_key_value_size = self.image_produced.size + self.wal_produced.size; - self.uncompressed_size_ratio = - original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0 + self.uncompressed_retention_ratio = + produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0 let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size; let produced_physical_size = self.image_layer_produced.size + self.delta_layer_produced.size + self.image_layer_discarded.size + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate - self.physical_size_ratio = - original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0 + self.compressed_retention_ratio = + produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0 } } @@ -2416,7 +2422,7 @@ impl Timeline { lsn_split_points[i] }; let img = self - .reconstruct_value_wo_critical_error(key, request_lsn, state) + .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction) .await?; Some((request_lsn, img)) } else { @@ -3037,7 +3043,7 @@ impl Timeline { .map_err(CompactionError::Other)?; let time_download_layer = timer.elapsed(); - let timer = Instant::now(); + let mut timer = Instant::now(); // Step 2: Produce images+deltas. let mut accumulated_values = Vec::new(); @@ -3115,6 +3121,7 @@ impl Timeline { // Actually, we can decide not to write to the image layer at all at this point because // the key and LSN range are determined. However, to keep things simple here, we still // create this writer, and discard the writer in the end. + let mut time_to_first_kv_pair = None; while let Some(((key, lsn, val), desc)) = merge_iter .next_with_trace() @@ -3122,6 +3129,11 @@ impl Timeline { .context("failed to get next key-value pair") .map_err(CompactionError::Other)? { + if time_to_first_kv_pair.is_none() { + time_to_first_kv_pair = Some(timer.elapsed()); + timer = Instant::now(); + } + if cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } @@ -3463,6 +3475,9 @@ impl Timeline { let time_final_phase = timer.elapsed(); stat.time_final_phase_secs = time_final_phase.as_secs_f64(); + stat.time_to_first_kv_pair_secs = time_to_first_kv_pair + .unwrap_or(Duration::ZERO) + .as_secs_f64(); stat.time_main_loop_secs = time_main_loop.as_secs_f64(); stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64(); stat.time_download_layer_secs = time_download_layer.as_secs_f64(); @@ -3927,8 +3942,6 @@ impl CompactionLayer for OwnArc { } } -use crate::tenant::timeline::DeltaEntry; - impl CompactionLayer for ResidentDeltaLayer { fn key_range(&self) -> &Range { &self.0.layer_desc().key_range diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 22d8d83811..ed8a954369 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -136,6 +136,16 @@ macro_rules! bail { } } +#[derive(Debug, Clone, Copy)] +pub enum RedoAttemptType { + /// Used for the read path. Will fire critical errors and retry twice if failure. + ReadPage, + // Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once if failure. + LegacyCompaction, + // Used for gc compaction. Will not fire critical errors and not retry. + GcCompaction, +} + /// /// Public interface of WAL redo manager /// @@ -156,11 +166,18 @@ impl PostgresRedoManager { base_img: Option<(Lsn, Bytes)>, records: Vec<(Lsn, NeonWalRecord)>, pg_version: u32, + redo_attempt_type: RedoAttemptType, ) -> Result { if records.is_empty() { bail!("invalid WAL redo request with no records"); } + let max_retry_attempts = match redo_attempt_type { + RedoAttemptType::ReadPage => 2, + RedoAttemptType::LegacyCompaction => 1, + RedoAttemptType::GcCompaction => 0, + }; + let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID); let mut img = base_img.map(|p| p.1); let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1); @@ -180,6 +197,7 @@ impl PostgresRedoManager { &records[batch_start..i], self.conf.wal_redo_timeout, pg_version, + max_retry_attempts, ) .await }; @@ -201,6 +219,7 @@ impl PostgresRedoManager { &records[batch_start..], self.conf.wal_redo_timeout, pg_version, + max_retry_attempts, ) .await } @@ -424,11 +443,11 @@ impl PostgresRedoManager { records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, pg_version: u32, + max_retry_attempts: u32, ) -> Result { *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); let (rel, blknum) = key.to_rel_block().context("invalid record")?; - const MAX_RETRY_ATTEMPTS: u32 = 1; let mut n_attempts = 0u32; loop { let base_img = &base_img; @@ -486,7 +505,7 @@ impl PostgresRedoManager { info!(n_attempts, "retried walredo succeeded"); } n_attempts += 1; - if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() { + if n_attempts > max_retry_attempts || result.is_ok() { return result; } } @@ -560,6 +579,7 @@ mod tests { use super::PostgresRedoManager; use crate::config::PageServerConf; + use crate::walredo::RedoAttemptType; #[tokio::test] async fn test_ping() { @@ -593,6 +613,7 @@ mod tests { None, short_records(), 14, + RedoAttemptType::ReadPage, ) .instrument(h.span()) .await @@ -621,6 +642,7 @@ mod tests { None, short_records(), 14, + RedoAttemptType::ReadPage, ) .instrument(h.span()) .await @@ -642,6 +664,7 @@ mod tests { None, short_records(), 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */ + RedoAttemptType::ReadPage, ) .instrument(h.span()) .await diff --git a/test_runner/fixtures/pageserver/common_types.py b/test_runner/fixtures/pageserver/common_types.py index 0e068db593..0a92883e96 100644 --- a/test_runner/fixtures/pageserver/common_types.py +++ b/test_runner/fixtures/pageserver/common_types.py @@ -105,7 +105,7 @@ def parse_layer_file_name(file_name: str) -> LayerName: except InvalidFileName: pass - raise InvalidFileName("neither image nor delta layer") + raise InvalidFileName(f"neither image nor delta layer: {file_name}") def is_future_layer(layer_file_name: LayerName, disk_consistent_lsn: Lsn): diff --git a/test_runner/performance/pageserver/util.py b/test_runner/performance/pageserver/util.py index 7a6d88f79c..b50659defc 100644 --- a/test_runner/performance/pageserver/util.py +++ b/test_runner/performance/pageserver/util.py @@ -40,6 +40,8 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int): for layer in info.historic_layers: assert not layer.remote + env.storage_controller.reconcile_until_idle(timeout_secs=60) + log.info("ready") diff --git a/test_runner/performance/test_perf_oltp_large_tenant.py b/test_runner/performance/test_perf_oltp_large_tenant.py index 957a4ec796..b45394d627 100644 --- a/test_runner/performance/test_perf_oltp_large_tenant.py +++ b/test_runner/performance/test_perf_oltp_large_tenant.py @@ -145,11 +145,14 @@ def run_database_maintenance(env: PgCompare): END $$; """ ) - - log.info("start REINDEX TABLE CONCURRENTLY transaction.transaction") - with env.zenbenchmark.record_duration("reindex concurrently"): - cur.execute("REINDEX TABLE CONCURRENTLY transaction.transaction;") - log.info("finished REINDEX TABLE CONCURRENTLY transaction.transaction") + # in production a customer would likely use reindex concurrently + # but for our test we don't care about the downtime + # and it would just about double the time we report in the test + # because we need one more table scan for each index + log.info("start REINDEX TABLE transaction.transaction") + with env.zenbenchmark.record_duration("reindex"): + cur.execute("REINDEX TABLE transaction.transaction;") + log.info("finished REINDEX TABLE transaction.transaction") @pytest.mark.parametrize("custom_scripts", get_custom_scripts()) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 6789939e0c..087fafb327 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -38,12 +38,34 @@ PREEMPT_COMPACTION_TENANT_CONF = { "compaction_target_size": 1024**2, "image_creation_threshold": 1, "image_creation_preempt_threshold": 1, - # compact more frequently + # Compact more frequently "compaction_threshold": 3, "compaction_upper_limit": 6, "lsn_lease_length": "0s", } +PREEMPT_GC_COMPACTION_TENANT_CONF = { + "gc_period": "5s", + "compaction_period": "5s", + # Small checkpoint distance to create many layers + "checkpoint_distance": 1024**2, + # Compact small layers + "compaction_target_size": 1024**2, + "image_creation_threshold": 10000, # Do not create image layers at all + "image_creation_preempt_threshold": 10000, + # Compact more frequently + "compaction_threshold": 3, + "compaction_upper_limit": 6, + "lsn_lease_length": "0s", + # Enable gc-compaction + "gc_compaction_enabled": "true", + "gc_compaction_initial_threshold_kb": 1024, # At a small threshold + "gc_compaction_ratio_percent": 1, + # No PiTR interval and small GC horizon + "pitr_interval": "0s", + "gc_horizon": f"{1024**2}", +} + @skip_in_debug_build("only run with release build") @pytest.mark.parametrize( @@ -165,6 +187,41 @@ def test_pageserver_compaction_preempt( env.pageserver.assert_log_contains("resuming image layer creation") +@skip_in_debug_build("only run with release build") +def test_pageserver_gc_compaction_preempt( + neon_env_builder: NeonEnvBuilder, +): + # Ideally we should be able to do unit tests for this, but we need real Postgres + # WALs in order to do unit testing... + + conf = PREEMPT_GC_COMPACTION_TENANT_CONF.copy() + env = neon_env_builder.init_start(initial_tenant_conf=conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + row_count = 200000 + churn_rounds = 10 + + ps_http = env.pageserver.http_client() + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageserver.id) + + log.info("Writing initial data ...") + workload.write_rows(row_count, env.pageserver.id) + + for i in range(1, churn_rounds + 1): + log.info(f"Running churn round {i}/{churn_rounds} ...") + workload.churn_rows(row_count, env.pageserver.id, upload=False) + workload.validate(env.pageserver.id) + ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True) + log.info("Validating at workload end ...") + workload.validate(env.pageserver.id) + # ensure gc_compaction gets preempted and then resumed + env.pageserver.assert_log_contains("preempt gc-compaction") + + @skip_in_debug_build("only run with release build") @pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM @pytest.mark.parametrize( diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index c73a592d98..d03d05d33d 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -61,7 +61,7 @@ def evict_random_layers( ) client = pageserver.http_client() for layer in initial_local_layers: - if "ephemeral" in layer.name or "temp_download" in layer.name: + if "ephemeral" in layer.name or "temp_download" in layer.name or ".___temp" in layer.name: continue layer_name = parse_layer_file_name(layer.name)