diff --git a/Cargo.lock b/Cargo.lock index d919537818..9f4d537b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4286,6 +4286,7 @@ dependencies = [ "enumset", "fail", "futures", + "hashlink", "hex", "hex-literal", "http-utils", diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 79e87eba9b..587f3774d4 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -546,6 +546,11 @@ impl PageServerNode { .map(serde_json::from_str) .transpose() .context("Falied to parse 'sampling_ratio'")?, + relsize_snapshot_cache_capacity: settings + .remove("relsize snapshot cache capacity") + .map(|x| x.parse::()) + .transpose() + .context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 2618366469..73b6eee554 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -491,6 +491,8 @@ pub struct TenantConfigToml { /// Tenant level performance sampling ratio override. Controls the ratio of get page requests /// that will get perf sampling for the tenant. pub sampling_ratio: Option, + /// Capacity of relsize snapshot cache (used by replicas). + pub relsize_snapshot_cache_capacity: usize, } pub mod defaults { @@ -730,6 +732,7 @@ pub mod tenant_conf_defaults { pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true; pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100; + pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000; } impl Default for TenantConfigToml { @@ -787,6 +790,7 @@ impl Default for TenantConfigToml { gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB, gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT, sampling_ratio: None, + relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY, } } } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e9b37c8ca6..ca26286b86 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -630,6 +630,8 @@ pub struct TenantConfigPatch { pub gc_compaction_ratio_percent: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub sampling_ratio: FieldPatch>, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub relsize_snapshot_cache_capacity: FieldPatch, } /// Like [`crate::config::TenantConfigToml`], but preserves the information @@ -759,6 +761,9 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub sampling_ratio: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub relsize_snapshot_cache_capacity: Option, } impl TenantConfig { @@ -804,6 +809,7 @@ impl TenantConfig { mut gc_compaction_initial_threshold_kb, mut gc_compaction_ratio_percent, mut sampling_ratio, + mut relsize_snapshot_cache_capacity, } = self; patch.checkpoint_distance.apply(&mut checkpoint_distance); @@ -905,6 +911,9 @@ impl TenantConfig { .gc_compaction_ratio_percent .apply(&mut gc_compaction_ratio_percent); patch.sampling_ratio.apply(&mut sampling_ratio); + patch + .relsize_snapshot_cache_capacity + .apply(&mut relsize_snapshot_cache_capacity); Ok(Self { checkpoint_distance, @@ -944,6 +953,7 @@ impl TenantConfig { gc_compaction_initial_threshold_kb, gc_compaction_ratio_percent, sampling_ratio, + relsize_snapshot_cache_capacity, }) } @@ -1052,6 +1062,9 @@ impl TenantConfig { .gc_compaction_ratio_percent .unwrap_or(global_conf.gc_compaction_ratio_percent), sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio), + relsize_snapshot_cache_capacity: self + .relsize_snapshot_cache_capacity + .unwrap_or(global_conf.relsize_snapshot_cache_capacity), } } } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index b7b3e0eaf1..6a9a5a292a 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -30,6 +30,7 @@ crc32c.workspace = true either.workspace = true fail.workspace = true futures.workspace = true +hashlink.workspace = true hex.workspace = true humantime.workspace = true humantime-serde.workspace = true diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index b49021461e..e89baa0bce 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -343,7 +343,7 @@ where // Gather non-relational files from object storage pages. let slru_partitions = self .timeline - .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx) + .get_slru_keyspace(Version::at(self.lsn), self.ctx) .await? .partition( self.timeline.get_shard_identity(), @@ -378,7 +378,7 @@ where // Otherwise only include init forks of unlogged relations. let rels = self .timeline - .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await?; for &rel in rels.iter() { // Send init fork as main fork to provide well formed empty @@ -517,7 +517,7 @@ where async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> { let nblocks = self .timeline - .get_rel_size(src, Version::Lsn(self.lsn), self.ctx) + .get_rel_size(src, Version::at(self.lsn), self.ctx) .await?; // If the relation is empty, create an empty file @@ -577,7 +577,7 @@ where let relmap_img = if has_relmap_file { let img = self .timeline - .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await?; if img.len() @@ -631,7 +631,7 @@ where if !has_relmap_file && self .timeline - .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await? .is_empty() { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 8e4dbd6c3e..c50f730f41 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy = Lazy::new(| .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy = Lazy::new(|| { +pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy = Lazy::new(|| { register_uint_gauge!( - "pageserver_relsize_cache_entries", - "Number of entries in the relation size cache", + "pageserver_relsize_latest_cache_entries", + "Number of entries in the latest relation size cache", ) .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_HITS: Lazy = Lazy::new(|| { - register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",) - .expect("failed to define a metric") +pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_latest_cache_hits", + "Latest relation size cache hits", + ) + .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_MISSES: Lazy = Lazy::new(|| { +pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_relsize_cache_misses", - "Relation size cache misses", + "pageserver_relsize_latest_cache_misses", + "Relation size latest cache misses", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy = Lazy::new(|| { + register_uint_gauge!( + "pageserver_relsize_snapshot_cache_entries", + "Number of entries in the pitr relation size cache", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_snapshot_cache_hits", + "Pitr relation size cache hits", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_snapshot_cache_misses", + "Relation size snapshot cache misses", ) .expect("failed to define a metric") }); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 83d9191240..e46ba8d3a1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -62,7 +62,7 @@ use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS, SmgrOpTimer, TimelineMetrics, }; -use crate::pgdatadir_mapping::Version; +use crate::pgdatadir_mapping::{LsnRange, Version}; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, @@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError { struct BatchedGetPageRequest { req: PagestreamGetPageRequest, timer: SmgrOpTimer, - effective_request_lsn: Lsn, + lsn_range: LsnRange, ctx: RequestContext, } @@ -764,12 +764,12 @@ impl BatchedFeMessage { match batching_strategy { PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => { if let Some(last_in_batch) = accum_pages.last() { - if last_in_batch.effective_request_lsn - != this_pages[0].effective_request_lsn + if last_in_batch.lsn_range.effective_lsn + != this_pages[0].lsn_range.effective_lsn { trace!( - accum_lsn = %last_in_batch.effective_request_lsn, - this_lsn = %this_pages[0].effective_request_lsn, + accum_lsn = %last_in_batch.lsn_range.effective_lsn, + this_lsn = %this_pages[0].lsn_range.effective_lsn, "stopping batching because LSN changed" ); @@ -784,15 +784,15 @@ impl BatchedFeMessage { let same_page_different_lsn = accum_pages.iter().any(|batched| { batched.req.rel == this_pages[0].req.rel && batched.req.blkno == this_pages[0].req.blkno - && batched.effective_request_lsn - != this_pages[0].effective_request_lsn + && batched.lsn_range.effective_lsn + != this_pages[0].lsn_range.effective_lsn }); if same_page_different_lsn { trace!( rel=%this_pages[0].req.rel, blkno=%this_pages[0].req.blkno, - lsn=%this_pages[0].effective_request_lsn, + lsn=%this_pages[0].lsn_range.effective_lsn, "stopping batching because same page was requested at different LSNs" ); @@ -1158,7 +1158,7 @@ impl PageServerHandler { .await?; // We're holding the Handle - let effective_request_lsn = match Self::effective_request_lsn( + let effective_lsn = match Self::effective_request_lsn( &shard, shard.get_last_record_lsn(), req.hdr.request_lsn, @@ -1177,7 +1177,10 @@ impl PageServerHandler { pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, - effective_request_lsn, + lsn_range: LsnRange { + effective_lsn, + request_lsn: req.hdr.request_lsn + }, ctx, }], // The executor grabs the batch when it becomes idle. @@ -2127,7 +2130,14 @@ impl PageServerHandler { .await?; let exists = timeline - .get_rel_exists(req.rel, Version::Lsn(lsn), ctx) + .get_rel_exists( + req.rel, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { @@ -2154,7 +2164,14 @@ impl PageServerHandler { .await?; let n_blocks = timeline - .get_rel_size(req.rel, Version::Lsn(lsn), ctx) + .get_rel_size( + req.rel, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { @@ -2181,7 +2198,15 @@ impl PageServerHandler { .await?; let total_blocks = timeline - .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx) + .get_db_size( + DEFAULTTABLESPACE_OID, + req.dbnode, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; let db_size = total_blocks as i64 * BLCKSZ as i64; @@ -2214,7 +2239,7 @@ impl PageServerHandler { // Ignore error (trace buffer may be full or tracer may have disconnected). _ = page_trace.try_send(PageTraceEvent { key, - effective_lsn: batch.effective_request_lsn, + effective_lsn: batch.lsn_range.effective_lsn, time, }); } @@ -2229,7 +2254,7 @@ impl PageServerHandler { perf_instrument = true; } - req.effective_request_lsn + req.lsn_range.effective_lsn }) .max() .expect("batch is never empty"); @@ -2283,7 +2308,7 @@ impl PageServerHandler { ( &p.req.rel, &p.req.blkno, - p.effective_request_lsn, + p.lsn_range, p.ctx.attached_child(), ) }), diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0f9bfd19a7..c6f3929257 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,7 +43,9 @@ use crate::aux_file; use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder}; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::metrics::{ - RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD, + RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS, + RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS, + RELSIZE_SNAPSHOT_CACHE_MISSES, }; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, @@ -90,6 +92,28 @@ pub enum LsnForTimestamp { NoData(Lsn), } +/// Each request to page server contains LSN range: `not_modified_since..request_lsn`. +/// See comments libs/pageserver_api/src/models.rs. +/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`. +/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`. +#[derive(Debug, Clone, Copy, Default)] +pub struct LsnRange { + pub effective_lsn: Lsn, + pub request_lsn: Lsn, +} + +impl LsnRange { + pub fn at(lsn: Lsn) -> LsnRange { + LsnRange { + effective_lsn: lsn, + request_lsn: lsn, + } + } + pub fn is_latest(&self) -> bool { + self.request_lsn == Lsn::MAX + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum CalculateLogicalSizeError { #[error("cancelled")] @@ -202,13 +226,13 @@ impl Timeline { io_concurrency: IoConcurrency, ) -> Result { match version { - Version::Lsn(effective_lsn) => { + Version::LsnRange(lsns) => { let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)]; let res = self .get_rel_page_at_lsn_batched( - pages.iter().map(|(tag, blknum)| { - (tag, blknum, effective_lsn, ctx.attached_child()) - }), + pages + .iter() + .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())), io_concurrency.clone(), ctx, ) @@ -246,7 +270,7 @@ impl Timeline { /// The ordering of the returned vec corresponds to the ordering of `pages`. pub(crate) async fn get_rel_page_at_lsn_batched( &self, - pages: impl ExactSizeIterator, + pages: impl ExactSizeIterator, io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> Vec> { @@ -265,7 +289,7 @@ impl Timeline { let mut req_keyspaces: HashMap = HashMap::with_capacity(pages.len()); - for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() { + for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() { if tag.relnode == 0 { result_slots[response_slot_idx].write(Err(PageReconstructError::Other( RelationError::InvalidRelnode.into(), @@ -274,7 +298,7 @@ impl Timeline { slots_filled += 1; continue; } - + let lsn = lsns.effective_lsn; let nblocks = { let ctx = RequestContextBuilder::from(&ctx) .perf_span(|crnt_perf_span| { @@ -289,7 +313,7 @@ impl Timeline { .attached_child(); match self - .get_rel_size(*tag, Version::Lsn(lsn), &ctx) + .get_rel_size(*tag, Version::LsnRange(lsns), &ctx) .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await { @@ -470,7 +494,7 @@ impl Timeline { )); } - if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { + if let Some(nblocks) = self.get_cached_rel_size(&tag, version) { return Ok(nblocks); } @@ -488,7 +512,7 @@ impl Timeline { let mut buf = version.get(self, key, ctx).await?; let nblocks = buf.get_u32_le(); - self.update_cached_rel_size(tag, version.get_lsn(), nblocks); + self.update_cached_rel_size(tag, version, nblocks); Ok(nblocks) } @@ -510,7 +534,7 @@ impl Timeline { } // first try to lookup relation in cache - if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { + if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) { return Ok(true); } // then check if the database was already initialized. @@ -632,7 +656,7 @@ impl Timeline { ) -> Result { assert!(self.tenant_shard_id.is_shard_zero()); let n_blocks = self - .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) + .get_slru_segment_size(kind, segno, Version::at(lsn), ctx) .await?; let keyspace = KeySpace::single( @@ -867,11 +891,11 @@ impl Timeline { mut f: impl FnMut(TimestampTz) -> ControlFlow, ) -> Result { for segno in self - .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx) + .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx) .await? { let nblocks = self - .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx) + .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx) .await?; let keyspace = KeySpace::single( @@ -1137,7 +1161,7 @@ impl Timeline { let mut total_size: u64 = 0; for (spcnode, dbnode) in dbdir.dbdirs.keys() { for rel in self - .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx) + .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx) .await? { if self.cancel.is_cancelled() { @@ -1212,7 +1236,7 @@ impl Timeline { result.add_key(rel_dir_to_key(spcnode, dbnode)); let mut rels: Vec = self - .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx) + .list_rels(spcnode, dbnode, Version::at(lsn), ctx) .await? .into_iter() .collect(); @@ -1329,59 +1353,75 @@ impl Timeline { Ok((dense_keyspace, sparse_keyspace)) } - /// Get cached size of relation if it not updated after specified LSN - pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option { - let rel_size_cache = self.rel_size_cache.read().unwrap(); - if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) { - if lsn >= *cached_lsn { - RELSIZE_CACHE_HITS.inc(); - return Some(*nblocks); + /// Get cached size of relation. There are two caches: one for primary updates, it captures the latest state of + /// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size + /// at the particular LSN (snapshot). + pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option { + let lsn = version.get_lsn(); + { + let rel_size_cache = self.rel_size_latest_cache.read().unwrap(); + if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) { + if lsn >= *cached_lsn { + RELSIZE_LATEST_CACHE_HITS.inc(); + return Some(*nblocks); + } + RELSIZE_CACHE_MISSES_OLD.inc(); } - RELSIZE_CACHE_MISSES_OLD.inc(); } - RELSIZE_CACHE_MISSES.inc(); + { + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) { + RELSIZE_SNAPSHOT_CACHE_HITS.inc(); + return Some(*nblock); + } + } + if version.is_latest() { + RELSIZE_LATEST_CACHE_MISSES.inc(); + } else { + RELSIZE_SNAPSHOT_CACHE_MISSES.inc(); + } None } /// Update cached relation size if there is no more recent update - pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - - if lsn < rel_size_cache.complete_as_of { - // Do not cache old values. It's safe to cache the size on read, as long as - // the read was at an LSN since we started the WAL ingestion. Reasoning: we - // never evict values from the cache, so if the relation size changed after - // 'lsn', the new value is already in the cache. - return; - } - - match rel_size_cache.map.entry(tag) { - hash_map::Entry::Occupied(mut entry) => { - let cached_lsn = entry.get_mut(); - if lsn >= cached_lsn.0 { - *cached_lsn = (lsn, nblocks); + pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) { + let lsn = version.get_lsn(); + if version.is_latest() { + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + match rel_size_cache.entry(tag) { + hash_map::Entry::Occupied(mut entry) => { + let cached_lsn = entry.get_mut(); + if lsn >= cached_lsn.0 { + *cached_lsn = (lsn, nblocks); + } + } + hash_map::Entry::Vacant(entry) => { + entry.insert((lsn, nblocks)); + RELSIZE_LATEST_CACHE_ENTRIES.inc(); } } - hash_map::Entry::Vacant(entry) => { - entry.insert((lsn, nblocks)); - RELSIZE_CACHE_ENTRIES.inc(); + } else { + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if rel_size_cache.capacity() != 0 { + rel_size_cache.insert((lsn, tag), nblocks); + RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64); } } } /// Store cached relation size pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() { - RELSIZE_CACHE_ENTRIES.inc(); + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() { + RELSIZE_LATEST_CACHE_ENTRIES.inc(); } } /// Remove cached relation size pub fn remove_cached_rel_size(&self, tag: &RelTag) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - if rel_size_cache.map.remove(tag).is_some() { - RELSIZE_CACHE_ENTRIES.dec(); + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + if rel_size_cache.remove(tag).is_some() { + RELSIZE_LATEST_CACHE_ENTRIES.dec(); } } } @@ -1585,7 +1625,10 @@ impl DatadirModification<'_> { // check the cache too. This is because eagerly checking the cache results in // less work overall and 10% better performance. It's more work on cache miss // but cache miss is rare. - if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) { + if let Some(nblocks) = self + .tline + .get_cached_rel_size(&rel, Version::Modified(self)) + { Ok(nblocks) } else if !self .tline @@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats { /// timeline to not miss the latest updates. #[derive(Clone, Copy)] pub enum Version<'a> { - Lsn(Lsn), + LsnRange(LsnRange), Modified(&'a DatadirModification<'a>), } @@ -2679,7 +2722,7 @@ impl Version<'_> { ctx: &RequestContext, ) -> Result { match self { - Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await, + Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await, Version::Modified(modification) => modification.get(key, ctx).await, } } @@ -2701,12 +2744,26 @@ impl Version<'_> { } } - fn get_lsn(&self) -> Lsn { + pub fn is_latest(&self) -> bool { match self { - Version::Lsn(lsn) => *lsn, + Version::LsnRange(lsns) => lsns.is_latest(), + Version::Modified(_) => true, + } + } + + pub fn get_lsn(&self) -> Lsn { + match self { + Version::LsnRange(lsns) => lsns.effective_lsn, Version::Modified(modification) => modification.lsn, } } + + pub fn at(lsn: Lsn) -> Self { + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: lsn, + }) + } } //--- Metadata structs stored in key-value pairs in the repository. diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d3c92ab47a..da2e56d80a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -14,6 +14,7 @@ pub mod span; pub mod uninit; mod walreceiver; +use hashlink::LruCache; use std::array; use std::cmp::{max, min}; use std::collections::btree_map::Entry; @@ -197,16 +198,6 @@ pub struct TimelineResources { pub l0_flush_global_state: l0_flush::L0FlushGlobalState, } -/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL -/// ingestion considerably, because WAL ingestion needs to check on most records if the record -/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end -/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the -/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`]. -pub(crate) struct RelSizeCache { - pub(crate) complete_as_of: Lsn, - pub(crate) map: HashMap, -} - pub struct Timeline { pub(crate) conf: &'static PageServerConf, tenant_conf: Arc>, @@ -365,7 +356,8 @@ pub struct Timeline { pub walreceiver: Mutex>, /// Relation size cache - pub(crate) rel_size_cache: RwLock, + pub(crate) rel_size_latest_cache: RwLock>, + pub(crate) rel_size_snapshot_cache: Mutex>, download_all_remote_layers_task_info: RwLock>, @@ -2820,6 +2812,13 @@ impl Timeline { self.remote_client.update_config(&new_conf.location); + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity { + if new_capacity != rel_size_cache.capacity() { + rel_size_cache.set_capacity(new_capacity); + } + } + self.metrics .evictions_with_low_residence_duration .write() @@ -2878,6 +2877,14 @@ impl Timeline { ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded); } + let relsize_snapshot_cache_capacity = { + let loaded_tenant_conf = tenant_conf.load(); + loaded_tenant_conf + .tenant_conf + .relsize_snapshot_cache_capacity + .unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity) + }; + Arc::new_cyclic(|myself| { let metrics = Arc::new(TimelineMetrics::new( &tenant_shard_id, @@ -2969,10 +2976,8 @@ impl Timeline { last_image_layer_creation_check_instant: Mutex::new(None), last_received_wal: Mutex::new(None), - rel_size_cache: RwLock::new(RelSizeCache { - complete_as_of: disk_consistent_lsn, - map: HashMap::new(), - }), + rel_size_latest_cache: RwLock::new(HashMap::new()), + rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), download_all_remote_layers_task_info: RwLock::new(None), diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index e60c590f87..c7a6655052 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1684,31 +1684,31 @@ mod tests { // The relation was created at LSN 2, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await?, false ); assert!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await .is_err() ); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, 1 ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, 3 ); @@ -1719,7 +1719,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x20)), + Version::at(Lsn(0x20)), &ctx, io_concurrency.clone() ) @@ -1733,7 +1733,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x30)), + Version::at(Lsn(0x30)), &ctx, io_concurrency.clone() ) @@ -1747,7 +1747,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x40)), + Version::at(Lsn(0x40)), &ctx, io_concurrency.clone() ) @@ -1760,7 +1760,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x40)), + Version::at(Lsn(0x40)), &ctx, io_concurrency.clone() ) @@ -1774,7 +1774,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1787,7 +1787,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1800,7 +1800,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 2, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1820,7 +1820,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx) .await?, 2 ); @@ -1829,7 +1829,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -1842,7 +1842,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -1854,7 +1854,7 @@ mod tests { // should still see the truncated block with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, 3 ); @@ -1863,7 +1863,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 2, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1880,7 +1880,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx) .await?, 0 ); @@ -1893,7 +1893,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx) .await?, 2 ); @@ -1902,7 +1902,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x70)), + Version::at(Lsn(0x70)), &ctx, io_concurrency.clone() ) @@ -1915,7 +1915,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x70)), + Version::at(Lsn(0x70)), &ctx, io_concurrency.clone() ) @@ -1932,7 +1932,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, 1501 ); @@ -1942,7 +1942,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blk, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -1956,7 +1956,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1500, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -1990,13 +1990,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, 1 ); @@ -2011,7 +2011,7 @@ mod tests { // Check that rel is not visible anymore assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx) .await?, false ); @@ -2029,13 +2029,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx) .await?, 1 ); @@ -2077,26 +2077,26 @@ mod tests { // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await?, false ); assert!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await .is_err() ); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, relsize ); @@ -2110,7 +2110,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(lsn), + Version::at(lsn), &ctx, io_concurrency.clone() ) @@ -2131,7 +2131,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx) .await?, 1 ); @@ -2144,7 +2144,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -2157,7 +2157,7 @@ mod tests { // should still see all blocks with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, relsize ); @@ -2169,7 +2169,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -2193,13 +2193,13 @@ mod tests { assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, relsize ); @@ -2212,7 +2212,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -2250,7 +2250,7 @@ mod tests { assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE + 1 ); @@ -2264,7 +2264,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE ); @@ -2279,7 +2279,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE - 1 ); @@ -2297,7 +2297,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, size as BlockNumber ); diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 3616467c00..3eb6b7193c 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}}, }, "rel_size_v2_enabled": True, + "relsize_snapshot_cache_capacity": 10000, "gc_compaction_enabled": True, "gc_compaction_verification": False, "gc_compaction_initial_threshold_kb": 1024000, diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py index e2a22cc769..c88bc7aace 100644 --- a/test_runner/regress/test_replica_start.py +++ b/test_runner/regress/test_replica_start.py @@ -27,8 +27,9 @@ from contextlib import closing import psycopg2 import pytest +from fixtures.common_types import Lsn from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup +from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup from fixtures.pg_version import PgVersion from fixtures.utils import query_scalar, skip_on_postgres, wait_until @@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv): with secondary.cursor() as secondary_cur: secondary_cur.execute("select count(*) from t") assert secondary_cur.fetchone() == (n_restarts,) + + +def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin): + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + + sql = """ +CREATE TABLE CHAR_TBL(f1 char(4)); +CREATE TABLE FLOAT8_TBL(f1 float8); +CREATE TABLE INT2_TBL(f1 int2); +CREATE TABLE INT4_TBL(f1 int4); +CREATE TABLE INT8_TBL(q1 int8, q2 int8); +CREATE TABLE POINT_TBL(f1 point); +CREATE TABLE TEXT_TBL (f1 text); +CREATE TABLE VARCHAR_TBL(f1 varchar(4)); +CREATE TABLE onek (unique1 int4); +CREATE TABLE onek2 AS SELECT * FROM onek; +CREATE TABLE tenk1 (unique1 int4); +CREATE TABLE tenk2 AS SELECT * FROM tenk1; +CREATE TABLE person (name text, age int4,location point); +CREATE TABLE emp (salary int4, manager name) INHERITS (person); +CREATE TABLE student (gpa float8) INHERITS (person); +CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student); +CREATE TABLE road (name text,thepath path); +CREATE TABLE ihighway () INHERITS (road); +CREATE TABLE shighway(surface text) INHERITS (road); +CREATE TABLE BOOLTBL3 (d text, b bool, o int); +CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool); +DROP TABLE BOOLTBL3; +DROP TABLE BOOLTBL4; +CREATE TABLE ceil_floor_round (a numeric); +DROP TABLE ceil_floor_round; +CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8); +DROP TABLE width_bucket_test; +CREATE TABLE num_input_test (n1 numeric); +CREATE TABLE num_variance (a numeric); +INSERT INTO num_variance VALUES (0); +CREATE TABLE snapshot_test (nr integer, snap txid_snapshot); +CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now())); +CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now())); +CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field); +CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field); +TRUNCATE guid1; +DROP TABLE guid1; +DROP TABLE guid2 CASCADE; +CREATE TABLE numrange_test (nr NUMRANGE); +CREATE INDEX numrange_test_btree on numrange_test(nr); +CREATE TABLE numrange_test2(nr numrange); +CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr); +INSERT INTO numrange_test2 VALUES('[, 5)'); +CREATE TABLE textrange_test (tr text); +CREATE INDEX textrange_test_btree on textrange_test(tr); +CREATE TABLE test_range_gist(ir int4range); +CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir); +DROP INDEX test_range_gist_idx; +CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir); +CREATE TABLE test_range_spgist(ir int4range); +CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir); +DROP INDEX test_range_spgist_idx; +CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir); +CREATE TABLE test_range_elem(i int4); +CREATE INDEX test_range_elem_idx on test_range_elem (i); +CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10)); +DROP TABLE test_range_elem; +CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&)); +CREATE TABLE f_test(f text, i int); +CREATE TABLE i8r_array (f1 int, f2 text); +CREATE TYPE arrayrange as range (subtype=int4[]); +CREATE TYPE two_ints as (a int, b int); +DROP TYPE two_ints cascade; +CREATE TABLE text_support_test (t text); +CREATE TABLE TEMP_FLOAT (f1 FLOAT8); +CREATE TABLE TEMP_INT4 (f1 INT4); +CREATE TABLE TEMP_INT2 (f1 INT2); +CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8); +CREATE TABLE POLYGON_TBL(f1 polygon); +CREATE TABLE quad_poly_tbl (id int, p polygon); +INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y; +CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl; +CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl; +""" + + with endpoint.cursor() as cur: + lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + env.endpoints.create_start(branch_name="main", lsn=lsn) + log.info(f"lsn: {lsn}") + + for line in sql.split("\n"): + if len(line.strip()) == 0 or line.startswith("--"): + continue + cur.execute(line) + + lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + env.endpoints.create_start(branch_name="main", lsn=lsn) + log.info(f"lsn: {lsn}") + + cur.execute("VACUUM FULL pg_class;") + + for ep in env.endpoints.endpoints: + log.info(f"{ep.endpoint_id} / {ep.pg_port}") + pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"] + env_vars = { + "PGPORT": str(ep.pg_port), + "PGUSER": endpoint.default_options["user"], + "PGHOST": endpoint.default_options["host"], + } + pg_bin.run_capture(pg_dump_command, env=env_vars)