diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index a8c7083b17..b02c6a613a 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; -use crate::models::{PageserverUtilization, ShardParameters, TenantConfig}; +use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo}; use crate::shard::{ShardStripeSize, TenantShardId}; #[derive(Serialize, Deserialize, Debug)] @@ -126,6 +126,13 @@ pub struct TenantDescribeResponse { pub config: TenantConfig, } +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantTimelineDescribeResponse { + pub shards: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub image_consistent_lsn: Option, +} + #[derive(Serialize, Deserialize, Debug)] pub struct NodeShardResponse { pub node_id: NodeId, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 56dd95eab3..11e02a8550 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1622,6 +1622,9 @@ pub struct TimelineInfo { /// Whether the timeline is invisible in synthetic size calculations. pub is_invisible: Option, + // HADRON: the largest LSN below which all page updates have been included in the image layers. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub image_consistent_lsn: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7030ac368d..d839bac557 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -397,6 +397,7 @@ async fn build_timeline_info( timeline: &Arc, include_non_incremental_logical_size: bool, force_await_initial_logical_size: bool, + include_image_consistent_lsn: bool, ctx: &RequestContext, ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); @@ -421,6 +422,10 @@ async fn build_timeline_info( .await?, ); } + // HADRON + if include_image_consistent_lsn { + info.image_consistent_lsn = Some(timeline.compute_image_consistent_lsn().await?); + } Ok(info) } @@ -510,6 +515,8 @@ async fn build_timeline_info_common( is_invisible: Some(is_invisible), walreceiver_status, + // HADRON + image_consistent_lsn: None, }; Ok(info) } @@ -712,6 +719,8 @@ async fn timeline_list_handler( parse_query_param(&request, "include-non-incremental-logical-size")?; let force_await_initial_logical_size: Option = parse_query_param(&request, "force-await-initial-logical-size")?; + let include_image_consistent_lsn: Option = + parse_query_param(&request, "include-image-consistent-lsn")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); @@ -732,6 +741,7 @@ async fn timeline_list_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), + include_image_consistent_lsn.unwrap_or(false), &ctx, ) .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) @@ -760,6 +770,9 @@ async fn timeline_and_offloaded_list_handler( parse_query_param(&request, "include-non-incremental-logical-size")?; let force_await_initial_logical_size: Option = parse_query_param(&request, 
"force-await-initial-logical-size")?; + let include_image_consistent_lsn: Option = + parse_query_param(&request, "include-image-consistent-lsn")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); @@ -780,6 +793,7 @@ async fn timeline_and_offloaded_list_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), + include_image_consistent_lsn.unwrap_or(false), &ctx, ) .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id)) @@ -964,6 +978,9 @@ async fn timeline_detail_handler( parse_query_param(&request, "include-non-incremental-logical-size")?; let force_await_initial_logical_size: Option = parse_query_param(&request, "force-await-initial-logical-size")?; + // HADRON + let include_image_consistent_lsn: Option = + parse_query_param(&request, "include-image-consistent-lsn")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; // Logical size calculation needs downloading. 
@@ -984,6 +1001,7 @@ async fn timeline_detail_handler( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), + include_image_consistent_lsn.unwrap_or(false), ctx, ) .await @@ -3643,6 +3661,7 @@ async fn activate_post_import_handler( let timeline_info = build_timeline_info( &timeline, false, // include_non_incremental_logical_size, false, // force_await_initial_logical_size + false, // include_image_consistent_lsn &ctx, ) .await diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f67269851a..f75a03a508 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -12816,6 +12816,40 @@ mod tests { }, ] ); + + Ok(()) + } + + #[tokio::test] + async fn test_get_force_image_creation_lsn() -> anyhow::Result<()> { + let tenant_conf = pageserver_api::models::TenantConfig { + pitr_interval: Some(Duration::from_secs(7 * 3600)), + image_layer_force_creation_period: Some(Duration::from_secs(3600)), + ..Default::default() + }; + + let tenant_id = TenantId::generate(); + + let harness = TenantHarness::create_custom( + "test_get_force_image_creation_lsn", + tenant_conf, + tenant_id, + ShardIdentity::unsharded(), + Generation::new(1), + ) + .await?; + let (tenant, ctx) = harness.load().await; + let timeline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await?; + timeline.gc_info.write().unwrap().cutoffs.time = Some(Lsn(100)); + { + let writer = timeline.writer().await; + writer.finish_write(Lsn(5000)); + } + + let image_creation_lsn = timeline.get_force_image_creation_lsn().unwrap(); + assert_eq!(image_creation_lsn, Lsn(4300)); Ok(()) } } diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 23052ccee7..ba02602cfe 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -46,10 +46,11 @@ mod historic_layer_coverage; mod layer_coverage; -use std::collections::{HashMap, VecDeque}; 
+use std::collections::{BTreeMap, HashMap, VecDeque}; use std::iter::Peekable; use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use anyhow::Result; use historic_layer_coverage::BufferedHistoricLayerCoverage; @@ -904,6 +905,103 @@ impl LayerMap { max_stacked_deltas } + /* BEGIN_HADRON */ + /** + * Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully. + * It works by first finding the latest image layers and storing them in a map. Then for each delta layer, + * find all overlapping image layers in order to potentially increase the image LSN in case there are gaps + * (e.g., if an image is created at LSN 100 but the delta layer spans LSN [150, 200], then we can increase + * image LSN to 149, one below the delta layer's start LSN, because there is no WAL record in between). + * Finally, the image consistent LSN is computed by taking the minimum of all image layers. + */ + pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn { + struct ImageLayerInfo { + // creation LSN of the image layer + image_lsn: Lsn, + // the current minimum LSN of newer delta layers with overlapping key ranges + min_delta_lsn: Lsn, + } + let started_at = Instant::now(); + + let min_l0_deltas_lsn = { + let l0_deltas = self.level0_deltas(); + l0_deltas + .iter() + .map(|layer| layer.get_lsn_range().start) + .min() + .unwrap_or(disk_consistent_lsn) + }; + let global_key_range = Key::MIN..Key::MAX; + + // step 1: collect all most recent image layers into a map + // map: end key to image_layer_info + let mut image_map: BTreeMap = BTreeMap::new(); + for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) { + let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0)); + image_map.insert( + img_range.end, + ImageLayerInfo { + image_lsn: img_lsn, + min_delta_lsn: min_l0_deltas_lsn, + }, + ); + } + + // step 2: go through all delta layers, and update the image layer info with overlapping + // key ranges + for layer 
in self.historic.iter() { + if !layer.is_delta { + continue; + } + let delta_key_range = layer.get_key_range(); + let delta_lsn_range = layer.get_lsn_range(); + for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) { + debug_assert!(img_end_key >= &delta_key_range.start); + if delta_lsn_range.end > img_info.image_lsn { + // the delta layer includes WAL records after the image + // it's possible that the delta layer's start LSN < image LSN, which will be simply ignored by step 3 + img_info.min_delta_lsn = + std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start); + } + if img_end_key >= &delta_key_range.end { + // we have fully processed all overlapping image layers + break; + } + } + } + + // step 3, go through all image layers and find the image consistent LSN + let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap(); + let mut prev_key = Key::MIN; + for (img_key, img_info) in image_map { + tracing::debug!( + "Image layer {:?}:{} has min delta lsn {}", + Range { + start: prev_key, + end: img_key, + }, + img_info.image_lsn, + img_info.min_delta_lsn, + ); + let image_lsn = std::cmp::max( + img_info.image_lsn, + img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)), + ); + img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn); + prev_key = img_key; + } + tracing::info!( + "computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. 
Processed {} layrs in total.", + img_consistent_lsn, + disk_consistent_lsn, + started_at.elapsed().as_millis(), + self.historic.len() + ); + img_consistent_lsn + } + + /* END_HADRON */ + /// Return all L0 delta layers pub fn level0_deltas(&self) -> &Vec> { &self.l0_delta_layers @@ -1579,6 +1677,138 @@ mod tests { LayerVisibilityHint::Visible )); } + + /* BEGIN_HADRON */ + #[test] + fn test_compute_image_consistent_lsn() { + let mut layer_map = LayerMap::default(); + + let disk_consistent_lsn = Lsn(1000); + // case 1: empty layer map + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!( + disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(), + image_consistent_lsn + ); + + // case 2: only L0 delta layer + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(100), + Lsn(900)..Lsn(990), + true, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(100), + Lsn(850)..Lsn(899), + true, + )); + } + + // should use min L0 delta LSN - 1 as image consistent LSN + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(849), image_consistent_lsn); + + // case 3: 3 images, no L1 delta + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(40), + Lsn(100)..Lsn(100), + false, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(40)..Key::from_i128(70), + Lsn(200)..Lsn(200), + false, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(70)..Key::from_i128(100), + Lsn(150)..Lsn(150), + false, + )); + } + // should use min L0 delta LSN - 1 as image consistent LSN + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(849), image_consistent_lsn); + + // case 4: 3 
images with 1 L1 delta + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(50), + Lsn(300)..Lsn(350), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(299), image_consistent_lsn); + + // case 5: 3 images with 1 more L1 delta with smaller LSN + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(50)..Key::from_i128(72), + Lsn(200)..Lsn(300), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(199), image_consistent_lsn); + + // case 6: 3 images with more newer L1 deltas (no impact on final results) + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(30), + Lsn(400)..Lsn(500), + true, + )); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(35)..Key::from_i128(100), + Lsn(450)..Lsn(600), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(199), image_consistent_lsn); + + // case 7: 3 images with more older L1 deltas (no impact on final results) + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(0)..Key::from_i128(40), + Lsn(0)..Lsn(50), + true, + )); + + updates.insert_historic(PersistentLayerDesc::new_test( + Key::from_i128(50)..Key::from_i128(100), + Lsn(10)..Lsn(60), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(199), image_consistent_lsn); + + // case 8: 3 images with one more L1 delta with overlapping LSN range + { + let mut updates = layer_map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_test( + 
Key::from_i128(0)..Key::from_i128(50), + Lsn(50)..Lsn(250), + true, + )); + } + let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn); + assert_eq!(Lsn(100), image_consistent_lsn); + } + + /* END_HADRON */ } #[cfg(test)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a9bc0a060b..718ea925b7 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -351,13 +351,6 @@ pub struct Timeline { last_image_layer_creation_check_at: AtomicLsn, last_image_layer_creation_check_instant: std::sync::Mutex>, - // HADRON - /// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation - /// on this key range. - force_image_creation_lsn: AtomicLsn, - /// The last time instant when force_image_creation_lsn is computed. - force_image_creation_lsn_computed_at: std::sync::Mutex>, - /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, @@ -2854,7 +2847,7 @@ impl Timeline { } // HADRON - fn get_image_creation_timeout(&self) -> Option { + fn get_image_layer_force_creation_period(&self) -> Option { let tenant_conf = self.tenant_conf.load(); tenant_conf .tenant_conf @@ -3134,9 +3127,6 @@ impl Timeline { repartition_threshold: 0, last_image_layer_creation_check_at: AtomicLsn::new(0), last_image_layer_creation_check_instant: Mutex::new(None), - // HADRON - force_image_creation_lsn: AtomicLsn::new(0), - force_image_creation_lsn_computed_at: std::sync::Mutex::new(None), last_received_wal: Mutex::new(None), rel_size_latest_cache: RwLock::new(HashMap::new()), rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), @@ -5381,13 +5371,16 @@ impl Timeline { } // HADRON + // for child timelines, we consider all pages up to ancestor_LSN are redone successfully by the parent timeline + min_image_lsn = min_image_lsn.max(self.get_ancestor_lsn()); if min_image_lsn < 
force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 { info!( - "forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}", + "forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}, num deltas: {}", partition.ranges[0].start, partition.ranges[0].end, min_image_lsn, - force_image_creation_lsn.unwrap() + force_image_creation_lsn.unwrap(), + max_deltas ); return true; } @@ -7153,6 +7146,19 @@ impl Timeline { .unwrap() .clone() } + + /* BEGIN_HADRON */ + pub(crate) async fn compute_image_consistent_lsn(&self) -> anyhow::Result { + let guard = self + .layers + .read(LayerManagerLockHolder::ComputeImageConsistentLsn) + .await; + let layer_map = guard.layer_map()?; + let disk_consistent_lsn = self.get_disk_consistent_lsn(); + + Ok(layer_map.compute_image_consistent_lsn(disk_consistent_lsn)) + } + /* END_HADRON */ } impl Timeline { diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 171f9d1284..aa1aa937b6 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -8,7 +8,7 @@ use std::cmp::min; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant}; use super::layer_manager::LayerManagerLockHolder; use super::{ @@ -34,7 +34,6 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; -use postgres_ffi::to_pg_timestamp; use serde::Serialize; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::CancellationToken; @@ -47,7 +46,6 @@ use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, 
RequestContext, RequestContextBuilder}; use crate::page_cache; -use crate::pgdatadir_mapping::LsnForTimestamp; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; use crate::tenant::gc_block::GcBlock; @@ -1271,10 +1269,7 @@ impl Timeline { // Define partitioning schema if needed // HADRON - let force_image_creation_lsn = self - .get_or_compute_force_image_creation_lsn(cancel, ctx) - .await - .map_err(CompactionError::Other)?; + let force_image_creation_lsn = self.get_force_image_creation_lsn(); // 1. L0 Compact let l0_outcome = { @@ -1484,59 +1479,37 @@ impl Timeline { } /* BEGIN_HADRON */ - // Get the force image creation LSN. Compute it if the last computed LSN is too old. - async fn get_or_compute_force_image_creation_lsn( - self: &Arc, - cancel: &CancellationToken, - ctx: &RequestContext, - ) -> anyhow::Result> { - const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes - let image_layer_force_creation_period = self.get_image_creation_timeout(); - if image_layer_force_creation_period.is_none() { - return Ok(None); + // Get the force image creation LSN based on gc_cutoff_lsn. + // Note that this is an estimation and the workload rate may suddenly change. When that happens, + // the force image creation may be too early or too late, but eventually it should be able to catch up. 
+ pub(crate) fn get_force_image_creation_lsn(self: &Arc) -> Option { + let image_creation_period = self.get_image_layer_force_creation_period()?; + let current_lsn = self.get_last_record_lsn(); + let pitr_lsn = self.gc_info.read().unwrap().cutoffs.time?; + let pitr_interval = self.get_pitr_interval(); + if pitr_lsn == Lsn::INVALID || pitr_interval.is_zero() { + tracing::warn!( + "pitr LSN/interval not found, skipping force image creation LSN calculation" + ); + return None; } - let image_layer_force_creation_period = image_layer_force_creation_period.unwrap(); - let force_image_creation_lsn_computed_at = - *self.force_image_creation_lsn_computed_at.lock().unwrap(); - if force_image_creation_lsn_computed_at.is_none() - || force_image_creation_lsn_computed_at.unwrap().elapsed() - > FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL - { - let now: SystemTime = SystemTime::now(); - let timestamp = now - .checked_sub(image_layer_force_creation_period) - .ok_or_else(|| { - anyhow::anyhow!( - "image creation timeout is too large: {image_layer_force_creation_period:?}" - ) - })?; - let timestamp = to_pg_timestamp(timestamp); - let force_image_creation_lsn = match self - .find_lsn_for_timestamp(timestamp, cancel, ctx) - .await? 
- { - LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn, - _ => { - let gc_lsn = *self.get_applied_gc_cutoff_lsn(); - tracing::info!( - "no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}", - gc_lsn - ); - gc_lsn - } - }; - self.force_image_creation_lsn - .store(force_image_creation_lsn); - *self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now()); - tracing::info!( - "computed force image creation LSN: {}", - force_image_creation_lsn - ); - Ok(Some(force_image_creation_lsn)) - } else { - Ok(Some(self.force_image_creation_lsn.load())) - } + let delta_lsn = current_lsn.checked_sub(pitr_lsn).unwrap().0 + * image_creation_period.as_secs() + / pitr_interval.as_secs(); + let force_image_creation_lsn = current_lsn.checked_sub(delta_lsn).unwrap_or(Lsn(0)); + + tracing::info!( + "Tenant shard {} computed force_image_creation_lsn: {}. Current lsn: {}, image_layer_force_creation_period: {:?}, GC cutoff: {}, PITR interval: {:?}", + self.tenant_shard_id, + force_image_creation_lsn, + current_lsn, + image_creation_period, + pitr_lsn, + pitr_interval + ); + + Some(force_image_creation_lsn) } /* END_HADRON */ diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 2eccf48579..d8d81a6c91 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -47,6 +47,7 @@ pub(crate) enum LayerManagerLockHolder { ImportPgData, DetachAncestor, Eviction, + ComputeImageConsistentLsn, #[cfg(test)] Testing, } diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index e5a3a969d4..62fc212e12 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -850,6 +850,31 @@ async fn handle_tenant_describe( json_response(StatusCode::OK, service.tenant_describe(tenant_id)?) 
} +/* BEGIN_HADRON */ +async fn handle_tenant_timeline_describe( + service: Arc, + req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Scrubber)?; + + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(_req) => {} + }; + + json_response( + StatusCode::OK, + service + .tenant_timeline_describe(tenant_id, timeline_id) + .await?, + ) +} +/* END_HADRON */ + async fn handle_tenant_list( service: Arc, req: Request, @@ -2480,6 +2505,13 @@ pub fn make_router( ) }) // Timeline operations + .get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { + tenant_service_handler( + r, + handle_tenant_timeline_describe, + RequestName("v1_tenant_timeline_describe"), + ) + }) .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { tenant_service_handler( r, diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index d6fe173eb3..da0687895a 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -86,6 +86,23 @@ impl PageserverClient { ) } + /* BEGIN_HADRON */ + pub(crate) async fn tenant_timeline_describe( + &self, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + ) -> Result { + measured_request!( + "tenant_timeline_describe", + crate::metrics::Method::Get, + &self.node_id_label, + self.inner + .tenant_timeline_describe(tenant_shard_id, timeline_id,) + .await + ) + } + /* END_HADRON */ + pub(crate) async fn tenant_scan_remote_storage( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 9c1b81d261..31d149c5ac 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -32,7 +32,7 @@ use pageserver_api::controller_api::{ 
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest, - TenantShardMigrateRequest, TenantShardMigrateResponse, + TenantShardMigrateRequest, TenantShardMigrateResponse, TenantTimelineDescribeResponse, }; use pageserver_api::models::{ self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease, @@ -5486,6 +5486,92 @@ impl Service { .ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into())) } + /* BEGIN_HADRON */ + pub(crate) async fn tenant_timeline_describe( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result { + self.tenant_remote_mutation(tenant_id, |locations| async move { + if locations.0.is_empty() { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant not found").into(), + )); + }; + + let locations: Vec<(TenantShardId, Node)> = locations + .0 + .iter() + .map(|t| (*t.0, t.1.latest.node.clone())) + .collect(); + let mut futs = FuturesUnordered::new(); + + for (shard_id, node) in locations { + futs.push({ + async move { + let result = node + .with_client_retries( + |client| async move { + client + .tenant_timeline_describe(&shard_id, &timeline_id) + .await + }, + &self.http_client, + &self.config.pageserver_jwt_token, + 3, + 3, + Duration::from_secs(30), + &self.cancel, + ) + .await; + (result, shard_id, node.get_id()) + } + }); + } + + let mut results: Vec = Vec::new(); + while let Some((result, tenant_shard_id, node_id)) = futs.next().await { + match result { + Some(Ok(timeline_info)) => results.push(timeline_info), + Some(Err(e)) => { + tracing::warn!( + "Failed to describe tenant {} timeline {} for pageserver {}: {e}", + tenant_shard_id, + timeline_id, + node_id, + ); + return Err(ApiError::ResourceUnavailable(format!("{e}").into())); + } + None => return 
Err(ApiError::Cancelled), + } + } + let mut image_consistent_lsn: Option = Some(Lsn::MAX); + for timeline_info in &results { + if let Some(tline_image_consistent_lsn) = timeline_info.image_consistent_lsn { + image_consistent_lsn = Some(std::cmp::min( + image_consistent_lsn.unwrap(), + tline_image_consistent_lsn, + )); + } else { + tracing::warn!( + "Timeline {} on shard {} does not have image consistent lsn", + timeline_info.timeline_id, + timeline_info.tenant_id + ); + image_consistent_lsn = None; + break; + } + } + + Ok(TenantTimelineDescribeResponse { + shards: results, + image_consistent_lsn, + }) + }) + .await? + } + /* END_HADRON */ + /// limit & offset are pagination parameters. Since we are walking an in-memory HashMap, `offset` does not /// avoid traversing data, it just avoid returning it. This is suitable for our purposes, since our in memory /// maps are small enough to traverse fast, our pagination is just to avoid serializing huge JSON responses diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 42924f9b83..a7b7f0e74d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2342,6 +2342,20 @@ class NeonStorageController(MetricsGetter, LogUtils): response.raise_for_status() return response.json() + # HADRON + def tenant_timeline_describe( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ): + response = self.request( + "GET", + f"{self.api}/control/v1/tenant/{tenant_id}/timeline/{timeline_id}", + headers=self.headers(TokenScope.ADMIN), + ) + response.raise_for_status() + return response.json() + def nodes(self): """ :return: list of {"id": ""} diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index e67161c6b7..ab02314288 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -960,9 +960,9 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): return 
image_layer_count, delta_layer_count -def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): +def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder): """ - Tests that page server can force creating new images if image creation timeout is enabled + Tests that page server can force creating new images if image_layer_force_creation_period is enabled """ # use large knobs to disable L0 compaction/image creation except for the force image creation tenant_conf = { @@ -972,10 +972,10 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): "checkpoint_distance": 10 * 1024, "checkpoint_timeout": "1s", "image_layer_force_creation_period": "1s", - # The lsn for forced image layer creations is calculated once every 10 minutes. - # Hence, drive compaction manually such that the test doesn't compute it at the - # wrong time. - "compaction_period": "0s", + "pitr_interval": "10s", + "gc_period": "1s", + "compaction_period": "1s", + "lsn_lease_length": "1s", } # consider every tenant large to run the image layer generation check more eagerly @@ -1018,4 +1018,69 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): ) +def test_image_consistent_lsn(neon_env_builder: NeonEnvBuilder): + """ + Test the /control/v1/tenant/:tenant_id/timeline/:timeline_id endpoint and the computation of image_consistent_lsn + """ + # use large knobs to disable L0 compaction/image creation except for the force image creation + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + "checkpoint_distance": 10 * 1024, + "checkpoint_timeout": "1s", + "image_layer_force_creation_period": "1s", + "pitr_interval": "10s", + "gc_period": "1s", + "compaction_period": "1s", + "lsn_lease_length": "1s", + } + + neon_env_builder.num_pageservers = 2 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start( + initial_tenant_conf=tenant_conf, + initial_tenant_shard_count=4, + 
initial_tenant_shard_stripe_size=1, + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)") + for v in range(10): + endpoint.safe_psql( + f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False + ) + + response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id) + shards = response["shards"] + for shard in shards: + assert shard["image_consistent_lsn"] is not None + image_consistent_lsn = response["image_consistent_lsn"] + assert image_consistent_lsn is not None + + # do more writes and wait for image_consistent_lsn to advance + for v in range(100): + endpoint.safe_psql( + f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False + ) + + def check_image_consistent_lsn_advanced(): + response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id) + new_image_consistent_lsn = response["image_consistent_lsn"] + shards = response["shards"] + for shard in shards: + print(f"shard {shard['tenant_id']} image_consistent_lsn{shard['image_consistent_lsn']}") + assert new_image_consistent_lsn != image_consistent_lsn + + wait_until(check_image_consistent_lsn_advanced) + + endpoint.stop_and_destroy() + + for ps in env.pageservers: + ps.allowed_errors.append(".*created delta file of size.*larger than double of target.*") + + # END_HADRON