From c34d36d8a270b9a4910d4d26210e7c608288f079 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Fri, 11 Jul 2025 14:49:37 +0400 Subject: [PATCH 01/14] storcon_cli: timeline-safekeeper-migrate and timeline-locate subcommands (#12548) ## Problem We have a `safekeeper_migrate` handler, but no subcommand in `storcon_cli`. Same for `/:timeline_id/locate` for identifying current set of safekeepers. - Closes: https://github.com/neondatabase/neon/issues/12395 ## Summary of changes - Add `timeline-safekeeper-migrate` and `timeline-locate` subcommands to `storcon_cli` --- Cargo.lock | 1 + control_plane/src/broker.rs | 2 +- control_plane/src/pageserver.rs | 2 +- control_plane/src/safekeeper.rs | 2 +- control_plane/src/storage_controller.rs | 2 +- control_plane/storcon_cli/Cargo.toml | 1 + control_plane/storcon_cli/src/main.rs | 57 ++++++++++++++++++- libs/safekeeper_api/src/models.rs | 11 +++- .../src/service/safekeeper_service.rs | 10 +--- 9 files changed, 73 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c528354053..025f4e4116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6991,6 +6991,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "reqwest", + "safekeeper_api", "serde_json", "storage_controller_client", "tokio", diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index f43f459636..988b08e875 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -36,7 +36,7 @@ impl StorageBroker { pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { let broker = &self.env.broker; - print!("Starting neon broker at {}", broker.client_url()); + println!("Starting neon broker at {}", broker.client_url()); let mut args = Vec::new(); diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3673d1f4f2..843ead807d 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -303,7 +303,7 @@ impl PageServerNode { async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> { // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); - print!( + println!( "Starting pageserver node {} at '{}' in {:?}, retrying for {:?}", self.conf.id, self.pg_connection_config.raw_address(), diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index da9dafd8e9..2ba2f3ebe4 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -127,7 +127,7 @@ impl SafekeeperNode { extra_opts: &[String], retry_timeout: &Duration, ) -> anyhow::Result<()> { - print!( + println!( "Starting safekeeper at '{}' in '{}', retrying for {:?}", self.pg_connection_config.raw_address(), self.datadir_path().display(), diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index bb83a6319c..dc6c82f504 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -660,7 +660,7 @@ impl StorageController { )); } - println!("Starting storage controller"); + println!("Starting storage controller at {scheme}://{host}:{listen_port}"); background_process::start_process( COMMAND, diff --git a/control_plane/storcon_cli/Cargo.toml b/control_plane/storcon_cli/Cargo.toml index ce89116691..61d48b2469 100644 --- a/control_plane/storcon_cli/Cargo.toml +++ b/control_plane/storcon_cli/Cargo.toml @@ -14,6 +14,7 @@ humantime.workspace = true pageserver_api.workspace = 
true pageserver_client.workspace = true reqwest.workspace = true +safekeeper_api.workspace=true serde_json = { workspace = true, features = ["raw_value"] } storage_controller_client.workspace = true tokio.workspace = true diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 701c4b3b2e..24fd34a87a 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -11,7 +11,7 @@ use pageserver_api::controller_api::{ PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, - TenantShardMigrateRequest, TenantShardMigrateResponse, + TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest, }; use pageserver_api::models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig, @@ -21,6 +21,7 @@ use pageserver_api::models::{ use pageserver_api::shard::{ShardStripeSize, TenantShardId}; use pageserver_client::mgmt_api::{self}; use reqwest::{Certificate, Method, StatusCode, Url}; +use safekeeper_api::models::TimelineLocateResponse; use storage_controller_client::control_api::Client; use utils::id::{NodeId, TenantId, TimelineId}; @@ -279,6 +280,23 @@ enum Command { #[arg(long)] concurrency: Option, }, + /// Locate safekeepers for a timeline from the storcon DB. + TimelineLocate { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + timeline_id: TimelineId, + }, + /// Migrate a timeline to a new set of safekeepers + TimelineSafekeeperMigrate { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + timeline_id: TimelineId, + /// Example: --new-sk-set 1,2,3 + #[arg(long, required = true, value_delimiter = ',')] + new_sk_set: Vec, + }, } #[derive(Parser)] @@ -1324,7 +1342,7 @@ async fn main() -> anyhow::Result<()> { concurrency, } => { let mut path = format!( - "/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers", + "v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers", ); if let Some(c) = concurrency { @@ -1335,6 +1353,41 @@ async fn main() -> anyhow::Result<()> { .dispatch::<(), ()>(Method::POST, path, None) .await?; } + Command::TimelineLocate { + tenant_id, + timeline_id, + } => { + let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate"); + + let resp = storcon_client + .dispatch::<(), TimelineLocateResponse>(Method::GET, path, None) + .await?; + + let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::>(); + let new_sk_set = resp + .new_sk_set + .as_ref() + .map(|ids| ids.iter().map(|id| id.0 as i64).collect::>()); + + println!("generation = {}", resp.generation); + println!("sk_set = {sk_set:?}"); + println!("new_sk_set = {new_sk_set:?}"); + } + Command::TimelineSafekeeperMigrate { + tenant_id, + timeline_id, + new_sk_set, + } => { + let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate"); + + storcon_client + .dispatch::<_, ()>( + Method::POST, + path, + Some(TimelineSafekeeperMigrateRequest { new_sk_set }), + ) + .await?; + } } Ok(()) diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index e87232474b..59e112654b 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; use 
utils::pageserver_feedback::PageserverFeedback; -use crate::membership::Configuration; +use crate::membership::{Configuration, SafekeeperGeneration}; use crate::{ServerInfo, Term}; #[derive(Debug, Serialize, Deserialize)] @@ -311,3 +311,12 @@ pub struct PullTimelineResponse { pub safekeeper_host: Option, // TODO: add more fields? } + +/// Response to a timeline locate request. +/// Storcon-only API. +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TimelineLocateResponse { + pub generation: SafekeeperGeneration, + pub sk_set: Vec, + pub new_sk_set: Option>, +} diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index d7179372b2..42ddf81e3e 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -25,7 +25,8 @@ use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo}; use safekeeper_api::PgVersionId; use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration}; use safekeeper_api::models::{ - PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse, + PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest, + TimelineMembershipSwitchResponse, }; use safekeeper_api::{INITIAL_TERM, Term}; use safekeeper_client::mgmt_api; @@ -37,13 +38,6 @@ use utils::lsn::Lsn; use super::Service; -#[derive(serde::Serialize, serde::Deserialize, Clone)] -pub struct TimelineLocateResponse { - pub generation: SafekeeperGeneration, - pub sk_set: Vec, - pub new_sk_set: Option>, -} - impl Service { fn make_member_set(safekeepers: &[Safekeeper]) -> Result { let members = safekeepers From 15f633922aaa62e333ba3b92cd97d646ce56e5ef Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 11 Jul 2025 12:39:51 +0100 Subject: [PATCH 02/14] pageserver: use image consistent LSN for force image layer creation (#12547) This is a no-op for the neon deployment * Introduce the concept image consistent lsn: of the largest LSN below which all pages have been redone successfully * Use the image consistent LSN for forced image layer creations * Optionally expose the image consistent LSN via the timeline describe HTTP endpoint * Add a sharded timeline describe endpoint to storcon --------- Co-authored-by: Chen Luo --- libs/pageserver_api/src/controller_api.rs | 9 +- libs/pageserver_api/src/models.rs | 3 + pageserver/src/http/routes.rs | 19 ++ pageserver/src/tenant.rs | 34 +++ pageserver/src/tenant/layer_map.rs | 232 +++++++++++++++++- pageserver/src/tenant/timeline.rs | 32 ++- pageserver/src/tenant/timeline/compaction.rs | 89 +++---- .../src/tenant/timeline/layer_manager.rs | 1 + storage_controller/src/http.rs | 32 +++ storage_controller/src/pageserver_client.rs | 17 ++ storage_controller/src/service.rs | 88 ++++++- test_runner/fixtures/neon_fixtures.py | 14 ++ test_runner/regress/test_compaction.py | 77 +++++- 13 files changed, 567 insertions(+), 80 deletions(-) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index a8c7083b17..b02c6a613a 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; -use crate::models::{PageserverUtilization, ShardParameters, TenantConfig}; +use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo}; use crate::shard::{ShardStripeSize, 
TenantShardId}; #[derive(Serialize, Deserialize, Debug)] @@ -126,6 +126,13 @@ pub struct TenantDescribeResponse { pub config: TenantConfig, } +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantTimelineDescribeResponse { + pub shards: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub image_consistent_lsn: Option, +} + #[derive(Serialize, Deserialize, Debug)] pub struct NodeShardResponse { pub node_id: NodeId, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 56dd95eab3..11e02a8550 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1622,6 +1622,9 @@ pub struct TimelineInfo { /// Whether the timeline is invisible in synthetic size calculations. pub is_invisible: Option, + // HADRON: the largest LSN below which all page updates have been included in the image layers. + #[serde(skip_serializing_if = "Option::is_none")] + pub image_consistent_lsn: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7030ac368d..d839bac557 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -397,6 +397,7 @@ async fn build_timeline_info( timeline: &Arc, include_non_incremental_logical_size: bool, force_await_initial_logical_size: bool, + include_image_consistent_lsn: bool, ctx: &RequestContext, ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); @@ -421,6 +422,10 @@ async fn build_timeline_info( .await?, ); } + // HADRON + if include_image_consistent_lsn { + info.image_consistent_lsn = Some(timeline.compute_image_consistent_lsn().await?); + } Ok(info) } @@ -510,6 +515,8 @@ async fn build_timeline_info_common( is_invisible: Some(is_invisible), walreceiver_status, + // HADRON + image_consistent_lsn: None, }; Ok(info) } @@ -712,6 +719,8 @@ async fn timeline_list_handler( parse_query_param(&request, "include-non-incremental-logical-size")?; let force_await_initial_logical_size: Option = parse_query_param(&request, "force-await-initial-logical-size")?; + let include_image_consistent_lsn: Option = + parse_query_param(&request, "include-image-consistent-lsn")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); @@ -732,6 +741,7 @@ async fn timeline_list_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), + include_image_consistent_lsn.unwrap_or(false), &ctx, ) .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) @@ -760,6 +770,9 @@ async fn timeline_and_offloaded_list_handler( parse_query_param(&request, "include-non-incremental-logical-size")?; let force_await_initial_logical_size: Option = parse_query_param(&request, "force-await-initial-logical-size")?; + let include_image_consistent_lsn: Option = + parse_query_param(&request, "include-image-consistent-lsn")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); @@ -780,6 +793,7 @@ async fn timeline_and_offloaded_list_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), + include_image_consistent_lsn.unwrap_or(false), &ctx, ) .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) @@ -964,6 +978,9 @@ async fn timeline_detail_handler( parse_query_param(&request, "include-non-incremental-logical-size")?; let 
force_await_initial_logical_size: Option = parse_query_param(&request, "force-await-initial-logical-size")?; + // HADRON + let include_image_consistent_lsn: Option = + parse_query_param(&request, "include-image-consistent-lsn")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; // Logical size calculation needs downloading. @@ -984,6 +1001,7 @@ async fn timeline_detail_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), + include_image_consistent_lsn.unwrap_or(false), ctx, ) .await @@ -3643,6 +3661,7 @@ async fn activate_post_import_handler( let timeline_info = build_timeline_info( &timeline, false, // include_non_incremental_logical_size, false, // force_await_initial_logical_size + false, // include_image_consistent_lsn &ctx, ) .await diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f67269851a..f75a03a508 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -12816,6 +12816,40 @@ mod tests { }, ] ); + + Ok(()) + } + + #[tokio::test] + async fn test_get_force_image_creation_lsn() -> anyhow::Result<()> { + let tenant_conf = pageserver_api::models::TenantConfig { + pitr_interval: Some(Duration::from_secs(7 * 3600)), + image_layer_force_creation_period: Some(Duration::from_secs(3600)), + ..Default::default() + }; + + let tenant_id = TenantId::generate(); + + let harness = TenantHarness::create_custom( + "test_get_force_image_creation_lsn", + tenant_conf, + tenant_id, + ShardIdentity::unsharded(), + Generation::new(1), + ) + .await?; + let (tenant, ctx) = harness.load().await; + let timeline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await?; + timeline.gc_info.write().unwrap().cutoffs.time = Some(Lsn(100)); + { + let writer = timeline.writer().await; + writer.finish_write(Lsn(5000)); + } + + let image_creation_lsn = timeline.get_force_image_creation_lsn().unwrap(); + assert_eq!(image_creation_lsn, Lsn(4300)); Ok(()) } } diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 23052ccee7..ba02602cfe 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -46,10 +46,11 @@ mod historic_layer_coverage; mod layer_coverage; -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::iter::Peekable; use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use anyhow::Result; use historic_layer_coverage::BufferedHistoricLayerCoverage; @@ -904,6 +905,103 @@ impl LayerMap { max_stacked_deltas } + /* BEGIN_HADRON */ + /** + * Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully. + * It works by first finding the latest image layers and store them into a map. Then for each delta layer, + * find all overlapping image layers in order to potentially increase the image LSN in case there are gaps + * (e.g., if an image is created at LSN 100 but the delta layer spans LSN [150, 200], then we can increase + * image LSN to 150 because there is no WAL record in between). + * Finally, the image consistent LSN is computed by taking the minimum of all image layers. 
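As a standalone illustration of the algorithm sketched in the comment above (an editor's simplified model, not the pageserver implementation: keys and LSNs are plain u64s; L0 deltas, holes and the coverage-map walk are ignored; the layer lists are passed in explicitly), each image is taken to be consistent up to just below the first overlapping delta that carries newer WAL, and the timeline-wide value is the minimum over all images:

```rust
use std::ops::Range;

struct ImageLayer { keys: Range<u64>, lsn: u64 }
struct DeltaLayer { keys: Range<u64>, lsns: Range<u64> }

fn image_consistent_lsn(images: &[ImageLayer], deltas: &[DeltaLayer], disk_consistent_lsn: u64) -> u64 {
    let mut result = disk_consistent_lsn;
    for img in images {
        // Smallest start LSN among deltas that overlap this image's keys and contain newer WAL.
        let first_newer_delta = deltas
            .iter()
            .filter(|d| d.keys.start < img.keys.end && img.keys.start < d.keys.end)
            .filter(|d| d.lsns.end > img.lsn)
            .map(|d| d.lsns.start)
            .min();
        // The image is consistent up to just below that delta (or fully, if no newer delta exists).
        let consistent_up_to = match first_newer_delta {
            Some(start) => img.lsn.max(start.saturating_sub(1)),
            None => disk_consistent_lsn,
        };
        result = result.min(consistent_up_to);
    }
    result
}

fn main() {
    let images = vec![
        ImageLayer { keys: 0..50, lsn: 100 },
        ImageLayer { keys: 50..100, lsn: 200 },
    ];
    let deltas = vec![DeltaLayer { keys: 0..30, lsns: 300..350 }];
    // Only the first image is followed by newer WAL, so the answer is 300 - 1 = 299.
    assert_eq!(image_consistent_lsn(&images, &deltas, 1000), 299);
}
```

This mirrors the shape of the unit-test expectations added further down, where an image at LSN 100 overlapped by a delta spanning 300..350 pins the result at 299.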
+ */ + pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn { + struct ImageLayerInfo { + // creation LSN of the image layer + image_lsn: Lsn, + // the current minimum LSN of newer delta layers with overlapping key ranges + min_delta_lsn: Lsn, + } + let started_at = Instant::now(); + + let min_l0_deltas_lsn = { + let l0_deltas = self.level0_deltas(); + l0_deltas + .iter() + .map(|layer| layer.get_lsn_range().start) + .min() + .unwrap_or(disk_consistent_lsn) + }; + let global_key_range = Key::MIN..Key::MAX; + + // step 1: collect all most recent image layers into a map + // map: end key to image_layer_info + let mut image_map: BTreeMap = BTreeMap::new(); + for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) { + let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0)); + image_map.insert( + img_range.end, + ImageLayerInfo { + image_lsn: img_lsn, + min_delta_lsn: min_l0_deltas_lsn, + }, + ); + } + + // step 2: go through all delta layers, and update the image layer info with overlapping + // key ranges + for layer in self.historic.iter() { + if !layer.is_delta { + continue; + } + let delta_key_range = layer.get_key_range(); + let delta_lsn_range = layer.get_lsn_range(); + for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) { + debug_assert!(img_end_key >= &delta_key_range.start); + if delta_lsn_range.end > img_info.image_lsn { + // the delta layer includes WAL records after the image + // it's possibel that the delta layer's start LSN < image LSN, which will be simply ignored by step 3 + img_info.min_delta_lsn = + std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start); + } + if img_end_key >= &delta_key_range.end { + // we have fully processed all overlapping image layers + break; + } + } + } + + // step 3, go through all image layers and find the image consistent LSN + let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap(); + let mut prev_key = Key::MIN; + for (img_key, img_info) in image_map { + tracing::debug!( + "Image layer {:?}:{} has min delta lsn {}", + Range { + start: prev_key, + end: img_key, + }, + img_info.image_lsn, + img_info.min_delta_lsn, + ); + let image_lsn = std::cmp::max( + img_info.image_lsn, + img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)), + ); + img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn); + prev_key = img_key; + } + tracing::info!( + "computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. 
Processed {} layrs in total.", + img_consistent_lsn, + disk_consistent_lsn, + started_at.elapsed().as_millis(), + self.historic.len() + ); + img_consistent_lsn + } + + /* END_HADRON */ + /// Return all L0 delta layers pub fn level0_deltas(&self) -> &Vec> { &self.l0_delta_layers @@ -1579,6 +1677,138 @@ mod tests { LayerVisibilityHint::Visible )); } + + /* BEGIN_HADRON */ + #[test] + fn test_compute_image_consistent_lsn() { + let mut layer_map = LayerMap::default(); + + let disk_consistent_lsn = Lsn(1000); + // case 1: empty layer map + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!( + disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(), + image_consistent_lsn + ); + + // case 2: only L0 delta layer + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(100), + Lsn(900)..Lsn(990), + true, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(100), + Lsn(850)..Lsn(899), + true, + )); + } + + // should use min L0 delta LSN - 1 as image consistent LSN + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(849), image_consistent_lsn); + + // case 3: 3 images, no L1 delta + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(40), + Lsn(100)..Lsn(100), + false, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(40)..Key::from_i128(70), + Lsn(200)..Lsn(200), + false, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(70)..Key::from_i128(100), + Lsn(150)..Lsn(150), + false, + )); + } + // should use min L0 delta LSN - 1 as image consistent LSN + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(849), image_consistent_lsn); + + // case 4: 3 images with 1 L1 delta + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(50), + Lsn(300)..Lsn(350), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(299), image_consistent_lsn); + + // case 5: 3 images with 1 more L1 delta with smaller LSN + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(50)..Key::from_i128(72), + Lsn(200)..Lsn(300), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(199), image_consistent_lsn); + + // case 6: 3 images with more newer L1 deltas (no impact on final results) + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(30), + Lsn(400)..Lsn(500), + true, + )); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(35)..Key::from_i128(100), + Lsn(450)..Lsn(600), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(199), image_consistent_lsn); + + // case 7: 3 images with more older L1 deltas (no impact on final results) + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(40), + Lsn(0)..Lsn(50), + true, + )); + + 
updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(50)..Key::from_i128(100), + Lsn(10)..Lsn(60), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(199), image_consistent_lsn); + + // case 8: 3 images with one more L1 delta with overlapping LSN range + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(50), + Lsn(50)..Lsn(250), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(100), image_consistent_lsn); + } + + /* END_HADRON */ } #[cfg(test)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a9bc0a060b..718ea925b7 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -351,13 +351,6 @@ pub struct Timeline { last_image_layer_creation_check_at: AtomicLsn, last_image_layer_creation_check_instant: std::sync::Mutex>, - // HADRON - /// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation - /// on this key range. - force_image_creation_lsn: AtomicLsn, - /// The last time instant when force_image_creation_lsn is computed. - force_image_creation_lsn_computed_at: std::sync::Mutex>, - /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, @@ -2854,7 +2847,7 @@ impl Timeline { } // HADRON - fn get_image_creation_timeout(&self) -> Option { + fn get_image_layer_force_creation_period(&self) -> Option { let tenant_conf = self.tenant_conf.load(); tenant_conf .tenant_conf @@ -3134,9 +3127,6 @@ impl Timeline { repartition_threshold: 0, last_image_layer_creation_check_at: AtomicLsn::new(0), last_image_layer_creation_check_instant: Mutex::new(None), - // HADRON - force_image_creation_lsn: AtomicLsn::new(0), - force_image_creation_lsn_computed_at: std::sync::Mutex::new(None), last_received_wal: Mutex::new(None), rel_size_latest_cache: RwLock::new(HashMap::new()), rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), @@ -5381,13 +5371,16 @@ impl Timeline { } // HADRON + // for child timelines, we consider all pages up to ancestor_LSN are redone successfully by the parent timeline + min_image_lsn = min_image_lsn.max(self.get_ancestor_lsn()); if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 { info!( - "forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}", + "forcing image creation for partitioned range {}-{}. 
Min image LSN: {}, force image creation LSN: {}, num deltas: {}", partition.ranges[0].start, partition.ranges[0].end, min_image_lsn, - force_image_creation_lsn.unwrap() + force_image_creation_lsn.unwrap(), + max_deltas ); return true; } @@ -7153,6 +7146,19 @@ impl Timeline { .unwrap() .clone() } + + /* BEGIN_HADRON */ + pub(crate) async fn compute_image_consistent_lsn(&self) -> anyhow::Result { + let guard = self + .layers + .read(LayerManagerLockHolder::ComputeImageConsistentLsn) + .await; + let layer_map = guard.layer_map()?; + let disk_consistent_lsn = self.get_disk_consistent_lsn(); + + Ok(layer_map.compute_image_consistent_lsn(disk_consistent_lsn)) + } + /* END_HADRON */ } impl Timeline { diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 171f9d1284..aa1aa937b6 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -8,7 +8,7 @@ use std::cmp::min; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant}; use super::layer_manager::LayerManagerLockHolder; use super::{ @@ -34,7 +34,6 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; -use postgres_ffi::to_pg_timestamp; use serde::Serialize; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::CancellationToken; @@ -47,7 +46,6 @@ use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; -use crate::pgdatadir_mapping::LsnForTimestamp; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; use crate::tenant::gc_block::GcBlock; @@ -1271,10 +1269,7 @@ impl Timeline { // Define partitioning schema if needed // HADRON - let force_image_creation_lsn = self - .get_or_compute_force_image_creation_lsn(cancel, ctx) - .await - .map_err(CompactionError::Other)?; + let force_image_creation_lsn = self.get_force_image_creation_lsn(); // 1. L0 Compact let l0_outcome = { @@ -1484,59 +1479,37 @@ impl Timeline { } /* BEGIN_HADRON */ - // Get the force image creation LSN. Compute it if the last computed LSN is too old. - async fn get_or_compute_force_image_creation_lsn( - self: &Arc, - cancel: &CancellationToken, - ctx: &RequestContext, - ) -> anyhow::Result> { - const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes - let image_layer_force_creation_period = self.get_image_creation_timeout(); - if image_layer_force_creation_period.is_none() { - return Ok(None); + // Get the force image creation LSN based on gc_cutoff_lsn. + // Note that this is an estimation and the workload rate may suddenly change. When that happens, + // the force image creation may be too early or too late, but eventually it should be able to catch up. 
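As a back-of-the-envelope check of the proportional estimate implemented just below (an editor's sketch with plain integers, assuming the numbers used by the new test_get_force_image_creation_lsn test: a 7-hour PITR interval, a 1-hour force-creation period, a PITR cutoff at LSN 100 and a last-record LSN of 5000):

```rust
// Scale the LSN distance back to the PITR cutoff by (force-creation period / PITR interval)
// and step back from the current LSN by that much. Assumes a non-zero PITR interval, which
// the real code checks before dividing.
fn force_image_creation_lsn(current: u64, pitr_cutoff: u64, period_secs: u64, pitr_secs: u64) -> u64 {
    let delta = (current - pitr_cutoff) * period_secs / pitr_secs;
    current.saturating_sub(delta)
}

fn main() {
    // (5000 - 100) * 3600 / 25200 = 700, so the forced-image LSN lands at 5000 - 700 = 4300.
    assert_eq!(force_image_creation_lsn(5000, 100, 3600, 7 * 3600), 4300);
}
```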
+ pub(crate) fn get_force_image_creation_lsn(self: &Arc) -> Option { + let image_creation_period = self.get_image_layer_force_creation_period()?; + let current_lsn = self.get_last_record_lsn(); + let pitr_lsn = self.gc_info.read().unwrap().cutoffs.time?; + let pitr_interval = self.get_pitr_interval(); + if pitr_lsn == Lsn::INVALID || pitr_interval.is_zero() { + tracing::warn!( + "pitr LSN/interval not found, skipping force image creation LSN calculation" + ); + return None; } - let image_layer_force_creation_period = image_layer_force_creation_period.unwrap(); - let force_image_creation_lsn_computed_at = - *self.force_image_creation_lsn_computed_at.lock().unwrap(); - if force_image_creation_lsn_computed_at.is_none() - || force_image_creation_lsn_computed_at.unwrap().elapsed() - > FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL - { - let now: SystemTime = SystemTime::now(); - let timestamp = now - .checked_sub(image_layer_force_creation_period) - .ok_or_else(|| { - anyhow::anyhow!( - "image creation timeout is too large: {image_layer_force_creation_period:?}" - ) - })?; - let timestamp = to_pg_timestamp(timestamp); - let force_image_creation_lsn = match self - .find_lsn_for_timestamp(timestamp, cancel, ctx) - .await? - { - LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn, - _ => { - let gc_lsn = *self.get_applied_gc_cutoff_lsn(); - tracing::info!( - "no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}", - gc_lsn - ); - gc_lsn - } - }; - self.force_image_creation_lsn - .store(force_image_creation_lsn); - *self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now()); - tracing::info!( - "computed force image creation LSN: {}", - force_image_creation_lsn - ); - Ok(Some(force_image_creation_lsn)) - } else { - Ok(Some(self.force_image_creation_lsn.load())) - } + let delta_lsn = current_lsn.checked_sub(pitr_lsn).unwrap().0 + * image_creation_period.as_secs() + / pitr_interval.as_secs(); + let force_image_creation_lsn = current_lsn.checked_sub(delta_lsn).unwrap_or(Lsn(0)); + + tracing::info!( + "Tenant shard {} computed force_image_creation_lsn: {}. Current lsn: {}, image_layer_force_creation_period: {:?}, GC cutoff: {}, PITR interval: {:?}", + self.tenant_shard_id, + force_image_creation_lsn, + current_lsn, + image_creation_period, + pitr_lsn, + pitr_interval + ); + + Some(force_image_creation_lsn) } /* END_HADRON */ diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 2eccf48579..d8d81a6c91 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -47,6 +47,7 @@ pub(crate) enum LayerManagerLockHolder { ImportPgData, DetachAncestor, Eviction, + ComputeImageConsistentLsn, #[cfg(test)] Testing, } diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index e5a3a969d4..62fc212e12 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -850,6 +850,31 @@ async fn handle_tenant_describe( json_response(StatusCode::OK, service.tenant_describe(tenant_id)?) 
} +/* BEGIN_HADRON */ +async fn handle_tenant_timeline_describe( + service: Arc, + req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Scrubber)?; + + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(_req) => {} + }; + + json_response( + StatusCode::OK, + service + .tenant_timeline_describe(tenant_id, timeline_id) + .await?, + ) +} +/* END_HADRON */ + async fn handle_tenant_list( service: Arc, req: Request, @@ -2480,6 +2505,13 @@ pub fn make_router( ) }) // Timeline operations + .get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { + tenant_service_handler( + r, + handle_tenant_timeline_describe, + RequestName("v1_tenant_timeline_describe"), + ) + }) .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { tenant_service_handler( r, diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index d6fe173eb3..da0687895a 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -86,6 +86,23 @@ impl PageserverClient { ) } + /* BEGIN_HADRON */ + pub(crate) async fn tenant_timeline_describe( + &self, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + ) -> Result { + measured_request!( + "tenant_timeline_describe", + crate::metrics::Method::Get, + &self.node_id_label, + self.inner + .tenant_timeline_describe(tenant_shard_id, timeline_id,) + .await + ) + } + /* END_HADRON */ + pub(crate) async fn tenant_scan_remote_storage( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 9c1b81d261..31d149c5ac 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -32,7 +32,7 @@ use pageserver_api::controller_api::{ ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest, - TenantShardMigrateRequest, TenantShardMigrateResponse, + TenantShardMigrateRequest, TenantShardMigrateResponse, TenantTimelineDescribeResponse, }; use pageserver_api::models::{ self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease, @@ -5486,6 +5486,92 @@ impl Service { .ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into())) } + /* BEGIN_HADRON */ + pub(crate) async fn tenant_timeline_describe( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result { + self.tenant_remote_mutation(tenant_id, |locations| async move { + if locations.0.is_empty() { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant not found").into(), + )); + }; + + let locations: Vec<(TenantShardId, Node)> = locations + .0 + .iter() + .map(|t| (*t.0, t.1.latest.node.clone())) + .collect(); + let mut futs = FuturesUnordered::new(); + + for (shard_id, node) in locations { + futs.push({ + async move { + let result = node + .with_client_retries( + |client| async move { + client + .tenant_timeline_describe(&shard_id, &timeline_id) + .await + }, + &self.http_client, + &self.config.pageserver_jwt_token, + 3, + 3, + Duration::from_secs(30), + &self.cancel, + ) + .await; + (result, shard_id, node.get_id()) + } + }); + } + + let 
mut results: Vec = Vec::new(); + while let Some((result, tenant_shard_id, node_id)) = futs.next().await { + match result { + Some(Ok(timeline_info)) => results.push(timeline_info), + Some(Err(e)) => { + tracing::warn!( + "Failed to describe tenant {} timeline {} for pageserver {}: {e}", + tenant_shard_id, + timeline_id, + node_id, + ); + return Err(ApiError::ResourceUnavailable(format!("{e}").into())); + } + None => return Err(ApiError::Cancelled), + } + } + let mut image_consistent_lsn: Option = Some(Lsn::MAX); + for timeline_info in &results { + if let Some(tline_image_consistent_lsn) = timeline_info.image_consistent_lsn { + image_consistent_lsn = Some(std::cmp::min( + image_consistent_lsn.unwrap(), + tline_image_consistent_lsn, + )); + } else { + tracing::warn!( + "Timeline {} on shard {} does not have image consistent lsn", + timeline_info.timeline_id, + timeline_info.tenant_id + ); + image_consistent_lsn = None; + break; + } + } + + Ok(TenantTimelineDescribeResponse { + shards: results, + image_consistent_lsn, + }) + }) + .await? + } + /* END_HADRON */ + /// limit & offset are pagination parameters. Since we are walking an in-memory HashMap, `offset` does not /// avoid traversing data, it just avoid returning it. This is suitable for our purposes, since our in memory /// maps are small enough to traverse fast, our pagination is just to avoid serializing huge JSON responses diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 42924f9b83..a7b7f0e74d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2342,6 +2342,20 @@ class NeonStorageController(MetricsGetter, LogUtils): response.raise_for_status() return response.json() + # HADRON + def tenant_timeline_describe( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ): + response = self.request( + "GET", + f"{self.api}/control/v1/tenant/{tenant_id}/timeline/{timeline_id}", + headers=self.headers(TokenScope.ADMIN), + ) + response.raise_for_status() + return response.json() + def nodes(self): """ :return: list of {"id": ""} diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index e67161c6b7..ab02314288 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -960,9 +960,9 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): return image_layer_count, delta_layer_count -def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): +def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder): """ - Tests that page server can force creating new images if image creation timeout is enabled + Tests that page server can force creating new images if image_layer_force_creation_period is enabled """ # use large knobs to disable L0 compaction/image creation except for the force image creation tenant_conf = { @@ -972,10 +972,10 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): "checkpoint_distance": 10 * 1024, "checkpoint_timeout": "1s", "image_layer_force_creation_period": "1s", - # The lsn for forced image layer creations is calculated once every 10 minutes. - # Hence, drive compaction manually such that the test doesn't compute it at the - # wrong time. 
- "compaction_period": "0s", + "pitr_interval": "10s", + "gc_period": "1s", + "compaction_period": "1s", + "lsn_lease_length": "1s", } # consider every tenant large to run the image layer generation check more eagerly @@ -1018,4 +1018,69 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): ) +def test_image_consistent_lsn(neon_env_builder: NeonEnvBuilder): + """ + Test the /v1/tenant//timeline/ endpoint and the computation of image_consistent_lsn + """ + # use large knobs to disable L0 compaction/image creation except for the force image creation + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + "checkpoint_distance": 10 * 1024, + "checkpoint_timeout": "1s", + "image_layer_force_creation_period": "1s", + "pitr_interval": "10s", + "gc_period": "1s", + "compaction_period": "1s", + "lsn_lease_length": "1s", + } + + neon_env_builder.num_pageservers = 2 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start( + initial_tenant_conf=tenant_conf, + initial_tenant_shard_count=4, + initial_tenant_shard_stripe_size=1, + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)") + for v in range(10): + endpoint.safe_psql( + f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False + ) + + response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id) + shards = response["shards"] + for shard in shards: + assert shard["image_consistent_lsn"] is not None + image_consistent_lsn = response["image_consistent_lsn"] + assert image_consistent_lsn is not None + + # do more writes and wait for image_consistent_lsn to advance + for v in range(100): + endpoint.safe_psql( + f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False + ) + + def check_image_consistent_lsn_advanced(): + response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id) + new_image_consistent_lsn = response["image_consistent_lsn"] + shards = response["shards"] + for shard in shards: + print(f"shard {shard['tenant_id']} image_consistent_lsn{shard['image_consistent_lsn']}") + assert new_image_consistent_lsn != image_consistent_lsn + + wait_until(check_image_consistent_lsn_advanced) + + endpoint.stop_and_destroy() + + for ps in env.pageservers: + ps.allowed_errors.append(".*created delta file of size.*larger than double of target.*") + + # END_HADRON From 154f6dc59cc91ebde58d1a0b4a8b43aa68d1c3a5 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 11 Jul 2025 14:25:25 +0100 Subject: [PATCH 03/14] pageserver: log only on final shard resolution failure (#12565) This log is too noisy. Instead of warning on every retry, let's log only on the final failure. --- pageserver/src/tenant/timeline/handle.rs | 10 +++++----- test_runner/fixtures/pageserver/allowed_errors.py | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 33c97287c0..7bca66190f 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -359,14 +359,14 @@ impl Cache { Err(e) => { // Retry on tenant manager error to handle tenant split more gracefully if attempt < GET_MAX_RETRIES { - tracing::warn!( - "Fail to resolve tenant shard in attempt {}: {:?}. 
Retrying...", - attempt, - e - ); tokio::time::sleep(RETRY_BACKOFF).await; continue; } else { + tracing::warn!( + "Failed to resolve tenant shard after {} attempts: {:?}", + GET_MAX_RETRIES, + e + ); return Err(e); } } diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index 0e4dd571c0..59249f31ad 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -115,8 +115,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = ( ".*Local data loss suspected.*", # Too many frozen layers error is normal during intensive benchmarks ".*too many frozen layers.*", - # Transient errors when resolving tenant shards by page service - ".*Fail to resolve tenant shard in attempt.*", + ".*Failed to resolve tenant shard after.*", # Expected warnings when pageserver has not refreshed GC info yet ".*pitr LSN/interval not found, skipping force image creation LSN calculation.*", ".*No broker updates received for a while.*", From a8db7ebffb7e9b2a1bc9cc950a03a244d26d34d4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 11 Jul 2025 17:17:44 +0300 Subject: [PATCH 04/14] Minor refactor of the SQL functions to get working set size estimate (#12550) Split the functions into two: one internal function to calculate the estimate, and another (two functions) to expose it as SQL functions. This is in preparation of adding new communicator implementation. With that, the SQL functions will dispatch the call to the old or new implementation depending on which is being used. --- pgxn/neon/file_cache.c | 47 +++++++++++++++--------------------------- pgxn/neon/file_cache.h | 3 ++- pgxn/neon/neon.c | 30 +++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8cfa09bc87..0e316abd1d 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -205,6 +205,8 @@ bool AmPrewarmWorker; #define LFC_ENABLED() (lfc_ctl->limit != 0) +PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); + /* * Close LFC file if opened. * All backends should close their LFC files once LFC is disabled. @@ -2135,40 +2137,25 @@ local_cache_pages(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } -PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); -Datum -approximate_working_set_size_seconds(PG_FUNCTION_ARGS) +/* + * Internal implementation of the approximate_working_set_size_seconds() + * function. + */ +int32 +lfc_approximate_working_set_size_seconds(time_t duration, bool reset) { - if (lfc_size_limit != 0) - { - int32 dc; - time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0); - LWLockAcquire(lfc_lock, LW_SHARED); - dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); - LWLockRelease(lfc_lock); - PG_RETURN_INT32(dc); - } - PG_RETURN_NULL(); -} + int32 dc; -PG_FUNCTION_INFO_V1(approximate_working_set_size); + if (lfc_size_limit == 0) + return -1; -Datum -approximate_working_set_size(PG_FUNCTION_ARGS) -{ - if (lfc_size_limit != 0) - { - int32 dc; - bool reset = PG_GETARG_BOOL(0); - LWLockAcquire(lfc_lock, reset ? 
LW_EXCLUSIVE : LW_SHARED); - dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1); - if (reset) - memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); - LWLockRelease(lfc_lock); - PG_RETURN_INT32(dc); - } - PG_RETURN_NULL(); + LWLockAcquire(lfc_lock, LW_SHARED); + dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); + if (reset) + memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); + LWLockRelease(lfc_lock); + return dc; } PG_FUNCTION_INFO_V1(get_local_cache_state); diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index d5ac55d5ba..14e5d4f753 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -47,7 +47,8 @@ extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blk extern FileCacheState* lfc_get_state(size_t max_entries); extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); -PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); +extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset); + static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 9e0ca16fed..7b749f1080 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -561,6 +561,8 @@ _PG_init(void) PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); PG_FUNCTION_INFO_V1(backpressure_throttling_time); +PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); +PG_FUNCTION_INFO_V1(approximate_working_set_size); Datum pg_cluster_size(PG_FUNCTION_ARGS) @@ -607,6 +609,34 @@ backpressure_throttling_time(PG_FUNCTION_ARGS) PG_RETURN_UINT64(BackpressureThrottlingTime()); } +Datum +approximate_working_set_size_seconds(PG_FUNCTION_ARGS) +{ + time_t duration; + int32 dc; + + duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0); + + dc = lfc_approximate_working_set_size_seconds(duration, false); + if (dc < 0) + PG_RETURN_NULL(); + else + PG_RETURN_INT32(dc); +} + +Datum +approximate_working_set_size(PG_FUNCTION_ARGS) +{ + bool reset = PG_GETARG_BOOL(0); + int32 dc; + + dc = lfc_approximate_working_set_size_seconds(-1, reset); + if (dc < 0) + PG_RETURN_NULL(); + else + PG_RETURN_INT32(dc); +} + #if PG_MAJORVERSION_NUM >= 16 static void neon_shmem_startup_hook(void) From f4245403b36925c3ad0ef39c344ca30b1701b74f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 11 Jul 2025 16:13:36 +0100 Subject: [PATCH 05/14] [proxy] allow testing query cancellation locally (#12568) ## Problem Canceelation requires redis, redis required control-plane. ## Summary of changes Make redis for cancellation not require control plane. Add instructions for setting up redis locally. 
--- proxy/README.md | 10 +++++++++- proxy/src/binary/proxy.rs | 20 +++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/proxy/README.md b/proxy/README.md index e10ff3d710..ff48f9f323 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -123,6 +123,11 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_pl docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';" ``` +If you want to test query cancellation, redis is also required: +```sh +docker run --detach --name proxy-redis --publish 6379:6379 redis:7.0 +``` + Let's create self-signed certificate by running: ```sh openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build" @@ -130,7 +135,10 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key Then we need to build proxy with 'testing' feature and run, e.g.: ```sh -RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key +RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- \ + --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' \ + --redis-auth-type="plain" --redis-plain="redis://127.0.0.1:6379" \ + -c server.crt -k server.key ``` Now from client you can start a new session: diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 691709ce2a..16a7dc7b67 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -522,15 +522,7 @@ pub async fn run() -> anyhow::Result<()> { maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); } - if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend - && let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api - && let Some(client) = redis_client - { - // project info cache and invalidation of that cache. - let cache = api.caches.project_info.clone(); - maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone())); - maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); - + if let Some(client) = redis_client { // Try to connect to Redis 3 times with 1 + (0..0.1) second interval. // This prevents immediate exit and pod restart, // which can cause hammering of the redis in case of connection issues. @@ -560,6 +552,16 @@ pub async fn run() -> anyhow::Result<()> { } } } + + #[allow(irrefutable_let_patterns)] + if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend + && let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api + { + // project info cache and invalidation of that cache. + let cache = api.caches.project_info.clone(); + maintenance_tasks.spawn(notifications::task_main(client, cache.clone())); + maintenance_tasks.spawn(async move { cache.gc_worker().await }); + } } let maintenance = loop { From a0a7733b5aa657553a5b91bb0a3d4f6e3847e38b Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Fri, 11 Jul 2025 10:57:50 -0500 Subject: [PATCH 06/14] Use relative paths in submodule URL references (#12559) This is a nifty trick from the hadron repo that seems to help with SSH key dance. 
Signed-off-by: Tristan Partin --- .gitmodules | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.gitmodules b/.gitmodules index d1330bf28c..e381fb079e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,16 +1,16 @@ [submodule "vendor/postgres-v14"] path = vendor/postgres-v14 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_14_STABLE_neon [submodule "vendor/postgres-v15"] path = vendor/postgres-v15 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_15_STABLE_neon [submodule "vendor/postgres-v16"] path = vendor/postgres-v16 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_16_STABLE_neon [submodule "vendor/postgres-v17"] path = vendor/postgres-v17 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_17_STABLE_neon From 3300207523008ab3dd922780c4d164bd4376a007 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 11 Jul 2025 19:05:22 +0300 Subject: [PATCH 07/14] Update working set size estimate without lock (#12570) Update the WSS estimate before acquring the lock, so that we don't need to hold the lock for so long. That seems safe to me, see added comment. I was planning to do this with the new rust-based communicator implementation anyway, but it might help a little with the current C implementation too. And more importantly, having this as a separate PR gives us a chance to review this aspect independently. --- pgxn/neon/file_cache.c | 77 +++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 0e316abd1d..2c87f139af 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -162,8 +162,34 @@ typedef struct FileCacheControl dlist_head lru; /* double linked list for LRU replacement * algorithm */ dlist_head holes; /* double linked list of punched holes */ - HyperLogLogState wss_estimation; /* estimation of working set size */ + ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ + + /* + * Estimation of working set size. + * + * This is not guarded by the lock. No locking is needed because all the + * writes to the "registers" are simple 64-bit stores, to update a + * timestamp. We assume that: + * + * - 64-bit stores are atomic. We could enforce that by using + * pg_atomic_uint64 instead of TimestampTz as the datatype in hll.h, but + * for now we just rely on it implicitly. + * + * - Even if they're not, and there is a race between two stores, it + * doesn't matter much which one wins because they're both updating the + * register with the current timestamp. Or you have a race between + * resetting the register and updating it, in which case it also doesn't + * matter much which one wins. + * + * - If they're not atomic, you might get an occasional "torn write" if + * you're really unlucky, but we tolerate that too. It just means that + * the estimate will be a little off, until the register is updated + * again. 
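To make the first assumption above concrete, here is a minimal Rust sketch (an editor's illustration, not part of the patch) of the same pattern with the atomicity spelled out explicitly: each register holds a 64-bit timestamp, writers issue plain relaxed stores, and readers tolerate a slightly stale value:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// One illustrative "register": no lock is taken on either side.
static REGISTER: AtomicU64 = AtomicU64::new(0);

fn record_access(now_micros: u64) {
    // A relaxed 64-bit store: under a race the worst case is a slightly stale timestamp.
    REGISTER.store(now_micros, Ordering::Relaxed);
}

fn read_register() -> u64 {
    REGISTER.load(Ordering::Relaxed)
}

fn main() {
    record_access(1_000_000);
    assert_eq!(read_register(), 1_000_000);
}
```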
+ */ + HyperLogLogState wss_estimation; + + /* Prewarmer state */ PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; size_t n_prewarm_workers; size_t n_prewarm_entries; @@ -1144,6 +1170,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + /* Update working set size estimate for the blocks */ + for (int i = 0; i < nblocks; i++) + { + tag.blockNum = blkno + i; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header @@ -1222,14 +1255,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - - /* Approximate working set for the blocks assumed in this entry */ - for (int i = 0; i < blocks_in_chunk; i++) - { - tag.blockNum = blkno + i; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - } - if (entry == NULL) { /* Pages are not cached */ @@ -1506,9 +1531,15 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, return false; CopyNRelFileInfoToBufTag(tag, rinfo); + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); tag.forkNum = forknum; - CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + /* Update working set size estimate for the blocks */ + if (lfc_prewarm_update_ws_estimation) + { + tag.blockNum = blkno; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); @@ -1526,19 +1557,13 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, if (lwlsn > lsn) { - elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", + elog(DEBUG1, "Skip LFC write for %u because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn)); LWLockRelease(lfc_lock); return false; } entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); - - if (lfc_prewarm_update_ws_estimation) - { - tag.blockNum = blkno; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - } if (found) { state = GET_STATE(entry, chunk_offs); @@ -1651,9 +1676,15 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, return; CopyNRelFileInfoToBufTag(tag, rinfo); + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); tag.forkNum = forkNum; - CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + /* Update working set size estimate for the blocks */ + for (int i = 0; i < nblocks; i++) + { + tag.blockNum = blkno + i; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -1694,14 +1725,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, cv = &lfc_ctl->cv[hash % N_COND_VARS]; entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); - - /* Approximate working set for the blocks assumed in this entry */ - for (int i = 0; i < blocks_in_chunk; i++) - { - tag.blockNum = blkno + i; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - } - if (found) { /* @@ -2150,11 +2173,9 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset) if (lfc_size_limit == 0) return -1; - 
LWLockAcquire(lfc_lock, LW_SHARED); dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); if (reset) memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); - LWLockRelease(lfc_lock); return dc; } From 379259bdd75edae91fad0d180fa513bff3e1f92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 11 Jul 2025 19:07:14 +0200 Subject: [PATCH 08/14] storcon: don't error log on timeline delete if tenant migration is in progress (#12523) Fixes [LKB-61](https://databricks.atlassian.net/browse/LKB-61): `test_timeline_archival_chaos` being flaky with storcon error `Requested tenant is missing`. When a tenant migration is ongoing, and the attach request has been sent to the new location, but the attach hasn't finished yet, it is possible for the pageserver to return a 412 precondition failed HTTP error on timeline deletion, because it is being sent to the new location already. That one we would previously log via sth like: ``` ERROR request{method=DELETE path=/v1/tenant/1f544a11c90d1afd7af9b26e48985a4e/timeline/32818fb3ebf07cb7f06805429d7dee38 request_id=c493c04b-7f33-46d2-8a65-aac8a5516055}: Error processing HTTP request: InternalServerError(Error deleting timeline 32 818fb3ebf07cb7f06805429d7dee38 on 1f544a11c90d1afd7af9b26e48985a4e on node 2 (localhost): pageserver API: Precondition failed: Requested tenant is missing ``` This patch changes that and makes us return a more reasonable resource unavailable error. Not sure how scalable this is with tenants with a large number of shards, but that's a different discussion (we'd probably need a limited amount of per-storcon retries). example [link](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-12398/15981821532/index.html#/testresult/e7785dfb1238d92f). --- storage_controller/src/service.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 31d149c5ac..0907907edc 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -5206,6 +5206,9 @@ impl Service { match res { Ok(ok) => Ok(ok), Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT), + Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) if msg.contains("Requested tenant is missing") => { + Err(ApiError::ResourceUnavailable("Tenant migration in progress".into())) + }, Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())), Err(e) => { Err( From 63ca084696f4dd226bfea1abae66dcb3234d1051 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Fri, 11 Jul 2025 14:37:55 -0400 Subject: [PATCH 09/14] fix(pageserver): downgrade wal apply error during gc-compaction (#12518) ## Problem close LKB-162 close https://github.com/neondatabase/cloud/issues/30665, related to https://github.com/neondatabase/cloud/issues/29434 We see a lot of errors like: ``` 2025-05-22T23:06:14.928959Z ERROR compaction_loop{tenant_id=? shard_id=0304}:run:gc_compact_timeline{timeline_id=?}: error applying 4 WAL records 35/DC0DF0B8..3B/E43188C0 (8119 bytes) to key 000000067F0000400500006027000000B9D0, from base image with LSN 0/0 to reconstruct page image at LSN 61/150B9B20 n_attempts=0: apply_wal_records Caused by: 0: read walredo stdout 1: early eof ``` which is an acceptable form of error and we should downgrade it to warning. 
## Summary of changes walredo error during gc-compaction is expected when the data below the gc horizon does not contain a full key history. This is possible in some rare cases of gc that is only able to remove data in the middle of the history but not all earlier history when a full keyspace gets deleted. Signed-off-by: Alex Chi Z --- pageserver/src/walredo.rs | 46 +++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c6d3cafe9a..f053c9ed37 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -147,6 +147,16 @@ pub enum RedoAttemptType { GcCompaction, } +impl std::fmt::Display for RedoAttemptType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RedoAttemptType::ReadPage => write!(f, "read page"), + RedoAttemptType::LegacyCompaction => write!(f, "legacy compaction"), + RedoAttemptType::GcCompaction => write!(f, "gc compaction"), + } + } +} + /// /// Public interface of WAL redo manager /// @@ -199,6 +209,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, max_retry_attempts, + redo_attempt_type, ) .await }; @@ -221,6 +232,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, max_retry_attempts, + redo_attempt_type, ) .await } @@ -445,6 +457,7 @@ impl PostgresRedoManager { wal_redo_timeout: Duration, pg_version: PgMajorVersion, max_retry_attempts: u32, + redo_attempt_type: RedoAttemptType, ) -> Result { *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); @@ -485,17 +498,28 @@ impl PostgresRedoManager { ); if let Err(e) = result.as_ref() { - error!( - "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}", - records.len(), - records.first().map(|p| p.0).unwrap_or(Lsn(0)), - records.last().map(|p| p.0).unwrap_or(Lsn(0)), - nbytes, - base_img_lsn, - lsn, - n_attempts, - e, - ); + macro_rules! message { + ($level:tt) => { + $level!( + "error applying {} WAL records {}..{} ({} bytes) to key {} during {}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}", + records.len(), + records.first().map(|p| p.0).unwrap_or(Lsn(0)), + records.last().map(|p| p.0).unwrap_or(Lsn(0)), + nbytes, + key, + redo_attempt_type, + base_img_lsn, + lsn, + n_attempts, + e, + ) + } + } + match redo_attempt_type { + RedoAttemptType::ReadPage => message!(error), + RedoAttemptType::LegacyCompaction => message!(error), + RedoAttemptType::GcCompaction => message!(warn), + } } result.map_err(Error::Other) From 4566b12a22876f1110b77da9e7b75615c9963b38 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Fri, 11 Jul 2025 20:56:39 +0200 Subject: [PATCH 10/14] NEON: Finish Zenith->Neon rename (#12566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Even though we're now part of Databricks, let's at least make this part consistent. 
## Summary of changes - PG14: https://github.com/neondatabase/postgres/pull/669 - PG15: https://github.com/neondatabase/postgres/pull/670 - PG16: https://github.com/neondatabase/postgres/pull/671 - PG17: https://github.com/neondatabase/postgres/pull/672 --------- Co-authored-by: Arpad Müller --- compute_tools/src/compute.rs | 23 +++++++++++++ control_plane/src/endpoint.rs | 3 +- docs/core_changes.md | 7 ++-- pageserver/src/basebackup.rs | 33 +++++++++++-------- pageserver/src/import_datadir.rs | 14 ++++---- pgxn/neon_test_utils/neontest.c | 10 +++--- pgxn/typedefs.list | 22 ++++++------- test_runner/fixtures/neon_fixtures.py | 1 + .../regress/test_timeline_detach_ancestor.py | 8 ++--- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/postgres-v17 | 2 +- vendor/revisions.json | 8 ++--- 14 files changed, 84 insertions(+), 53 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c05cc229a2..2e0b7d7b2e 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1040,6 +1040,8 @@ impl ComputeNode { PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?, }; + self.fix_zenith_signal_neon_signal()?; + let mut state = self.state.lock().unwrap(); state.metrics.pageserver_connect_micros = connected.duration_since(started).as_micros() as u64; @@ -1049,6 +1051,27 @@ impl ComputeNode { Ok(()) } + /// Move the Zenith signal file to Neon signal file location. + /// This makes Compute compatible with older PageServers that don't yet + /// know about the Zenith->Neon rename. + fn fix_zenith_signal_neon_signal(&self) -> Result<()> { + let datadir = Path::new(&self.params.pgdata); + + let neonsig = datadir.join("neon.signal"); + + if neonsig.is_file() { + return Ok(()); + } + + let zenithsig = datadir.join("zenith.signal"); + + if zenithsig.is_file() { + fs::copy(zenithsig, neonsig)?; + } + + Ok(()) + } + /// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when /// the connection was established, and the (compressed) size of the basebackup. fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> { diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index ad2067e0f2..91a62b0ca4 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -32,7 +32,8 @@ //! config.json - passed to `compute_ctl` //! pgdata/ //! postgresql.conf - copy of postgresql.conf created by `compute_ctl` -//! zenith.signal +//! neon.signal +//! zenith.signal - copy of neon.signal, for backward compatibility //! //! ``` //! diff --git a/docs/core_changes.md b/docs/core_changes.md index 1388317728..abfd20af26 100644 --- a/docs/core_changes.md +++ b/docs/core_changes.md @@ -129,9 +129,10 @@ segment to bootstrap the WAL writing, but it doesn't contain the checkpoint reco changes in xlog.c, to allow starting the compute node without reading the last checkpoint record from WAL. -This includes code to read the `zenith.signal` file, which tells the startup code the LSN to start -at. When the `zenith.signal` file is present, the startup uses that LSN instead of the last -checkpoint's LSN. The system is known to be consistent at that LSN, without any WAL redo. +This includes code to read the `neon.signal` (also `zenith.signal`) file, which tells the startup +code the LSN to start at. When the `neon.signal` file is present, the startup uses that LSN +instead of the last checkpoint's LSN. 
The system is known to be consistent at that LSN, without +any WAL redo. ### How to get rid of the patch diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 36dada1e89..1a44c80e2d 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -114,7 +114,7 @@ where // Compute postgres doesn't have any previous WAL files, but the first // record that it's going to write needs to include the LSN of the // previous record (xl_prev). We include prev_record_lsn in the - // "zenith.signal" file, so that postgres can read it during startup. + // "neon.signal" file, so that postgres can read it during startup. // // We don't keep full history of record boundaries in the page server, // however, only the predecessor of the latest record on each @@ -751,34 +751,39 @@ where // // Add generated pg_control file and bootstrap WAL segment. - // Also send zenith.signal file with extra bootstrap data. + // Also send neon.signal and zenith.signal file with extra bootstrap data. // async fn add_pgcontrol_file( &mut self, pg_control_bytes: Bytes, system_identifier: u64, ) -> Result<(), BasebackupError> { - // add zenith.signal file - let mut zenith_signal = String::new(); + // add neon.signal file + let mut neon_signal = String::new(); if self.prev_record_lsn == Lsn(0) { if self.timeline.is_ancestor_lsn(self.lsn) { - write!(zenith_signal, "PREV LSN: none") + write!(neon_signal, "PREV LSN: none") .map_err(|e| BasebackupError::Server(e.into()))?; } else { - write!(zenith_signal, "PREV LSN: invalid") + write!(neon_signal, "PREV LSN: invalid") .map_err(|e| BasebackupError::Server(e.into()))?; } } else { - write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn) + write!(neon_signal, "PREV LSN: {}", self.prev_record_lsn) .map_err(|e| BasebackupError::Server(e.into()))?; } - self.ar - .append( - &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, - zenith_signal.as_bytes(), - ) - .await - .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?; + + // TODO: Remove zenith.signal once all historical computes have been replaced + // ... and thus support the neon.signal file. + for signalfilename in ["neon.signal", "zenith.signal"] { + self.ar + .append( + &new_tar_header(signalfilename, neon_signal.len() as u64)?, + neon_signal.as_bytes(), + ) + .await + .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,neon.signal"))?; + } //send pg_control let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 96fe0c1078..409cc2e3c5 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -610,13 +610,13 @@ async fn import_file( debug!("imported twophase file"); } else if file_path.starts_with("pg_wal") { debug!("found wal file in base section. ignore it"); - } else if file_path.starts_with("zenith.signal") { + } else if file_path.starts_with("zenith.signal") || file_path.starts_with("neon.signal") { // Parse zenith signal file to set correct previous LSN let bytes = read_all_bytes(reader).await?; - // zenith.signal format is "PREV LSN: prev_lsn" + // neon.signal format is "PREV LSN: prev_lsn" // TODO write serialization and deserialization in the same place. 
- let zenith_signal = std::str::from_utf8(&bytes)?.trim(); - let prev_lsn = match zenith_signal { + let neon_signal = std::str::from_utf8(&bytes)?.trim(); + let prev_lsn = match neon_signal { "PREV LSN: none" => Lsn(0), "PREV LSN: invalid" => Lsn(0), other => { @@ -624,17 +624,17 @@ async fn import_file( split[1] .trim() .parse::() - .context("can't parse zenith.signal")? + .context("can't parse neon.signal")? } }; - // zenith.signal is not necessarily the last file, that we handle + // neon.signal is not necessarily the last file, that we handle // but it is ok to call `finish_write()`, because final `modification.commit()` // will update lsn once more to the final one. let writer = modification.tline.writer().await; writer.finish_write(prev_lsn); - debug!("imported zenith signal {}", prev_lsn); + debug!("imported neon signal {}", prev_lsn); } else if file_path.starts_with("pg_tblspc") { // TODO Backups exported from neon won't have pg_tblspc, but we will need // this to import arbitrary postgres databases. diff --git a/pgxn/neon_test_utils/neontest.c b/pgxn/neon_test_utils/neontest.c index d37412f674..5f880dfd23 100644 --- a/pgxn/neon_test_utils/neontest.c +++ b/pgxn/neon_test_utils/neontest.c @@ -236,13 +236,13 @@ clear_buffer_cache(PG_FUNCTION_ARGS) bool save_neon_test_evict; /* - * Temporarily set the zenith_test_evict GUC, so that when we pin and + * Temporarily set the neon_test_evict GUC, so that when we pin and * unpin a buffer, the buffer is evicted. We use that hack to evict all * buffers, as there is no explicit "evict this buffer" function in the * buffer manager. */ - save_neon_test_evict = zenith_test_evict; - zenith_test_evict = true; + save_neon_test_evict = neon_test_evict; + neon_test_evict = true; PG_TRY(); { /* Scan through all the buffers */ @@ -273,7 +273,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS) /* * Pin the buffer, and release it again. Because we have - * zenith_test_evict==true, this will evict the page from the + * neon_test_evict==true, this will evict the page from the * buffer cache if no one else is holding a pin on it. 
*/ if (isvalid) @@ -286,7 +286,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS) PG_FINALLY(); { /* restore the GUC */ - zenith_test_evict = save_neon_test_evict; + neon_test_evict = save_neon_test_evict; } PG_END_TRY(); diff --git a/pgxn/typedefs.list b/pgxn/typedefs.list index 760f384212..3ea8b3b091 100644 --- a/pgxn/typedefs.list +++ b/pgxn/typedefs.list @@ -2953,17 +2953,17 @@ XmlTableBuilderData YYLTYPE YYSTYPE YY_BUFFER_STATE -ZenithErrorResponse -ZenithExistsRequest -ZenithExistsResponse -ZenithGetPageRequest -ZenithGetPageResponse -ZenithMessage -ZenithMessageTag -ZenithNblocksRequest -ZenithNblocksResponse -ZenithRequest -ZenithResponse +NeonErrorResponse +NeonExistsRequest +NeonExistsResponse +NeonGetPageRequest +NeonGetPageResponse +NeonMessage +NeonMessageTag +NeonNblocksRequest +NeonNblocksResponse +NeonRequest +NeonResponse _SPI_connection _SPI_plan __AssignProcessToJobObject diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a7b7f0e74d..b9fff05c6c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -5409,6 +5409,7 @@ SKIP_FILES = frozenset( ( "pg_internal.init", "pg.log", + "neon.signal", "zenith.signal", "pg_hba.conf", "postgresql.conf", diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index c0f163db32..45b7af719e 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -209,9 +209,9 @@ def test_ancestor_detach_branched_from( client.timeline_delete(env.initial_tenant, env.initial_timeline) wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline) - # because we do the fullbackup from ancestor at the branch_lsn, the zenith.signal is always different - # as there is always "PREV_LSN: invalid" for "before" - skip_files = {"zenith.signal"} + # because we do the fullbackup from ancestor at the branch_lsn, the neon.signal and/or zenith.signal is always + # different as there is always "PREV_LSN: invalid" for "before" + skip_files = {"zenith.signal", "neon.signal"} assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, skip_files) @@ -767,7 +767,7 @@ def test_compaction_induced_by_detaches_in_history( env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_after ) - # we don't need to skip any files, because zenith.signal will be identical + # we don't need to skip any files, because neon.signal will be identical assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set()) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 9085654ee8..8ce1f52303 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 9085654ee8022d5cc4ca719380a1dc53e5e3246f +Subproject commit 8ce1f52303aec29e098309347b57c01a1962e221 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 8c3249f36c..afd46987f3 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9 +Subproject commit afd46987f3da50c9146a8aa59380052df0862c06 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 7a4c0eacae..e08c8d5f15 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 7a4c0eacaeb9b97416542fa19103061c166460b1 +Subproject commit e08c8d5f1576ca0487d14d154510499c5f12adfb diff --git a/vendor/postgres-v17 b/vendor/postgres-v17 index db424d42d7..353c725b0c 160000 --- 
a/vendor/postgres-v17 +++ b/vendor/postgres-v17 @@ -1 +1 @@ -Subproject commit db424d42d748f8ad91ac00e28db2c7f2efa42f7f +Subproject commit 353c725b0c76cc82b15af21d8360d03391dc6814 diff --git a/vendor/revisions.json b/vendor/revisions.json index b260698c86..992aa405b1 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,18 +1,18 @@ { "v17": [ "17.5", - "db424d42d748f8ad91ac00e28db2c7f2efa42f7f" + "353c725b0c76cc82b15af21d8360d03391dc6814" ], "v16": [ "16.9", - "7a4c0eacaeb9b97416542fa19103061c166460b1" + "e08c8d5f1576ca0487d14d154510499c5f12adfb" ], "v15": [ "15.13", - "8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9" + "afd46987f3da50c9146a8aa59380052df0862c06" ], "v14": [ "14.18", - "9085654ee8022d5cc4ca719380a1dc53e5e3246f" + "8ce1f52303aec29e098309347b57c01a1962e221" ] } From cb991fba421999e390c9debfc39fb39a636fe1e9 Mon Sep 17 00:00:00 2001 From: HaoyuHuang Date: Fri, 11 Jul 2025 12:27:55 -0700 Subject: [PATCH 11/14] A few more PS changes (#12552) # TLDR Problem-I is a bug fix. The rest are no-ops. ## Problem I Page server checks image layer creation based on the elapsed time but this check depends on the current logical size, which is only computed on shard 0. Thus, for non-0 shards, the check will be ineffective and image creation will never be done for idle tenants. ## Summary of changes I This PR fixes the problem by simply removing the dependency on current logical size. ## Summary of changes II This PR adds a timeout when calling page server to split shard to make sure SC does not wait for the API call forever. Currently the PR doesn't adds any retry logic because it's not clear whether page server shard split can be safely retried if the existing operation is still ongoing or left the storage in a bad state. Thus it's better to abort the whole operation and restart. ## Problem III `test_remote_failures` requires PS to be compiled in the testing mode. For PS in dev/staging, they are compiled without this mode. ## Summary of changes III Remove the restriction and also increase the number of total failures allowed. ## Summary of changes IV remove test on PS getpage http route. 
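To make Problem/Summary I concrete: the predicate now falls back to the short, checkpoint-timeout-based interval whenever an exact logical size is unavailable, which is always the case on non-zero shards. The following is a condensed, illustrative sketch of that decision, not the exact code; the names mirror the `timeline.rs` hunk further down and the constants are the ones used there.

```
use std::time::{Duration, Instant};

/// How often the "should we create image layers?" check is allowed to run.
/// Non-zero shards never have an exact logical size, so they now fall through
/// to the short, checkpoint_timeout-based interval instead of never checking.
fn check_interval(
    is_unsharded: bool,
    exact_logical_size: Option<u64>,
    large_timeline_threshold: Option<u64>,
    checkpoint_timeout: Duration,
) -> Duration {
    if is_unsharded {
        if let Some(size) = exact_logical_size {
            if Some(size) < large_timeline_threshold {
                // Small unsharded tenant: check rarely.
                return Duration::from_secs(3600 * 48);
            }
        }
    }
    // Large tenant, or a shard that cannot know its logical size: check often.
    checkpoint_timeout
}

fn time_based_decision(last_check: Option<Instant>, interval: Duration) -> bool {
    match last_check {
        Some(t) => t.elapsed() >= interval,
        None => true,
    }
}

fn main() {
    // A non-zero shard with unknown logical size now uses the short interval.
    let interval = check_interval(false, None, Some(1 << 30), Duration::from_secs(600));
    assert_eq!(interval, Duration::from_secs(600));
    assert!(time_based_decision(None, interval));
}
```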
--------- Co-authored-by: Chen Luo Co-authored-by: Yecheng Yang Co-authored-by: Vlad Lazar --- control_plane/src/local_env.rs | 4 + control_plane/src/storage_controller.rs | 7 ++ libs/remote_storage/src/simulate_failures.rs | 1 + libs/utils/src/env.rs | 3 +- pageserver/src/bin/pageserver.rs | 5 -- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant/mgr.rs | 2 + pageserver/src/tenant/timeline.rs | 48 ++++++----- storage_controller/src/main.rs | 7 ++ storage_controller/src/service.rs | 27 ++++++- test_runner/regress/test_compaction.py | 62 ++++++++++++++ test_runner/regress/test_sharding.py | 85 ++++++++++++++++++++ 12 files changed, 226 insertions(+), 27 deletions(-) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index d0611113e8..d34dd39f61 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -217,6 +217,9 @@ pub struct NeonStorageControllerConf { pub posthog_config: Option, pub kick_secondary_downloads: Option, + + #[serde(with = "humantime_serde")] + pub shard_split_request_timeout: Option, } impl NeonStorageControllerConf { @@ -250,6 +253,7 @@ impl Default for NeonStorageControllerConf { timeline_safekeeper_count: None, posthog_config: None, kick_secondary_downloads: None, + shard_split_request_timeout: None, } } } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index dc6c82f504..f996f39967 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -648,6 +648,13 @@ impl StorageController { args.push(format!("--timeline-safekeeper-count={sk_cnt}")); } + if let Some(duration) = self.config.shard_split_request_timeout { + args.push(format!( + "--shard-split-request-timeout={}", + humantime::Duration::from(duration) + )); + } + let mut envs = vec![ ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 30d116f57c..e895380192 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -31,6 +31,7 @@ pub struct UnreliableWrapper { /* BEGIN_HADRON */ // This the probability of failure for each operation, ranged from [0, 100]. // The probability is default to 100, which means that all operations will fail. + // Storage will fail by probability up to attempts_to_fail times. attempt_failure_probability: u64, /* END_HADRON */ } diff --git a/libs/utils/src/env.rs b/libs/utils/src/env.rs index cc1cbf8009..0b3b5e6c4f 100644 --- a/libs/utils/src/env.rs +++ b/libs/utils/src/env.rs @@ -47,6 +47,7 @@ where /* BEGIN_HADRON */ pub enum DeploymentMode { + Local, Dev, Staging, Prod, @@ -64,7 +65,7 @@ pub fn get_deployment_mode() -> Option { } }, Err(_) => { - tracing::error!("DEPLOYMENT_MODE not set"); + // tracing::error!("DEPLOYMENT_MODE not set"); None } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 299fe7e159..dfb8b437c3 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -917,11 +917,6 @@ async fn create_remote_storage_client( // If `test_remote_failures` is non-zero, wrap the client with a // wrapper that simulates failures. 
if conf.test_remote_failures > 0 { - if !cfg!(feature = "testing") { - anyhow::bail!( - "test_remote_failures option is not available because pageserver was compiled without the 'testing' feature" - ); - } info!( "Simulating remote failures for first {} attempts of each op", conf.test_remote_failures diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index d839bac557..0d40c5ecf7 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -4183,7 +4183,7 @@ pub fn make_router( }) .get( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage", - |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler), + |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler), ) .get( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage", diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 15853d3614..52f67abde5 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1678,6 +1678,8 @@ impl TenantManager { // Phase 6: Release the InProgress on the parent shard drop(parent_slot_guard); + utils::pausable_failpoint!("shard-split-post-finish-pause"); + Ok(child_shards) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 718ea925b7..fe622713e9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5604,10 +5604,11 @@ impl Timeline { /// Predicate function which indicates whether we should check if new image layers /// are required. Since checking if new image layers are required is expensive in /// terms of CPU, we only do it in the following cases: - /// 1. If the timeline has ingested sufficient WAL to justify the cost + /// 1. If the timeline has ingested sufficient WAL to justify the cost or ... /// 2. If enough time has passed since the last check: /// 1. For large tenants, we wish to perform the check more often since they - /// suffer from the lack of image layers + /// suffer from the lack of image layers. Note that we assume sharded tenants + /// to be large since non-zero shards do not track the logical size. /// 2. 
For small tenants (that can mostly fit in RAM), we use a much longer interval fn should_check_if_image_layers_required(self: &Arc, lsn: Lsn) -> bool { let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold; @@ -5621,30 +5622,39 @@ impl Timeline { let distance_based_decision = distance.0 >= min_distance; - let mut time_based_decision = false; let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap(); - if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() { - let check_required_after = - if Some(Into::::into(&logical_size)) >= large_timeline_threshold { - self.get_checkpoint_timeout() - } else { - Duration::from_secs(3600 * 48) - }; - - time_based_decision = match *last_check_instant { - Some(last_check) => { - let elapsed = last_check.elapsed(); - elapsed >= check_required_after + let check_required_after = (|| { + if self.shard_identity.is_unsharded() { + if let CurrentLogicalSize::Exact(logical_size) = + self.current_logical_size.current_size() + { + if Some(Into::::into(&logical_size)) < large_timeline_threshold { + return Duration::from_secs(3600 * 48); + } } - None => true, - }; - } + } + + self.get_checkpoint_timeout() + })(); + + let time_based_decision = match *last_check_instant { + Some(last_check) => { + let elapsed = last_check.elapsed(); + elapsed >= check_required_after + } + None => true, + }; // Do the expensive delta layer counting only if this timeline has ingested sufficient // WAL since the last check or a checkpoint timeout interval has elapsed since the last // check. let decision = distance_based_decision || time_based_decision; - + tracing::info!( + "Decided to check image layers: {}. Distance-based decision: {}, time-based decision: {}", + decision, + distance_based_decision, + time_based_decision + ); if decision { self.last_image_layer_creation_check_at.store(lsn); *last_check_instant = Some(Instant::now()); diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 2a851dc25b..5d21feeb10 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -222,6 +222,9 @@ struct Cli { /// Primarily useful for testing to reduce test execution time. #[arg(long, default_value = "false", action=ArgAction::Set)] kick_secondary_downloads: bool, + + #[arg(long)] + shard_split_request_timeout: Option, } enum StrictMode { @@ -470,6 +473,10 @@ async fn async_main() -> anyhow::Result<()> { timeline_safekeeper_count: args.timeline_safekeeper_count, posthog_config: posthog_config.clone(), kick_secondary_downloads: args.kick_secondary_downloads, + shard_split_request_timeout: args + .shard_split_request_timeout + .map(humantime::Duration::into) + .unwrap_or(Duration::MAX), }; // Validate that we can connect to the database diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 0907907edc..638cb410fa 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -60,6 +60,7 @@ use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use utils::completion::Barrier; +use utils::env; use utils::generation::Generation; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -483,6 +484,9 @@ pub struct Config { /// When set, actively checks and initiates heatmap downloads/uploads. 
pub kick_secondary_downloads: bool, + + /// Timeout used for HTTP client of split requests. [`Duration::MAX`] if None. + pub shard_split_request_timeout: Duration, } impl From for ApiError { @@ -6406,18 +6410,39 @@ impl Service { // TODO: issue split calls concurrently (this only matters once we're splitting // N>1 shards into M shards -- initially we're usually splitting 1 shard into N). + // HADRON: set a timeout for splitting individual shards on page servers. + // Currently we do not perform any retry because it's not clear if page server can handle + // partially split shards correctly. + let shard_split_timeout = + if let Some(env::DeploymentMode::Local) = env::get_deployment_mode() { + Duration::from_secs(30) + } else { + self.config.shard_split_request_timeout + }; + let mut http_client_builder = reqwest::ClientBuilder::new() + .pool_max_idle_per_host(0) + .timeout(shard_split_timeout); + + for ssl_ca_cert in &self.config.ssl_ca_certs { + http_client_builder = http_client_builder.add_root_certificate(ssl_ca_cert.clone()); + } + let http_client = http_client_builder + .build() + .expect("Failed to construct HTTP client"); for target in &targets { let ShardSplitTarget { parent_id, node, child_ids, } = target; + let client = PageserverClient::new( node.get_id(), - self.http_client.clone(), + http_client.clone(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), ); + let response = client .tenant_shard_split( *parent_id, diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index ab02314288..963a19d640 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -7,6 +7,7 @@ import time from enum import StrEnum import pytest +from fixtures.common_types import TenantShardId from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, @@ -960,6 +961,67 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): return image_layer_count, delta_layer_count +def test_image_layer_creation_time_threshold(neon_env_builder: NeonEnvBuilder): + """ + Tests that image layers can be created when the time threshold is reached on non-0 shards. + """ + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + # disable distance based image layer creation check + "checkpoint_distance": 10 * 1024 * 1024 * 1024, + "checkpoint_timeout": "100ms", + "image_layer_force_creation_period": "1s", + "pitr_interval": "10s", + "gc_period": "1s", + "compaction_period": "1s", + "lsn_lease_length": "1s", + } + + # consider every tenant large to run the image layer generation check more eagerly + neon_env_builder.pageserver_config_override = ( + "image_layer_generation_large_timeline_threshold=0" + ) + + neon_env_builder.num_pageservers = 1 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start( + initial_tenant_conf=tenant_conf, + initial_tenant_shard_count=2, + initial_tenant_shard_stripe_size=1, + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)") + + for v in range(10): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + tenant_shard_id = TenantShardId(tenant_id, 1, 2) + + # Generate some rows. 
+ for v in range(20): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + # restart page server so that logical size on non-0 shards is missing + env.pageserver.restart() + + (old_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0) + log.info(f"old images: {old_images}, old deltas: {old_deltas}") + + def check_image_creation(): + (new_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0) + log.info(f"images: {new_images}, deltas: {old_deltas}") + assert new_images > old_images + + wait_until(check_image_creation) + + endpoint.stop_and_destroy() + + def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder): """ Tests that page server can force creating new images if image_layer_force_creation_period is enabled diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 8ff767eca4..5549105188 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1673,6 +1673,91 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder): # END_HADRON +# HADRON +@pytest.mark.skip(reason="The backpressure change has not been merged yet.") +def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder): + """ + Tests back pressure knobs are enforced on the per shard basis instead of at the tenant level. + """ + init_shard_count = 4 + neon_env_builder.num_pageservers = init_shard_count + stripe_size = 1 + + env = neon_env_builder.init_start( + initial_tenant_shard_count=init_shard_count, + initial_tenant_shard_stripe_size=stripe_size, + initial_tenant_conf={ + # disable auto-flush of shards and set max_replication_flush_lag as 15MB. + # The backpressure parameters must be enforced at the shard level to avoid stalling PG. + "checkpoint_distance": 1 * 1024 * 1024 * 1024, + "checkpoint_timeout": "1h", + }, + ) + + endpoint = env.endpoints.create( + "main", + config_lines=[ + "max_replication_write_lag = 0", + "max_replication_apply_lag = 0", + "max_replication_flush_lag = 15MB", + "neon.max_cluster_size = 10GB", + ], + ) + endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created. 
+ endpoint.start() + + # generate 20MB of data + endpoint.safe_psql( + "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;" + ) + res = endpoint.safe_psql( + "SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system" + )[0] + assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}" + + endpoint.stop() + + +# HADRON +def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder): + """ + Tests that shard split can correctly handle page server timeouts and abort the split + """ + init_shard_count = 2 + neon_env_builder.num_pageservers = 1 + stripe_size = 1 + + if neon_env_builder.storage_controller_config is None: + neon_env_builder.storage_controller_config = {"shard_split_request_timeout": "5s"} + else: + neon_env_builder.storage_controller_config["shard_split_request_timeout"] = "5s" + + env = neon_env_builder.init_start( + initial_tenant_shard_count=init_shard_count, + initial_tenant_shard_stripe_size=stripe_size, + ) + + env.storage_controller.allowed_errors.extend( + [ + ".*Enqueuing background abort.*", + ".*failpoint.*", + ".*Failed to abort.*", + ".*Exclusive lock by ShardSplit was held.*", + ] + ) + env.pageserver.allowed_errors.extend([".*request was dropped before completing.*"]) + + endpoint1 = env.endpoints.create_start(branch_name="main") + + env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "pause")) + + with pytest.raises(StorageControllerApiException): + env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4) + + env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "off")) + endpoint1.stop_and_destroy() + + def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder): """ Check a scenario when one of the shards is much slower than others. From 380d167b7ca2c8312fafffef30ae8cbdea7fd8a0 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Fri, 11 Jul 2025 21:35:42 +0200 Subject: [PATCH 12/14] proxy: For cancellation data replace HSET+EXPIRE/HGET with SET..EX/GET (#12553) ## Problem To store cancellation data we send two commands to redis because the redis server version doesn't support HSET with EX. Also, HSET is not really needed. ## Summary of changes * Replace the HSET + EXPIRE command pair with one SET .. EX command. * Replace HGET with GET. * Leave a workaround for old keys set with HSET. * Replace some anyhow errors with specific errors to surface the WRONGTYPE error from redis. 
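For illustration, this is roughly what the command-construction change looks like with the `redis` crate builders the patch uses; the key, payload, and TTL below are placeholders, not the real cancellation key layout.

```
use redis::{Cmd, SetExpiry, SetOptions};

fn build_cancel_cmds() {
    // Placeholder key/payload/TTL; the real key comes from the cancellation key prefix.
    let key = "cancel:example";
    let payload = r#"{"cancel_token":"..."}"#;

    // Before: store as a hash field, set the TTL separately, read back with HGET.
    let _store_old = Cmd::hset(key, "data", payload);
    let _ttl_old = Cmd::expire(key, 600);
    let _read_old = Cmd::hget(key, "data");

    // After: a single SET .. EX, read back with a plain GET.
    let _store_new = Cmd::set_options(
        key,
        payload,
        SetOptions::default().with_expiration(SetExpiry::EX(600)),
    );
    let _read_new = Cmd::get(key);
}

fn main() {
    build_cancel_cmds();
}
```

Old keys written with HSET stay readable through the HGET fallback mentioned above, so both forms can coexist during rollout.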
--- Cargo.lock | 1 + proxy/Cargo.toml | 3 +- proxy/src/batch.rs | 68 +++++++---- proxy/src/cancellation.rs | 111 ++++++++++++------ proxy/src/metrics.rs | 6 +- .../connection_with_credentials_provider.rs | 24 +++- proxy/src/redis/elasticache.rs | 20 +++- proxy/src/redis/kv_ops.rs | 16 ++- 8 files changed, 175 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 025f4e4116..4323254f0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5289,6 +5289,7 @@ dependencies = [ "async-trait", "atomic-take", "aws-config", + "aws-credential-types", "aws-sdk-iam", "aws-sigv4", "base64 0.22.1", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index ce8610be24..0a406d1ca8 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -16,6 +16,7 @@ async-compression.workspace = true async-trait.workspace = true atomic-take.workspace = true aws-config.workspace = true +aws-credential-types.workspace = true aws-sdk-iam.workspace = true aws-sigv4.workspace = true base64.workspace = true @@ -127,4 +128,4 @@ rstest.workspace = true walkdir.workspace = true rand_distr = "0.4" tokio-postgres.workspace = true -tracing-test = "0.2" \ No newline at end of file +tracing-test = "0.2" diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs index 33e08797f2..cf866ab9a3 100644 --- a/proxy/src/batch.rs +++ b/proxy/src/batch.rs @@ -7,13 +7,17 @@ use std::pin::pin; use std::sync::Mutex; use scopeguard::ScopeGuard; +use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; use crate::ext::LockExt; +type ProcResult

<P> = Result<<P as QueueProcessing>::Res, <P as QueueProcessing>
::Err>; + pub trait QueueProcessing: Send + 'static { type Req: Send + 'static; type Res: Send; + type Err: Send + Clone; /// Get the desired batch size. fn batch_size(&self, queue_size: usize) -> usize; @@ -24,7 +28,18 @@ pub trait QueueProcessing: Send + 'static { /// If this apply can error, it's expected that errors be forwarded to each Self::Res. /// /// Batching does not need to happen atomically. - fn apply(&mut self, req: Vec) -> impl Future> + Send; + fn apply( + &mut self, + req: Vec, + ) -> impl Future, Self::Err>> + Send; +} + +#[derive(thiserror::Error)] +pub enum BatchQueueError { + #[error(transparent)] + Result(E), + #[error(transparent)] + Cancelled(C), } pub struct BatchQueue { @@ -34,7 +49,7 @@ pub struct BatchQueue { struct BatchJob { req: P::Req, - res: tokio::sync::oneshot::Sender, + res: tokio::sync::oneshot::Sender>, } impl BatchQueue
<P>
{ @@ -55,11 +70,11 @@ impl BatchQueue
<P>
{ &self, req: P::Req, cancelled: impl Future, - ) -> Result { + ) -> Result> { let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req); let mut cancelled = pin!(cancelled); - let resp = loop { + let resp: Option> = loop { // try become the leader, or try wait for success. let mut processor = tokio::select! { // try become leader. @@ -72,7 +87,7 @@ impl BatchQueue
<P>
{ if inner.queue.remove(&id).is_some() { tracing::warn!("batched task cancelled before completion"); } - return Err(cancel); + return Err(BatchQueueError::Cancelled(cancel)); }, }; @@ -96,18 +111,30 @@ impl BatchQueue
<P>
{ // good: we didn't get cancelled. ScopeGuard::into_inner(cancel_safety); - if values.len() != resps.len() { - tracing::error!( - "batch: invalid response size, expected={}, got={}", - resps.len(), - values.len() - ); - } + match values { + Ok(values) => { + if values.len() != resps.len() { + tracing::error!( + "batch: invalid response size, expected={}, got={}", + resps.len(), + values.len() + ); + } - // send response values. - for (tx, value) in std::iter::zip(resps, values) { - if tx.send(value).is_err() { - // receiver hung up but that's fine. + // send response values. + for (tx, value) in std::iter::zip(resps, values) { + if tx.send(Ok(value)).is_err() { + // receiver hung up but that's fine. + } + } + } + + Err(err) => { + for tx in resps { + if tx.send(Err(err.clone())).is_err() { + // receiver hung up but that's fine. + } + } } } @@ -129,7 +156,8 @@ impl BatchQueue
<P>
{ tracing::debug!(id, "batch: job completed"); - Ok(resp.expect("no response found. batch processer should not panic")) + resp.expect("no response found. batch processer should not panic") + .map_err(BatchQueueError::Result) } } @@ -139,8 +167,8 @@ struct BatchQueueInner { } impl BatchQueueInner
<P>
{ - fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver) { - let (tx, rx) = tokio::sync::oneshot::channel(); + fn register_job(&mut self, req: P::Req) -> (u64, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); let id = self.version; @@ -158,7 +186,7 @@ impl BatchQueueInner
<P>
{ (id, rx) } - fn get_batch(&mut self, p: &P) -> (Vec, Vec>) { + fn get_batch(&mut self, p: &P) -> (Vec, Vec>>) { let batch_size = p.batch_size(self.queue.len()); let mut reqs = Vec::with_capacity(batch_size); let mut resps = Vec::with_capacity(batch_size); diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 74413f1a7d..4ea4c4ea54 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -4,12 +4,11 @@ use std::pin::pin; use std::sync::{Arc, OnceLock}; use std::time::Duration; -use anyhow::anyhow; use futures::FutureExt; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use postgres_client::RawCancelToken; use postgres_client::tls::MakeTlsConnect; -use redis::{Cmd, FromRedisValue, Value}; +use redis::{Cmd, FromRedisValue, SetExpiry, SetOptions, Value}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::net::TcpStream; @@ -18,7 +17,7 @@ use tracing::{debug, error, info}; use crate::auth::AuthError; use crate::auth::backend::ComputeUserInfo; -use crate::batch::{BatchQueue, QueueProcessing}; +use crate::batch::{BatchQueue, BatchQueueError, QueueProcessing}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; @@ -28,7 +27,7 @@ use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, Redis use crate::pqproto::CancelKeyData; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; -use crate::redis::kv_ops::RedisKVClient; +use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError}; type IpSubnetKey = IpNet; @@ -45,6 +44,17 @@ pub enum CancelKeyOp { GetCancelData { key: CancelKeyData, }, + GetCancelDataOld { + key: CancelKeyData, + }, +} + +#[derive(thiserror::Error, Debug, Clone)] +pub enum PipelineError { + #[error("could not send cmd to redis: {0}")] + RedisKVClient(Arc), + #[error("incorrect number of responses from redis")] + IncorrectNumberOfResponses, } pub struct Pipeline { @@ -60,7 +70,7 @@ impl Pipeline { } } - async fn execute(self, client: &mut RedisKVClient) -> Vec> { + async fn execute(self, client: &mut RedisKVClient) -> Result, PipelineError> { let responses = self.replies; let batch_size = self.inner.len(); @@ -78,30 +88,20 @@ impl Pipeline { batch_size, responses, "successfully completed cancellation jobs", ); - values.into_iter().map(Ok).collect() + Ok(values.into_iter().collect()) } Ok(value) => { error!(batch_size, ?value, "unexpected redis return value"); - std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis"))) - .take(responses) - .collect() - } - Err(err) => { - std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}"))) - .take(responses) - .collect() + Err(PipelineError::IncorrectNumberOfResponses) } + Err(err) => Err(PipelineError::RedisKVClient(Arc::new(err))), } } - fn add_command_with_reply(&mut self, cmd: Cmd) { + fn add_command(&mut self, cmd: Cmd) { self.inner.add_command(cmd); self.replies += 1; } - - fn add_command_no_reply(&mut self, cmd: Cmd) { - self.inner.add_command(cmd).ignore(); - } } impl CancelKeyOp { @@ -109,12 +109,19 @@ impl CancelKeyOp { match self { CancelKeyOp::StoreCancelKey { key, value, expire } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); - pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value)); - pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64)); + pipe.add_command(Cmd::set_options( + &key, + &**value, + SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())), + )); + } + 
CancelKeyOp::GetCancelDataOld { key } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command(Cmd::hget(key, "data")); } CancelKeyOp::GetCancelData { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); - pipe.add_command_with_reply(Cmd::hget(key, "data")); + pipe.add_command(Cmd::get(key)); } } } @@ -127,13 +134,14 @@ pub struct CancellationProcessor { impl QueueProcessing for CancellationProcessor { type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp); - type Res = anyhow::Result; + type Res = redis::Value; + type Err = PipelineError; fn batch_size(&self, _queue_size: usize) -> usize { self.batch_size } - async fn apply(&mut self, batch: Vec) -> Vec { + async fn apply(&mut self, batch: Vec) -> Result, Self::Err> { if !self.client.credentials_refreshed() { // this will cause a timeout for cancellation operations tracing::debug!( @@ -244,18 +252,18 @@ impl CancellationHandler { &self, key: CancelKeyData, ) -> Result, CancelError> { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::HGet); - let op = CancelKeyOp::GetCancelData { key }; + const TIMEOUT: Duration = Duration::from_secs(5); let Some(tx) = self.tx.get() else { tracing::warn!("cancellation handler is not available"); return Err(CancelError::InternalError); }; - const TIMEOUT: Duration = Duration::from_secs(5); + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Get); + let op = CancelKeyOp::GetCancelData { key }; let result = timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -264,10 +272,37 @@ impl CancellationHandler { .map_err(|_| { tracing::warn!("timed out waiting to receive GetCancelData response"); CancelError::RateLimit - })? - // cannot be cancelled - .unwrap_or_else(|x| match x {}) - .map_err(|e| { + })?; + + // We may still have cancel keys set with HSET "data". + // Check error type and retry with HGET. + // TODO: remove code after HSET is not used anymore. + let result = if let Err(err) = result.as_ref() + && let BatchQueueError::Result(err) = err + && let PipelineError::RedisKVClient(err) = err + && let RedisKVClientError::Redis(err) = &**err + && let Some(errcode) = err.code() + && errcode == "WRONGTYPE" + { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::HGet); + let op = CancelKeyOp::GetCancelDataOld { key }; + timeout( + TIMEOUT, + tx.call((guard, op), std::future::pending::()), + ) + .await + .map_err(|_| { + tracing::warn!("timed out waiting to receive GetCancelData response"); + CancelError::RateLimit + })? + } else { + result + }; + + let result = result.map_err(|e| { tracing::warn!("failed to receive GetCancelData response: {e}"); CancelError::InternalError })?; @@ -442,7 +477,7 @@ impl Session { let guard = Metrics::get() .proxy .cancel_channel_size - .guard(RedisMsgKind::HSet); + .guard(RedisMsgKind::Set); let op = CancelKeyOp::StoreCancelKey { key: self.key, value: closure_json.clone(), @@ -456,7 +491,7 @@ impl Session { ); match tx.call((guard, op), cancel.as_mut()).await { - Ok(Ok(_)) => { + Ok(_) => { tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, @@ -467,10 +502,10 @@ impl Session { tokio::time::sleep(CANCEL_KEY_REFRESH).await; } // retry immediately. 
- Ok(Err(error)) => { + Err(BatchQueueError::Result(error)) => { tracing::warn!(?error, "error registering cancellation key"); } - Err(Err(_cancelled)) => break, + Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, } } diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 9d1a3d4358..8439082498 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -374,11 +374,9 @@ pub enum Waiting { #[label(singleton = "kind")] #[allow(clippy::enum_variant_names)] pub enum RedisMsgKind { - HSet, - HSetMultiple, + Set, + Get, HGet, - HGetAll, - HDel, } #[derive(Default, Clone)] diff --git a/proxy/src/redis/connection_with_credentials_provider.rs b/proxy/src/redis/connection_with_credentials_provider.rs index 35a3fe4334..b0bf332e44 100644 --- a/proxy/src/redis/connection_with_credentials_provider.rs +++ b/proxy/src/redis/connection_with_credentials_provider.rs @@ -4,11 +4,12 @@ use std::time::Duration; use futures::FutureExt; use redis::aio::{ConnectionLike, MultiplexedConnection}; -use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult}; +use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisError, RedisResult}; use tokio::task::AbortHandle; use tracing::{error, info, warn}; use super::elasticache::CredentialsProvider; +use crate::redis::elasticache::CredentialsProviderError; enum Credentials { Static(ConnectionInfo), @@ -26,6 +27,14 @@ impl Clone for Credentials { } } +#[derive(thiserror::Error, Debug)] +pub enum ConnectionProviderError { + #[error(transparent)] + Redis(#[from] RedisError), + #[error(transparent)] + CredentialsProvider(#[from] CredentialsProviderError), +} + /// A wrapper around `redis::MultiplexedConnection` that automatically refreshes the token. /// Provides PubSub connection without credentials refresh. 
pub struct ConnectionWithCredentialsProvider { @@ -86,15 +95,18 @@ impl ConnectionWithCredentialsProvider { } } - async fn ping(con: &mut MultiplexedConnection) -> RedisResult<()> { - redis::cmd("PING").query_async(con).await + async fn ping(con: &mut MultiplexedConnection) -> Result<(), ConnectionProviderError> { + redis::cmd("PING") + .query_async(con) + .await + .map_err(Into::into) } pub(crate) fn credentials_refreshed(&self) -> bool { self.credentials_refreshed.load(Ordering::Relaxed) } - pub(crate) async fn connect(&mut self) -> anyhow::Result<()> { + pub(crate) async fn connect(&mut self) -> Result<(), ConnectionProviderError> { let _guard = self.mutex.lock().await; if let Some(con) = self.con.as_mut() { match Self::ping(con).await { @@ -141,7 +153,7 @@ impl ConnectionWithCredentialsProvider { Ok(()) } - async fn get_connection_info(&self) -> anyhow::Result { + async fn get_connection_info(&self) -> Result { match &self.credentials { Credentials::Static(info) => Ok(info.clone()), Credentials::Dynamic(provider, addr) => { @@ -160,7 +172,7 @@ impl ConnectionWithCredentialsProvider { } } - async fn get_client(&self) -> anyhow::Result { + async fn get_client(&self) -> Result { let client = redis::Client::open(self.get_connection_info().await?)?; self.credentials_refreshed.store(true, Ordering::Relaxed); Ok(client) diff --git a/proxy/src/redis/elasticache.rs b/proxy/src/redis/elasticache.rs index 58e3c889a7..6f3b34d381 100644 --- a/proxy/src/redis/elasticache.rs +++ b/proxy/src/redis/elasticache.rs @@ -9,10 +9,12 @@ use aws_config::meta::region::RegionProviderChain; use aws_config::profile::ProfileFileCredentialsProvider; use aws_config::provider_config::ProviderConfig; use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider; +use aws_credential_types::provider::error::CredentialsError; use aws_sdk_iam::config::ProvideCredentials; use aws_sigv4::http_request::{ - self, SignableBody, SignableRequest, SignatureLocation, SigningSettings, + self, SignableBody, SignableRequest, SignatureLocation, SigningError, SigningSettings, }; +use aws_sigv4::sign::v4::signing_params::BuildError; use tracing::info; #[derive(Debug)] @@ -40,6 +42,18 @@ impl AWSIRSAConfig { } } +#[derive(thiserror::Error, Debug)] +pub enum CredentialsProviderError { + #[error(transparent)] + AwsCredentials(#[from] CredentialsError), + #[error(transparent)] + AwsSigv4Build(#[from] BuildError), + #[error(transparent)] + AwsSigv4Singing(#[from] SigningError), + #[error(transparent)] + Http(#[from] http::Error), +} + /// Credentials provider for AWS elasticache authentication. 
/// /// Official documentation: @@ -92,7 +106,9 @@ impl CredentialsProvider { }) } - pub(crate) async fn provide_credentials(&self) -> anyhow::Result<(String, String)> { + pub(crate) async fn provide_credentials( + &self, + ) -> Result<(String, String), CredentialsProviderError> { let aws_credentials = self .credentials_provider .provide_credentials() diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs index cfdbc21839..d1e97b6b09 100644 --- a/proxy/src/redis/kv_ops.rs +++ b/proxy/src/redis/kv_ops.rs @@ -2,9 +2,18 @@ use std::time::Duration; use futures::FutureExt; use redis::aio::ConnectionLike; -use redis::{Cmd, FromRedisValue, Pipeline, RedisResult}; +use redis::{Cmd, FromRedisValue, Pipeline, RedisError, RedisResult}; use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; +use crate::redis::connection_with_credentials_provider::ConnectionProviderError; + +#[derive(thiserror::Error, Debug)] +pub enum RedisKVClientError { + #[error(transparent)] + Redis(#[from] RedisError), + #[error(transparent)] + ConnectionProvider(#[from] ConnectionProviderError), +} pub struct RedisKVClient { client: ConnectionWithCredentialsProvider, @@ -32,12 +41,13 @@ impl RedisKVClient { Self { client } } - pub async fn try_connect(&mut self) -> anyhow::Result<()> { + pub async fn try_connect(&mut self) -> Result<(), RedisKVClientError> { self.client .connect() .boxed() .await .inspect_err(|e| tracing::error!("failed to connect to redis: {e}")) + .map_err(Into::into) } pub(crate) fn credentials_refreshed(&self) -> bool { @@ -47,7 +57,7 @@ impl RedisKVClient { pub(crate) async fn query( &mut self, q: &impl Queryable, - ) -> anyhow::Result { + ) -> Result { let e = match q.query(&mut self.client).await { Ok(t) => return Ok(t), Err(e) => e, From 9bba31bf6805e1c179b75fbb5bcab96c96980c75 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 11 Jul 2025 20:39:08 +0100 Subject: [PATCH 13/14] proxy: encode json as we parse rows (#11992) Serialize query row responses directly into JSON. Some of this code should be using the `json::value_as_object/list` macros, but I've avoided it for now to minimize the size of the diff. 
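As a rough illustration of the approach: each row object is built directly into the output buffer instead of materializing a `serde_json::Value` first. The sketch below uses the in-tree `json` helpers the way they appear in the diff that follows (`ValueSer`, `.object()`, `.key()`, `.value()`, `.finish()`); treat the exact signatures as assumptions taken from that diff, not a stable API.

```
fn row_to_json_bytes() -> Vec<u8> {
    let mut buf = Vec::new();

    // Serialize one row straight into `buf`; no intermediate Value tree is built.
    let row = json::ValueSer::new(&mut buf);
    let mut obj = row.object();
    obj.key("id").value(42);
    obj.key("name").value("alice");
    obj.finish();

    buf // e.g. {"id":42,"name":"alice"}
}
```

The benefit is that large result sets are encoded incrementally as rows are parsed, rather than being held twice in memory (once as `Value`s and once as the final string).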
--- Cargo.lock | 1 + proxy/Cargo.toml | 1 + proxy/src/serverless/json.rs | 95 +++++++--------- proxy/src/serverless/sql_over_http.rs | 154 +++++++++++++------------- 4 files changed, 122 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4323254f0a..14b460005a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5329,6 +5329,7 @@ dependencies = [ "itoa", "jose-jwa", "jose-jwk", + "json", "lasso", "measured", "metrics", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 0a406d1ca8..82fe6818e3 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -49,6 +49,7 @@ indexmap = { workspace = true, features = ["serde"] } ipnet.workspace = true itertools.workspace = true itoa.workspace = true +json = { path = "../libs/proxy/json" } lasso = { workspace = true, features = ["multi-threaded"] } measured = { workspace = true, features = ["lasso"] } metrics.workspace = true diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs index 2e67d07079..ef7c8a4d82 100644 --- a/proxy/src/serverless/json.rs +++ b/proxy/src/serverless/json.rs @@ -1,6 +1,7 @@ +use json::{ListSer, ObjectSer, ValueSer}; use postgres_client::Row; use postgres_client::types::{Kind, Type}; -use serde_json::{Map, Value}; +use serde_json::Value; // // Convert json non-string types to strings, so that they can be passed to Postgres @@ -74,44 +75,40 @@ pub(crate) enum JsonConversionError { UnbalancedString, } -enum OutputMode { - Array(Vec), - Object(Map), +enum OutputMode<'a> { + Array(ListSer<'a>), + Object(ObjectSer<'a>), } -impl OutputMode { - fn key(&mut self, key: &str) -> &mut Value { +impl OutputMode<'_> { + fn key(&mut self, key: &str) -> ValueSer<'_> { match self { - OutputMode::Array(values) => push_entry(values, Value::Null), - OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null), + OutputMode::Array(values) => values.entry(), + OutputMode::Object(map) => map.key(key), } } - fn finish(self) -> Value { + fn finish(self) { match self { - OutputMode::Array(values) => Value::Array(values), - OutputMode::Object(map) => Value::Object(map), + OutputMode::Array(values) => values.finish(), + OutputMode::Object(map) => map.finish(), } } } -fn push_entry(arr: &mut Vec, t: T) -> &mut T { - arr.push(t); - arr.last_mut().expect("a value was just inserted") -} - // // Convert postgres row with text-encoded values to JSON object // pub(crate) fn pg_text_row_to_json( + output: ValueSer, row: &Row, raw_output: bool, array_mode: bool, -) -> Result { +) -> Result<(), JsonConversionError> { let mut entries = if array_mode { - OutputMode::Array(Vec::with_capacity(row.columns().len())) + OutputMode::Array(output.list()) } else { - OutputMode::Object(Map::with_capacity(row.columns().len())) + OutputMode::Object(output.object()) }; for (i, column) in row.columns().iter().enumerate() { @@ -120,53 +117,48 @@ pub(crate) fn pg_text_row_to_json( let value = entries.key(column.name()); match pg_value { - Some(v) if raw_output => *value = Value::String(v.to_string()), + Some(v) if raw_output => value.value(v), Some(v) => pg_text_to_json(value, v, column.type_())?, - None => *value = Value::Null, + None => value.value(json::Null), } } - Ok(entries.finish()) + entries.finish(); + Ok(()) } // // Convert postgres text-encoded value to JSON value // -fn pg_text_to_json( - output: &mut Value, - val: &str, - pg_type: &Type, -) -> Result<(), JsonConversionError> { +fn pg_text_to_json(output: ValueSer, val: &str, pg_type: &Type) -> Result<(), JsonConversionError> { if let Kind::Array(elem_type) = 
pg_type.kind() { // todo: we should fetch this from postgres. let delimiter = ','; - let mut array = vec![]; - pg_array_parse(&mut array, val, elem_type, delimiter)?; - *output = Value::Array(array); + json::value_as_list!(|output| pg_array_parse(output, val, elem_type, delimiter)?); return Ok(()); } match *pg_type { - Type::BOOL => *output = Value::Bool(val == "t"), + Type::BOOL => output.value(val == "t"), Type::INT2 | Type::INT4 => { let val = val.parse::()?; - *output = Value::Number(serde_json::Number::from(val)); + output.value(val); } Type::FLOAT4 | Type::FLOAT8 => { let fval = val.parse::()?; - let num = serde_json::Number::from_f64(fval); - if let Some(num) = num { - *output = Value::Number(num); + if fval.is_finite() { + output.value(fval); } else { // Pass Nan, Inf, -Inf as strings // JS JSON.stringify() does converts them to null, but we // want to preserve them, so we pass them as strings - *output = Value::String(val.to_string()); + output.value(val); } } - Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?, - _ => *output = Value::String(val.to_string()), + // we assume that the string value is valid json. + Type::JSON | Type::JSONB => output.write_raw_json(val.as_bytes()), + _ => output.value(val), } Ok(()) @@ -192,7 +184,7 @@ fn pg_text_to_json( /// gets its own level of curly braces, and delimiters must be written between adjacent /// curly-braced entities of the same level. fn pg_array_parse( - elements: &mut Vec, + elements: &mut ListSer, mut pg_array: &str, elem: &Type, delim: char, @@ -221,7 +213,7 @@ fn pg_array_parse( /// reads a single array from the `pg_array` string and pushes each values to `elements`. /// returns the rest of the `pg_array` string that was not read. fn pg_array_parse_inner<'a>( - elements: &mut Vec, + elements: &mut ListSer, mut pg_array: &'a str, elem: &Type, delim: char, @@ -234,7 +226,7 @@ fn pg_array_parse_inner<'a>( let mut q = String::new(); loop { - let value = push_entry(elements, Value::Null); + let value = elements.entry(); pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?; // check for separator. @@ -260,7 +252,7 @@ fn pg_array_parse_inner<'a>( /// /// `quoted` is a scratch allocation that has no defined output. fn pg_array_parse_item<'a>( - output: &mut Value, + output: ValueSer, quoted: &mut String, mut pg_array: &'a str, elem: &Type, @@ -276,9 +268,8 @@ fn pg_array_parse_item<'a>( if pg_array.starts_with('{') { // nested array. 
- let mut nested = vec![]; - pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?; - *output = Value::Array(nested); + pg_array = + json::value_as_list!(|output| pg_array_parse_inner(output, pg_array, elem, delim))?; return Ok(pg_array); } @@ -306,7 +297,7 @@ fn pg_array_parse_item<'a>( // we might have an item string: // check for null if item == "NULL" { - *output = Value::Null; + output.value(json::Null); } else { pg_text_to_json(output, item, elem)?; } @@ -440,15 +431,15 @@ mod tests { } fn pg_text_to_json(val: &str, pg_type: &Type) -> Value { - let mut v = Value::Null; - super::pg_text_to_json(&mut v, val, pg_type).unwrap(); - v + let output = json::value_to_string!(|v| super::pg_text_to_json(v, val, pg_type).unwrap()); + serde_json::from_str(&output).unwrap() } fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value { - let mut array = vec![]; - super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap(); - Value::Array(array) + let output = json::value_to_string!(|v| json::value_as_list!(|v| { + super::pg_array_parse(v, pg_array, pg_type, ',').unwrap(); + })); + serde_json::from_str(&output).unwrap() } #[test] diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 7a718d0280..8a14f804b6 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -14,10 +14,7 @@ use hyper::http::{HeaderName, HeaderValue}; use hyper::{Request, Response, StatusCode, header}; use indexmap::IndexMap; use postgres_client::error::{DbError, ErrorPosition, SqlState}; -use postgres_client::{ - GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction, -}; -use serde::Serialize; +use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use serde_json::Value; use serde_json::value::RawValue; use tokio::time::{self, Instant}; @@ -687,32 +684,21 @@ impl QueryData { let (inner, mut discard) = client.inner(); let cancel_token = inner.cancel_token(); - match select( + let mut json_buf = vec![]; + + let batch_result = match select( pin!(query_to_json( config, &mut *inner, self, - &mut 0, + json::ValueSer::new(&mut json_buf), parsed_headers )), pin!(cancel.cancelled()), ) .await { - // The query successfully completed. - Either::Left((Ok((status, results)), __not_yet_cancelled)) => { - discard.check_idle(status); - - let json_output = - serde_json::to_string(&results).expect("json serialization should not fail"); - Ok(json_output) - } - // The query failed with an error - Either::Left((Err(e), __not_yet_cancelled)) => { - discard.discard(); - Err(e) - } - // The query was cancelled. + Either::Left((res, __not_yet_cancelled)) => res, Either::Right((_cancelled, query)) => { tracing::info!("cancelling query"); if let Err(err) = cancel_token.cancel_query(NoTls).await { @@ -721,13 +707,7 @@ impl QueryData { // wait for the query cancellation match time::timeout(time::Duration::from_millis(100), query).await { // query successed before it was cancelled. - Ok(Ok((status, results))) => { - discard.check_idle(status); - - let json_output = serde_json::to_string(&results) - .expect("json serialization should not fail"); - Ok(json_output) - } + Ok(Ok(status)) => Ok(status), // query failed or was cancelled. 
Ok(Err(error)) => { let db_error = match &error { @@ -743,14 +723,29 @@ impl QueryData { discard.discard(); } - Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)) + return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)); } Err(_timeout) => { discard.discard(); - Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)) + return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)); } } } + }; + + match batch_result { + // The query successfully completed. + Ok(status) => { + discard.check_idle(status); + + let json_output = String::from_utf8(json_buf).expect("json should be valid utf8"); + Ok(json_output) + } + // The query failed with an error + Err(e) => { + discard.discard(); + Err(e) + } } } } @@ -787,7 +782,7 @@ impl BatchQueryData { }) .map_err(SqlOverHttpError::Postgres)?; - let json_output = match query_batch( + let json_output = match query_batch_to_json( config, cancel.child_token(), &mut transaction, @@ -845,24 +840,21 @@ async fn query_batch( transaction: &mut Transaction<'_>, queries: BatchQueryData, parsed_headers: HttpHeaders, -) -> Result { - let mut results = Vec::with_capacity(queries.queries.len()); - let mut current_size = 0; + results: &mut json::ListSer<'_>, +) -> Result<(), SqlOverHttpError> { for stmt in queries.queries { let query = pin!(query_to_json( config, transaction, stmt, - &mut current_size, + results.entry(), parsed_headers, )); let cancelled = pin!(cancel.cancelled()); let res = select(query, cancelled).await; match res { // TODO: maybe we should check that the transaction bit is set here - Either::Left((Ok((_, values)), _cancelled)) => { - results.push(values); - } + Either::Left((Ok(_), _cancelled)) => {} Either::Left((Err(e), _cancelled)) => { return Err(e); } @@ -872,8 +864,22 @@ async fn query_batch( } } - let results = json!({ "results": results }); - let json_output = serde_json::to_string(&results).expect("json serialization should not fail"); + Ok(()) +} + +async fn query_batch_to_json( + config: &'static HttpConfig, + cancel: CancellationToken, + tx: &mut Transaction<'_>, + queries: BatchQueryData, + headers: HttpHeaders, +) -> Result { + let json_output = json::value_to_string!(|obj| json::value_as_object!(|obj| { + let results = obj.key("results"); + json::value_as_list!(|results| { + query_batch(config, cancel, tx, queries, headers, results).await?; + }); + })); Ok(json_output) } @@ -882,54 +888,54 @@ async fn query_to_json( config: &'static HttpConfig, client: &mut T, data: QueryData, - current_size: &mut usize, + output: json::ValueSer<'_>, parsed_headers: HttpHeaders, -) -> Result<(ReadyForQueryStatus, impl Serialize + use), SqlOverHttpError> { +) -> Result { let query_start = Instant::now(); - let query_params = data.params; + let mut output = json::ObjectSer::new(output); let mut row_stream = client - .query_raw_txt(&data.query, query_params) + .query_raw_txt(&data.query, data.params) .await .map_err(SqlOverHttpError::Postgres)?; let query_acknowledged = Instant::now(); - let columns_len = row_stream.statement.columns().len(); - let mut fields = Vec::with_capacity(columns_len); - + let mut json_fields = output.key("fields").list(); for c in row_stream.statement.columns() { - fields.push(json!({ - "name": c.name().to_owned(), - "dataTypeID": c.type_().oid(), - "tableID": c.table_oid(), - "columnID": c.column_id(), - "dataTypeSize": c.type_size(), - "dataTypeModifier": c.type_modifier(), - "format": "text", - })); + let json_field = json_fields.entry(); + json::value_as_object!(|json_field| { + 
json_field.entry("name", c.name()); + json_field.entry("dataTypeID", c.type_().oid()); + json_field.entry("tableID", c.table_oid()); + json_field.entry("columnID", c.column_id()); + json_field.entry("dataTypeSize", c.type_size()); + json_field.entry("dataTypeModifier", c.type_modifier()); + json_field.entry("format", "text"); + }); } + json_fields.finish(); - let raw_output = parsed_headers.raw_output; let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode); + let raw_output = parsed_headers.raw_output; // Manually drain the stream into a vector to leave row_stream hanging // around to get a command tag. Also check that the response is not too // big. - let mut rows = Vec::new(); + let mut rows = 0; + let mut json_rows = output.key("rows").list(); while let Some(row) = row_stream.next().await { let row = row.map_err(SqlOverHttpError::Postgres)?; - *current_size += row.body_len(); // we don't have a streaming response support yet so this is to prevent OOM // from a malicious query (eg a cross join) - if *current_size > config.max_response_size_bytes { + if json_rows.as_buffer().len() > config.max_response_size_bytes { return Err(SqlOverHttpError::ResponseTooLarge( config.max_response_size_bytes, )); } - let row = pg_text_row_to_json(&row, raw_output, array_mode)?; - rows.push(row); + pg_text_row_to_json(json_rows.entry(), &row, raw_output, array_mode)?; + rows += 1; // assumption: parsing pg text and converting to json takes CPU time. // let's assume it is slightly expensive, so we should consume some cooperative budget. @@ -937,16 +943,14 @@ async fn query_to_json( // of rows and never hit the tokio mpsc for a long time (although unlikely). tokio::task::consume_budget().await; } + json_rows.finish(); let query_resp_end = Instant::now(); - let RowStream { - command_tag, - status: ready, - .. - } = row_stream; + + let ready = row_stream.status; // grab the command tag and number of rows affected - let command_tag = command_tag.unwrap_or_default(); + let command_tag = row_stream.command_tag.unwrap_or_default(); let mut command_tag_split = command_tag.split(' '); let command_tag_name = command_tag_split.next().unwrap_or_default(); let command_tag_count = if command_tag_name == "INSERT" { @@ -959,7 +963,7 @@ async fn query_to_json( .and_then(|s| s.parse::().ok()); info!( - rows = rows.len(), + rows, ?ready, command_tag, acknowledgement = ?(query_acknowledged - query_start), @@ -967,16 +971,12 @@ async fn query_to_json( "finished executing query" ); - // Resulting JSON format is based on the format of node-postgres result. - let results = json!({ - "command": command_tag_name.to_string(), - "rowCount": command_tag_count, - "rows": rows, - "fields": fields, - "rowAsArray": array_mode, - }); + output.entry("command", command_tag_name); + output.entry("rowCount", command_tag_count); + output.entry("rowAsArray", array_mode); - Ok((ready, results)) + output.finish(); + Ok(ready) } enum Client { From ee7bb1a66746e4bbbf1213792b8169e00ce08334 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Sat, 12 Jul 2025 08:57:04 +0400 Subject: [PATCH 14/14] storcon: validate new_sk_set before starting safekeeper migration (#12546) ## Problem We don't validate the validity of the `new_sk_set` before starting the migration. It is validated later, so the migration to an invalid safekeeper set will fail anyway. 
But at this point we might have already committed an invalid `new_sk_set` to the database and there is no `abort` command yet (I ran into this issue in neon_local and ruined the timeline :)) - Part of https://github.com/neondatabase/neon/issues/11669 ## Summary of changes - Add safekeeper count and safekeeper duplication checks before starting the migration - Test that we validate the `new_sk_set` before starting the migration - Add `force` option to the `TimelineSafekeeperMigrateRequest` to disable non-mandatory checks --- .../src/service/safekeeper_service.rs | 45 +++++++++++++++---- .../regress/test_safekeeper_migration.py | 38 ++++++++++++++++ 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 42ddf81e3e..7521d7bd86 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -39,13 +39,13 @@ use utils::lsn::Lsn; use super::Service; impl Service { - fn make_member_set(safekeepers: &[Safekeeper]) -> Result { + fn make_member_set(safekeepers: &[Safekeeper]) -> Result { let members = safekeepers .iter() .map(|sk| sk.get_safekeeper_id()) .collect::>(); - MemberSet::new(members).map_err(ApiError::InternalServerError) + MemberSet::new(members) } fn get_safekeepers(&self, ids: &[i64]) -> Result, ApiError> { @@ -80,7 +80,7 @@ impl Service { ) -> Result, ApiError> { let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?; - let mset = Self::make_member_set(&safekeepers)?; + let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?; let mconf = safekeeper_api::membership::Configuration::new(mset); let req = safekeeper_api::models::TimelineCreateRequest { @@ -1105,6 +1105,26 @@ impl Service { } } + if new_sk_set.is_empty() { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "new safekeeper set is empty" + ))); + } + + if new_sk_set.len() < self.config.timeline_safekeeper_count { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "new safekeeper set must have at least {} safekeepers", + self.config.timeline_safekeeper_count + ))); + } + + let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::>(); + let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?; + // Construct new member set in advance to validate it. + // E.g. validates that there is no duplicate safekeepers. + let new_sk_member_set = + Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?; + // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks. let _tenant_lock = trace_shared_lock( &self.tenant_op_locks, @@ -1135,6 +1155,18 @@ impl Service { .map(|&id| NodeId(id as u64)) .collect::>(); + // Validate that we are not migrating to a decomissioned safekeeper. 
+ for sk in new_safekeepers.iter() { + if !cur_sk_set.contains(&sk.get_id()) + && sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned + { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "safekeeper {} is decomissioned", + sk.get_id() + ))); + } + } + tracing::info!( ?cur_sk_set, ?new_sk_set, @@ -1177,11 +1209,8 @@ impl Service { } let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; - let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?; - - let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::>(); - let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?; - let new_sk_member_set = Self::make_member_set(&new_safekeepers)?; + let cur_sk_member_set = + Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?; let joint_config = membership::Configuration { generation, diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index b82d7b9bb0..170c1a3650 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -2,6 +2,9 @@ from __future__ import annotations from typing import TYPE_CHECKING +import pytest +from fixtures.neon_fixtures import StorageControllerApiException + if TYPE_CHECKING: from fixtures.neon_fixtures import NeonEnvBuilder @@ -75,3 +78,38 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): ep.start(safekeeper_generation=1, safekeepers=[3]) assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)] + + +def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder): + """ + Test that safekeeper_migrate validates the new_sk_set before starting the migration. + """ + neon_env_builder.num_safekeepers = 3 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 2, + } + env = neon_env_builder.init_start() + + def expect_fail(sk_set: list[int], match: str): + with pytest.raises(StorageControllerApiException, match=match): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, sk_set + ) + # Check that we failed before commiting to the database. + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["generation"] == 1 + + expect_fail([], "safekeeper set is empty") + expect_fail([1], "must have at least 2 safekeepers") + expect_fail([1, 1], "duplicate safekeeper") + expect_fail([1, 100500], "does not exist") + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + sk_set = mconf["sk_set"] + assert len(sk_set) == 2 + + decom_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0] + env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned") + + expect_fail([sk_set[0], decom_sk], "decomissioned")
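
For illustration only (not part of the patch): the pre-flight checks described in this commit message, condensed into a standalone sketch. The function name, error strings, and `required_count` parameter are illustrative; the real implementation in `safekeeper_service.rs` builds a `MemberSet` to detect duplicates and additionally rejects unknown and decommissioned safekeepers.

```rust
use std::collections::HashSet;

/// Illustrative stand-in for the validation described above, not the storcon code itself.
/// `required_count` plays the role of `timeline_safekeeper_count`.
fn validate_new_sk_set(new_sk_set: &[u64], required_count: usize) -> Result<(), String> {
    if new_sk_set.is_empty() {
        return Err("new safekeeper set is empty".into());
    }
    if new_sk_set.len() < required_count {
        return Err(format!(
            "new safekeeper set must have at least {required_count} safekeepers"
        ));
    }
    // Collecting into a set up front catches duplicates before anything is
    // committed to the database.
    let unique: HashSet<&u64> = new_sk_set.iter().collect();
    if unique.len() != new_sk_set.len() {
        return Err("duplicate safekeeper in new safekeeper set".into());
    }
    Ok(())
}
```

With the configuration used in the new test (`timeline_safekeeper_count = 2`), `validate_new_sk_set(&[1], 2)` and `validate_new_sk_set(&[1, 1], 2)` would both be rejected, mirroring the `expect_fail` cases above.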