diff --git a/Cargo.lock b/Cargo.lock index d919537818..9f4d537b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4286,6 +4286,7 @@ dependencies = [ "enumset", "fail", "futures", + "hashlink", "hex", "hex-literal", "http-utils", diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 10d8f2c878..94467a0d2f 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -213,8 +213,10 @@ impl Escaping for PgIdent { // Find the first suitable tag that is not present in the string. // Postgres' max role/DB name length is 63 bytes, so even in the - // worst case it won't take long. - while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) { + // worst case it won't take long. Outer tag is always `tag + "x"`, + // so if `tag` is not present in the string, `outer_tag` is not + // present in the string either. + while self.contains(&tag.to_string()) { tag += "x"; outer_tag = tag.clone() + "x"; } diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index 53f2ddad84..04b6ed2256 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor ("name$$$", ("$x$name$$$$x$", "xx")), ("name$$$$", ("$x$name$$$$$x$", "xx")), ("name$x$", ("$xx$name$x$$xx$", "xxx")), + ("x", ("$xx$x$xx$", "xxx")), + ("xx", ("$xxx$xx$xxx$", "xxxx")), + ("$x", ("$xx$$x$xx$", "xxx")), + ("x$", ("$xx$x$$xx$", "xxx")), + ("$x$", ("$xx$$x$$xx$", "xxx")), + ("xx$", ("$xxx$xx$$xxx$", "xxxx")), + ("$xx", ("$xxx$$xx$xxx$", "xxxx")), + ("$xx$", ("$xxx$$xx$$xxx$", "xxxx")), ]; for (input, expected) in test_cases { diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 79e87eba9b..587f3774d4 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -546,6 +546,11 @@ impl PageServerNode { .map(serde_json::from_str) .transpose() .context("Falied to parse 'sampling_ratio'")?, + relsize_snapshot_cache_capacity: settings + .remove("relsize snapshot cache capacity") + .map(|x| x.parse::()) + .transpose() + .context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 2618366469..73b6eee554 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -491,6 +491,8 @@ pub struct TenantConfigToml { /// Tenant level performance sampling ratio override. Controls the ratio of get page requests /// that will get perf sampling for the tenant. pub sampling_ratio: Option, + /// Capacity of relsize snapshot cache (used by replicas). 
+ pub relsize_snapshot_cache_capacity: usize, } pub mod defaults { @@ -730,6 +732,7 @@ pub mod tenant_conf_defaults { pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true; pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100; + pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000; } impl Default for TenantConfigToml { @@ -787,6 +790,7 @@ impl Default for TenantConfigToml { gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB, gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT, sampling_ratio: None, + relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY, } } } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e9b37c8ca6..ca26286b86 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -630,6 +630,8 @@ pub struct TenantConfigPatch { pub gc_compaction_ratio_percent: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub sampling_ratio: FieldPatch>, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub relsize_snapshot_cache_capacity: FieldPatch, } /// Like [`crate::config::TenantConfigToml`], but preserves the information @@ -759,6 +761,9 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub sampling_ratio: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub relsize_snapshot_cache_capacity: Option, } impl TenantConfig { @@ -804,6 +809,7 @@ impl TenantConfig { mut gc_compaction_initial_threshold_kb, mut gc_compaction_ratio_percent, mut sampling_ratio, + mut relsize_snapshot_cache_capacity, } = self; patch.checkpoint_distance.apply(&mut checkpoint_distance); @@ -905,6 +911,9 @@ impl TenantConfig { .gc_compaction_ratio_percent .apply(&mut gc_compaction_ratio_percent); patch.sampling_ratio.apply(&mut sampling_ratio); + patch + .relsize_snapshot_cache_capacity + .apply(&mut relsize_snapshot_cache_capacity); Ok(Self { checkpoint_distance, @@ -944,6 +953,7 @@ impl TenantConfig { gc_compaction_initial_threshold_kb, gc_compaction_ratio_percent, sampling_ratio, + relsize_snapshot_cache_capacity, }) } @@ -1052,6 +1062,9 @@ impl TenantConfig { .gc_compaction_ratio_percent .unwrap_or(global_conf.gc_compaction_ratio_percent), sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio), + relsize_snapshot_cache_capacity: self + .relsize_snapshot_cache_capacity + .unwrap_or(global_conf.relsize_snapshot_cache_capacity), } } } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index b7b3e0eaf1..6a9a5a292a 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -30,6 +30,7 @@ crc32c.workspace = true either.workspace = true fail.workspace = true futures.workspace = true +hashlink.workspace = true hex.workspace = true humantime.workspace = true humantime-serde.workspace = true diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index b49021461e..e89baa0bce 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -343,7 +343,7 @@ where // Gather non-relational files from object storage pages. let slru_partitions = self .timeline - .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx) + .get_slru_keyspace(Version::at(self.lsn), self.ctx) .await? .partition( self.timeline.get_shard_identity(), @@ -378,7 +378,7 @@ where // Otherwise only include init forks of unlogged relations. 
let rels = self .timeline - .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await?; for &rel in rels.iter() { // Send init fork as main fork to provide well formed empty @@ -517,7 +517,7 @@ where async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> { let nblocks = self .timeline - .get_rel_size(src, Version::Lsn(self.lsn), self.ctx) + .get_rel_size(src, Version::at(self.lsn), self.ctx) .await?; // If the relation is empty, create an empty file @@ -577,7 +577,7 @@ where let relmap_img = if has_relmap_file { let img = self .timeline - .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await?; if img.len() @@ -631,7 +631,7 @@ where if !has_relmap_file && self .timeline - .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await? .is_empty() { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 8e4dbd6c3e..c50f730f41 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy = Lazy::new(| .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy = Lazy::new(|| { +pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy = Lazy::new(|| { register_uint_gauge!( - "pageserver_relsize_cache_entries", - "Number of entries in the relation size cache", + "pageserver_relsize_latest_cache_entries", + "Number of entries in the latest relation size cache", ) .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_HITS: Lazy = Lazy::new(|| { - register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",) - .expect("failed to define a metric") +pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_latest_cache_hits", + "Latest relation size cache hits", + ) + .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_MISSES: Lazy = Lazy::new(|| { +pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_relsize_cache_misses", - "Relation size cache misses", + "pageserver_relsize_latest_cache_misses", + "Relation size latest cache misses", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy = Lazy::new(|| { + register_uint_gauge!( + "pageserver_relsize_snapshot_cache_entries", + "Number of entries in the pitr relation size cache", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_snapshot_cache_hits", + "Pitr relation size cache hits", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_snapshot_cache_misses", + "Relation size snapshot cache misses", ) .expect("failed to define a metric") }); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 83d9191240..e46ba8d3a1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -62,7 +62,7 @@ use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS, SmgrOpTimer, TimelineMetrics, }; -use 
crate::pgdatadir_mapping::Version; +use crate::pgdatadir_mapping::{LsnRange, Version}; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, @@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError { struct BatchedGetPageRequest { req: PagestreamGetPageRequest, timer: SmgrOpTimer, - effective_request_lsn: Lsn, + lsn_range: LsnRange, ctx: RequestContext, } @@ -764,12 +764,12 @@ impl BatchedFeMessage { match batching_strategy { PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => { if let Some(last_in_batch) = accum_pages.last() { - if last_in_batch.effective_request_lsn - != this_pages[0].effective_request_lsn + if last_in_batch.lsn_range.effective_lsn + != this_pages[0].lsn_range.effective_lsn { trace!( - accum_lsn = %last_in_batch.effective_request_lsn, - this_lsn = %this_pages[0].effective_request_lsn, + accum_lsn = %last_in_batch.lsn_range.effective_lsn, + this_lsn = %this_pages[0].lsn_range.effective_lsn, "stopping batching because LSN changed" ); @@ -784,15 +784,15 @@ impl BatchedFeMessage { let same_page_different_lsn = accum_pages.iter().any(|batched| { batched.req.rel == this_pages[0].req.rel && batched.req.blkno == this_pages[0].req.blkno - && batched.effective_request_lsn - != this_pages[0].effective_request_lsn + && batched.lsn_range.effective_lsn + != this_pages[0].lsn_range.effective_lsn }); if same_page_different_lsn { trace!( rel=%this_pages[0].req.rel, blkno=%this_pages[0].req.blkno, - lsn=%this_pages[0].effective_request_lsn, + lsn=%this_pages[0].lsn_range.effective_lsn, "stopping batching because same page was requested at different LSNs" ); @@ -1158,7 +1158,7 @@ impl PageServerHandler { .await?; // We're holding the Handle - let effective_request_lsn = match Self::effective_request_lsn( + let effective_lsn = match Self::effective_request_lsn( &shard, shard.get_last_record_lsn(), req.hdr.request_lsn, @@ -1177,7 +1177,10 @@ impl PageServerHandler { pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, - effective_request_lsn, + lsn_range: LsnRange { + effective_lsn, + request_lsn: req.hdr.request_lsn + }, ctx, }], // The executor grabs the batch when it becomes idle. @@ -2127,7 +2130,14 @@ impl PageServerHandler { .await?; let exists = timeline - .get_rel_exists(req.rel, Version::Lsn(lsn), ctx) + .get_rel_exists( + req.rel, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { @@ -2154,7 +2164,14 @@ impl PageServerHandler { .await?; let n_blocks = timeline - .get_rel_size(req.rel, Version::Lsn(lsn), ctx) + .get_rel_size( + req.rel, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { @@ -2181,7 +2198,15 @@ impl PageServerHandler { .await?; let total_blocks = timeline - .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx) + .get_db_size( + DEFAULTTABLESPACE_OID, + req.dbnode, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; let db_size = total_blocks as i64 * BLCKSZ as i64; @@ -2214,7 +2239,7 @@ impl PageServerHandler { // Ignore error (trace buffer may be full or tracer may have disconnected). 
_ = page_trace.try_send(PageTraceEvent { key, - effective_lsn: batch.effective_request_lsn, + effective_lsn: batch.lsn_range.effective_lsn, time, }); } @@ -2229,7 +2254,7 @@ impl PageServerHandler { perf_instrument = true; } - req.effective_request_lsn + req.lsn_range.effective_lsn }) .max() .expect("batch is never empty"); @@ -2283,7 +2308,7 @@ impl PageServerHandler { ( &p.req.rel, &p.req.blkno, - p.effective_request_lsn, + p.lsn_range, p.ctx.attached_child(), ) }), diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0f9bfd19a7..c6f3929257 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,7 +43,9 @@ use crate::aux_file; use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder}; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::metrics::{ - RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD, + RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS, + RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS, + RELSIZE_SNAPSHOT_CACHE_MISSES, }; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, @@ -90,6 +92,28 @@ pub enum LsnForTimestamp { NoData(Lsn), } +/// Each request to page server contains LSN range: `not_modified_since..request_lsn`. +/// See comments libs/pageserver_api/src/models.rs. +/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`. +/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`. +#[derive(Debug, Clone, Copy, Default)] +pub struct LsnRange { + pub effective_lsn: Lsn, + pub request_lsn: Lsn, +} + +impl LsnRange { + pub fn at(lsn: Lsn) -> LsnRange { + LsnRange { + effective_lsn: lsn, + request_lsn: lsn, + } + } + pub fn is_latest(&self) -> bool { + self.request_lsn == Lsn::MAX + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum CalculateLogicalSizeError { #[error("cancelled")] @@ -202,13 +226,13 @@ impl Timeline { io_concurrency: IoConcurrency, ) -> Result { match version { - Version::Lsn(effective_lsn) => { + Version::LsnRange(lsns) => { let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)]; let res = self .get_rel_page_at_lsn_batched( - pages.iter().map(|(tag, blknum)| { - (tag, blknum, effective_lsn, ctx.attached_child()) - }), + pages + .iter() + .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())), io_concurrency.clone(), ctx, ) @@ -246,7 +270,7 @@ impl Timeline { /// The ordering of the returned vec corresponds to the ordering of `pages`. 
pub(crate) async fn get_rel_page_at_lsn_batched( &self, - pages: impl ExactSizeIterator, + pages: impl ExactSizeIterator, io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> Vec> { @@ -265,7 +289,7 @@ impl Timeline { let mut req_keyspaces: HashMap = HashMap::with_capacity(pages.len()); - for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() { + for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() { if tag.relnode == 0 { result_slots[response_slot_idx].write(Err(PageReconstructError::Other( RelationError::InvalidRelnode.into(), @@ -274,7 +298,7 @@ impl Timeline { slots_filled += 1; continue; } - + let lsn = lsns.effective_lsn; let nblocks = { let ctx = RequestContextBuilder::from(&ctx) .perf_span(|crnt_perf_span| { @@ -289,7 +313,7 @@ impl Timeline { .attached_child(); match self - .get_rel_size(*tag, Version::Lsn(lsn), &ctx) + .get_rel_size(*tag, Version::LsnRange(lsns), &ctx) .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await { @@ -470,7 +494,7 @@ impl Timeline { )); } - if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { + if let Some(nblocks) = self.get_cached_rel_size(&tag, version) { return Ok(nblocks); } @@ -488,7 +512,7 @@ impl Timeline { let mut buf = version.get(self, key, ctx).await?; let nblocks = buf.get_u32_le(); - self.update_cached_rel_size(tag, version.get_lsn(), nblocks); + self.update_cached_rel_size(tag, version, nblocks); Ok(nblocks) } @@ -510,7 +534,7 @@ impl Timeline { } // first try to lookup relation in cache - if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { + if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) { return Ok(true); } // then check if the database was already initialized. @@ -632,7 +656,7 @@ impl Timeline { ) -> Result { assert!(self.tenant_shard_id.is_shard_zero()); let n_blocks = self - .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) + .get_slru_segment_size(kind, segno, Version::at(lsn), ctx) .await?; let keyspace = KeySpace::single( @@ -867,11 +891,11 @@ impl Timeline { mut f: impl FnMut(TimestampTz) -> ControlFlow, ) -> Result { for segno in self - .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx) + .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx) .await? { let nblocks = self - .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx) + .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx) .await?; let keyspace = KeySpace::single( @@ -1137,7 +1161,7 @@ impl Timeline { let mut total_size: u64 = 0; for (spcnode, dbnode) in dbdir.dbdirs.keys() { for rel in self - .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx) + .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx) .await? { if self.cancel.is_cancelled() { @@ -1212,7 +1236,7 @@ impl Timeline { result.add_key(rel_dir_to_key(spcnode, dbnode)); let mut rels: Vec = self - .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx) + .list_rels(spcnode, dbnode, Version::at(lsn), ctx) .await? .into_iter() .collect(); @@ -1329,59 +1353,75 @@ impl Timeline { Ok((dense_keyspace, sparse_keyspace)) } - /// Get cached size of relation if it not updated after specified LSN - pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option { - let rel_size_cache = self.rel_size_cache.read().unwrap(); - if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) { - if lsn >= *cached_lsn { - RELSIZE_CACHE_HITS.inc(); - return Some(*nblocks); + /// Get cached size of relation. 
There are two caches: one for primary updates, it captures the latest state of + /// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size + /// at the particular LSN (snapshot). + pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option { + let lsn = version.get_lsn(); + { + let rel_size_cache = self.rel_size_latest_cache.read().unwrap(); + if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) { + if lsn >= *cached_lsn { + RELSIZE_LATEST_CACHE_HITS.inc(); + return Some(*nblocks); + } + RELSIZE_CACHE_MISSES_OLD.inc(); } - RELSIZE_CACHE_MISSES_OLD.inc(); } - RELSIZE_CACHE_MISSES.inc(); + { + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) { + RELSIZE_SNAPSHOT_CACHE_HITS.inc(); + return Some(*nblock); + } + } + if version.is_latest() { + RELSIZE_LATEST_CACHE_MISSES.inc(); + } else { + RELSIZE_SNAPSHOT_CACHE_MISSES.inc(); + } None } /// Update cached relation size if there is no more recent update - pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - - if lsn < rel_size_cache.complete_as_of { - // Do not cache old values. It's safe to cache the size on read, as long as - // the read was at an LSN since we started the WAL ingestion. Reasoning: we - // never evict values from the cache, so if the relation size changed after - // 'lsn', the new value is already in the cache. - return; - } - - match rel_size_cache.map.entry(tag) { - hash_map::Entry::Occupied(mut entry) => { - let cached_lsn = entry.get_mut(); - if lsn >= cached_lsn.0 { - *cached_lsn = (lsn, nblocks); + pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) { + let lsn = version.get_lsn(); + if version.is_latest() { + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + match rel_size_cache.entry(tag) { + hash_map::Entry::Occupied(mut entry) => { + let cached_lsn = entry.get_mut(); + if lsn >= cached_lsn.0 { + *cached_lsn = (lsn, nblocks); + } + } + hash_map::Entry::Vacant(entry) => { + entry.insert((lsn, nblocks)); + RELSIZE_LATEST_CACHE_ENTRIES.inc(); } } - hash_map::Entry::Vacant(entry) => { - entry.insert((lsn, nblocks)); - RELSIZE_CACHE_ENTRIES.inc(); + } else { + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if rel_size_cache.capacity() != 0 { + rel_size_cache.insert((lsn, tag), nblocks); + RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64); } } } /// Store cached relation size pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() { - RELSIZE_CACHE_ENTRIES.inc(); + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() { + RELSIZE_LATEST_CACHE_ENTRIES.inc(); } } /// Remove cached relation size pub fn remove_cached_rel_size(&self, tag: &RelTag) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - if rel_size_cache.map.remove(tag).is_some() { - RELSIZE_CACHE_ENTRIES.dec(); + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + if rel_size_cache.remove(tag).is_some() { + RELSIZE_LATEST_CACHE_ENTRIES.dec(); } } } @@ -1585,7 +1625,10 @@ impl DatadirModification<'_> { // check the cache too. 
This is because eagerly checking the cache results in // less work overall and 10% better performance. It's more work on cache miss // but cache miss is rare. - if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) { + if let Some(nblocks) = self + .tline + .get_cached_rel_size(&rel, Version::Modified(self)) + { Ok(nblocks) } else if !self .tline @@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats { /// timeline to not miss the latest updates. #[derive(Clone, Copy)] pub enum Version<'a> { - Lsn(Lsn), + LsnRange(LsnRange), Modified(&'a DatadirModification<'a>), } @@ -2679,7 +2722,7 @@ impl Version<'_> { ctx: &RequestContext, ) -> Result { match self { - Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await, + Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await, Version::Modified(modification) => modification.get(key, ctx).await, } } @@ -2701,12 +2744,26 @@ impl Version<'_> { } } - fn get_lsn(&self) -> Lsn { + pub fn is_latest(&self) -> bool { match self { - Version::Lsn(lsn) => *lsn, + Version::LsnRange(lsns) => lsns.is_latest(), + Version::Modified(_) => true, + } + } + + pub fn get_lsn(&self) -> Lsn { + match self { + Version::LsnRange(lsns) => lsns.effective_lsn, Version::Modified(modification) => modification.lsn, } } + + pub fn at(lsn: Lsn) -> Self { + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: lsn, + }) + } } //--- Metadata structs stored in key-value pairs in the repository. diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d3c92ab47a..da2e56d80a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -14,6 +14,7 @@ pub mod span; pub mod uninit; mod walreceiver; +use hashlink::LruCache; use std::array; use std::cmp::{max, min}; use std::collections::btree_map::Entry; @@ -197,16 +198,6 @@ pub struct TimelineResources { pub l0_flush_global_state: l0_flush::L0FlushGlobalState, } -/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL -/// ingestion considerably, because WAL ingestion needs to check on most records if the record -/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end -/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the -/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`]. 
-pub(crate) struct RelSizeCache { - pub(crate) complete_as_of: Lsn, - pub(crate) map: HashMap, -} - pub struct Timeline { pub(crate) conf: &'static PageServerConf, tenant_conf: Arc>, @@ -365,7 +356,8 @@ pub struct Timeline { pub walreceiver: Mutex>, /// Relation size cache - pub(crate) rel_size_cache: RwLock, + pub(crate) rel_size_latest_cache: RwLock>, + pub(crate) rel_size_snapshot_cache: Mutex>, download_all_remote_layers_task_info: RwLock>, @@ -2820,6 +2812,13 @@ impl Timeline { self.remote_client.update_config(&new_conf.location); + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity { + if new_capacity != rel_size_cache.capacity() { + rel_size_cache.set_capacity(new_capacity); + } + } + self.metrics .evictions_with_low_residence_duration .write() @@ -2878,6 +2877,14 @@ impl Timeline { ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded); } + let relsize_snapshot_cache_capacity = { + let loaded_tenant_conf = tenant_conf.load(); + loaded_tenant_conf + .tenant_conf + .relsize_snapshot_cache_capacity + .unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity) + }; + Arc::new_cyclic(|myself| { let metrics = Arc::new(TimelineMetrics::new( &tenant_shard_id, @@ -2969,10 +2976,8 @@ impl Timeline { last_image_layer_creation_check_instant: Mutex::new(None), last_received_wal: Mutex::new(None), - rel_size_cache: RwLock::new(RelSizeCache { - complete_as_of: disk_consistent_lsn, - map: HashMap::new(), - }), + rel_size_latest_cache: RwLock::new(HashMap::new()), + rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), download_all_remote_layers_task_info: RwLock::new(None), diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index e60c590f87..c7a6655052 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1684,31 +1684,31 @@ mod tests { // The relation was created at LSN 2, not visible at LSN 1 yet. 
assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await?, false ); assert!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await .is_err() ); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, 1 ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, 3 ); @@ -1719,7 +1719,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x20)), + Version::at(Lsn(0x20)), &ctx, io_concurrency.clone() ) @@ -1733,7 +1733,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x30)), + Version::at(Lsn(0x30)), &ctx, io_concurrency.clone() ) @@ -1747,7 +1747,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x40)), + Version::at(Lsn(0x40)), &ctx, io_concurrency.clone() ) @@ -1760,7 +1760,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x40)), + Version::at(Lsn(0x40)), &ctx, io_concurrency.clone() ) @@ -1774,7 +1774,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1787,7 +1787,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1800,7 +1800,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 2, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1820,7 +1820,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx) .await?, 2 ); @@ -1829,7 +1829,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -1842,7 +1842,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -1854,7 +1854,7 @@ mod tests { // should still see the truncated block with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, 3 ); @@ -1863,7 +1863,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 2, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1880,7 +1880,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx) .await?, 0 ); @@ -1893,7 +1893,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx) .await?, 2 ); @@ -1902,7 +1902,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x70)), + Version::at(Lsn(0x70)), &ctx, io_concurrency.clone() ) @@ -1915,7 +1915,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x70)), + Version::at(Lsn(0x70)), &ctx, io_concurrency.clone() ) @@ -1932,7 +1932,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline 
- .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, 1501 ); @@ -1942,7 +1942,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blk, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -1956,7 +1956,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1500, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -1990,13 +1990,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, 1 ); @@ -2011,7 +2011,7 @@ mod tests { // Check that rel is not visible anymore assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx) .await?, false ); @@ -2029,13 +2029,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx) .await?, 1 ); @@ -2077,26 +2077,26 @@ mod tests { // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await?, false ); assert!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await .is_err() ); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, relsize ); @@ -2110,7 +2110,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(lsn), + Version::at(lsn), &ctx, io_concurrency.clone() ) @@ -2131,7 +2131,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx) .await?, 1 ); @@ -2144,7 +2144,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -2157,7 +2157,7 @@ mod tests { // should still see all blocks with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, relsize ); @@ -2169,7 +2169,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -2193,13 +2193,13 @@ mod tests { assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, relsize ); @@ -2212,7 +2212,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), 
&ctx, io_concurrency.clone() ) @@ -2250,7 +2250,7 @@ mod tests { assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE + 1 ); @@ -2264,7 +2264,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE ); @@ -2279,7 +2279,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE - 1 ); @@ -2297,7 +2297,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, size as BlockNumber ); diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index efa3c0b514..a58b55a704 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,13 +1,11 @@ -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::HashMap; -use std::hash::BuildHasher; +use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; -use std::{array, env, fmt, io}; +use std::{env, io}; use chrono::{DateTime, Utc}; -use indexmap::IndexSet; use opentelemetry::trace::TraceContextExt; -use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; use tracing::subscriber::Interest; use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; @@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::{LookupSpan, SpanRef}; -use try_lock::TryLock; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result { StderrWriter { stderr: std::io::stderr(), }, - ["request_id", "session_id", "conn_id"], + &["request_id", "session_id", "conn_id"], )) } else { None @@ -183,50 +180,65 @@ impl Clock for RealClock { /// Name of the field used by tracing crate to store the event message. const MESSAGE_FIELD: &str = "message"; +/// Tracing used to enforce that spans/events have no more than 32 fields. +/// It seems this is no longer the case, but it's still documented in some places. +/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and +/// rely on it for some (minor) performance gains. +const MAX_TRACING_FIELDS: usize = 32; + thread_local! { - /// Protects against deadlocks and double panics during log writing. - /// The current panic handler will use tracing to log panic information. - static REENTRANCY_GUARD: Cell = const { Cell::new(false) }; /// Thread-local instance with per-thread buffer for log writing. - static EVENT_FORMATTER: RefCell = RefCell::new(EventFormatter::new()); + static EVENT_FORMATTER: RefCell = const { RefCell::new(EventFormatter::new()) }; /// Cached OS thread ID. static THREAD_ID: u64 = gettid::gettid(); } +/// Map for values fixed at callsite registration. +// We use papaya here because registration rarely happens post-startup. +// papaya is good for read-heavy workloads. +// +// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy, +// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy. 
+type CallsiteMap = + papaya::HashMap>; + /// Implements tracing layer to handle events specific to logging. -struct JsonLoggingLayer { +struct JsonLoggingLayer { clock: C, - skipped_field_indices: papaya::HashMap, - callsite_ids: papaya::HashMap, writer: W, - // We use a const generic and arrays to bypass one heap allocation. - extract_fields: IndexSet<&'static str>, - _marker: std::marker::PhantomData<[&'static str; F]>, + + /// tracks which fields of each **event** are duplicates + skipped_field_indices: CallsiteMap, + + span_info: CallsiteMap, + + /// Fields we want to keep track of in a separate json object. + extract_fields: &'static [&'static str], } -impl JsonLoggingLayer { - fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self { +impl JsonLoggingLayer { + fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self { JsonLoggingLayer { clock, - skipped_field_indices: papaya::HashMap::default(), - callsite_ids: papaya::HashMap::default(), + skipped_field_indices: CallsiteMap::default(), + span_info: CallsiteMap::default(), writer, - extract_fields: IndexSet::from_iter(extract_fields), - _marker: std::marker::PhantomData, + extract_fields, } } #[inline] - fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId { - *self - .callsite_ids + fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo { + self.span_info .pin() - .get_or_insert_with(cs, CallsiteId::next) + .get_or_insert_with(metadata.callsite(), || { + CallsiteSpanInfo::new(metadata, self.extract_fields) + }) + .clone() } } -impl Layer - for JsonLoggingLayer +impl Layer for JsonLoggingLayer where S: Subscriber + for<'a> LookupSpan<'a>, { @@ -237,35 +249,25 @@ where // early, before OTel machinery, and add as event extension. let now = self.clock.now(); - let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| { - if entered.get() { - let mut formatter = EventFormatter::new(); - formatter.format::( - now, - event, - &ctx, - &self.skipped_field_indices, - &self.callsite_ids, - &self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - } else { - entered.set(true); - defer!(entered.set(false);); + let res: io::Result<()> = EVENT_FORMATTER.with(|f| { + let mut borrow = f.try_borrow_mut(); + let formatter = match borrow.as_deref_mut() { + Ok(formatter) => formatter, + // If the thread local formatter is borrowed, + // then we likely hit an edge case were we panicked during formatting. + // We allow the logging to proceed with an uncached formatter. + Err(_) => &mut EventFormatter::new(), + }; - EVENT_FORMATTER.with_borrow_mut(move |formatter| { - formatter.reset(); - formatter.format::( - now, - event, - &ctx, - &self.skipped_field_indices, - &self.callsite_ids, - &self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - }) - } + formatter.reset(); + formatter.format( + now, + event, + &ctx, + &self.skipped_field_indices, + self.extract_fields, + )?; + self.writer.make_writer().write_all(formatter.buffer()) }); // In case logging fails we generate a simpler JSON object. @@ -287,50 +289,48 @@ where /// Registers a SpanFields instance as span extension. fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let fields = SpanFields::default(); - fields.record_fields(attrs); - // This could deadlock when there's a panic somewhere in the tracing - // event handling and a read or write guard is still held. 
This includes - // the OTel subscriber. - let mut exts = span.extensions_mut(); + let mut fields = SpanFields::new(self.span_info(span.metadata())); + attrs.record(&mut fields); - exts.insert(fields); + // This is a new span: the extensions should not be locked + // unless some layer spawned a thread to process this span. + // I don't think any layers do that. + span.extensions_mut().insert(fields); } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let ext = span.extensions(); - if let Some(data) = ext.get::() { - data.record_fields(values); + + // assumption: `on_record` is rarely called. + // assumption: a span being updated by one thread, + // and formatted by another thread is even rarer. + let mut ext = span.extensions_mut(); + if let Some(fields) = ext.get_mut::() { + values.record(fields); } } - /// Called (lazily) whenever a new log call is executed. We quickly check - /// for duplicate field names and record duplicates as skippable. Last one - /// wins. + /// Called (lazily) roughly once per event/span instance. We quickly check + /// for duplicate field names and record duplicates as skippable. Last field wins. fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + debug_assert!( + metadata.fields().len() <= MAX_TRACING_FIELDS, + "callsite {metadata:?} has too many fields." + ); + if !metadata.is_event() { - self.callsite_id(metadata.callsite()); + // register the span info. + self.span_info(metadata); // Must not be never because we wouldn't get trace and span data. return Interest::always(); } let mut field_indices = SkippedFieldIndices::default(); - let mut seen_fields = HashMap::<&'static str, usize>::new(); + let mut seen_fields = HashMap::new(); for field in metadata.fields() { - use std::collections::hash_map::Entry; - match seen_fields.entry(field.name()) { - Entry::Vacant(entry) => { - // field not seen yet - entry.insert(field.index()); - } - Entry::Occupied(mut entry) => { - // replace currently stored index - let old_index = entry.insert(field.index()); - // ... and append it to list of skippable indices - field_indices.push(old_index); - } + if let Some(old_index) = seen_fields.insert(field.name(), field.index()) { + field_indices.set(old_index); } } @@ -344,110 +344,113 @@ where } } -#[derive(Copy, Clone, Debug, Default)] -#[repr(transparent)] -struct CallsiteId(u32); +/// Any span info that is fixed to a particular callsite. Not variable between span instances. +#[derive(Clone)] +struct CallsiteSpanInfo { + /// index of each field to extract. usize::MAX if not found. + extract: Arc<[usize]>, -impl CallsiteId { - #[inline] - fn next() -> Self { - // Start at 1 to reserve 0 for default. - static COUNTER: AtomicU32 = AtomicU32::new(1); - CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed)) - } + /// tracks the fixed "callsite ID" for each span. + /// note: this is not stable between runs. + normalized_name: Arc, } -impl fmt::Display for CallsiteId { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) +impl CallsiteSpanInfo { + fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self { + // Start at 1 to reserve 0 for default. 
+ static COUNTER: AtomicU32 = AtomicU32::new(1); + + let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect(); + + // get all the indices of span fields we want to focus + let extract = extract_fields + .iter() + // use rposition, since we want last match wins. + .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX)) + .collect(); + + // normalized_name is unique for each callsite, but it is not + // unified across separate proxy instances. + // todo: can we do better here? + let cid = COUNTER.fetch_add(1, Ordering::Relaxed); + let normalized_name = format!("{}#{cid}", metadata.name()).into(); + + Self { + extract, + normalized_name, + } } } /// Stores span field values recorded during the spans lifetime. -#[derive(Default)] struct SpanFields { - // TODO: Switch to custom enum with lasso::Spur for Strings? - fields: papaya::HashMap<&'static str, serde_json::Value>, + values: [serde_json::Value; MAX_TRACING_FIELDS], + + /// cached span info so we can avoid extra hashmap lookups in the hot path. + span_info: CallsiteSpanInfo, } impl SpanFields { - #[inline] - fn record_fields(&self, fields: R) { - fields.record(&mut SpanFieldsRecorder { - fields: self.fields.pin(), - }); + fn new(span_info: CallsiteSpanInfo) -> Self { + Self { + span_info, + values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS], + } } } -/// Implements a tracing field visitor to convert and store values. -struct SpanFieldsRecorder<'m, S, G> { - fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>, -} - -impl tracing::field::Visit for SpanFieldsRecorder<'_, S, G> { +impl tracing::field::Visit for SpanFields { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { if let Ok(value) = i64::try_from(value) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } else { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } #[inline] fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { if let Ok(value) = u64::try_from(value) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } else { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } #[inline] fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + 
self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value:?}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value:?}")); } #[inline] @@ -456,38 +459,33 @@ impl tracing::field::Visit for SpanFieldsRecor field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), ) { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } /// List of field indices skipped during logging. Can list duplicate fields or /// metafields not meant to be logged. -#[derive(Clone, Default)] +#[derive(Copy, Clone, Default)] struct SkippedFieldIndices { - bits: u64, + // 32-bits is large enough for `MAX_TRACING_FIELDS` + bits: u32, } impl SkippedFieldIndices { #[inline] - fn is_empty(&self) -> bool { + fn is_empty(self) -> bool { self.bits == 0 } #[inline] - fn push(&mut self, index: usize) { - self.bits |= 1u64 - .checked_shl(index as u32) - .expect("field index too large"); + fn set(&mut self, index: usize) { + debug_assert!(index <= 32, "index out of bounds of 32-bit set"); + self.bits |= 1 << index; } #[inline] - fn contains(&self, index: usize) -> bool { - self.bits - & 1u64 - .checked_shl(index as u32) - .expect("field index too large") - != 0 + fn contains(self, index: usize) -> bool { + self.bits & (1 << index) != 0 } } @@ -499,7 +497,7 @@ struct EventFormatter { impl EventFormatter { #[inline] - fn new() -> Self { + const fn new() -> Self { EventFormatter { logline_buffer: Vec::new(), } @@ -515,14 +513,13 @@ impl EventFormatter { self.logline_buffer.clear(); } - fn format( + fn format( &mut self, now: DateTime, event: &Event<'_>, ctx: &Context<'_, S>, - skipped_field_indices: &papaya::HashMap, - callsite_ids: &papaya::HashMap, - extract_fields: &IndexSet<&'static str>, + skipped_field_indices: &CallsiteMap, + extract_fields: &'static [&'static str], ) -> io::Result<()> where S: Subscriber + for<'a> LookupSpan<'a>, @@ -533,8 +530,11 @@ impl EventFormatter { let normalized_meta = event.normalized_metadata(); let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata()); - let skipped_field_indices = skipped_field_indices.pin(); - let skipped_field_indices = skipped_field_indices.get(&meta.callsite()); + let skipped_field_indices = skipped_field_indices + .pin() + .get(&meta.callsite()) + .copied() + .unwrap_or_default(); let mut serialize = || { let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer); @@ -565,9 +565,11 @@ impl EventFormatter { } let spans = SerializableSpans { - ctx, - callsite_ids, - extract: ExtractedSpanFields::<'_, F>::new(extract_fields), + // collect all spans from parent to root. + spans: ctx + .event_span(event) + .map_or(vec![], |parent| parent.scope().collect()), + extracted: ExtractedSpanFields::new(extract_fields), }; serializer.serialize_entry("spans", &spans)?; @@ -620,9 +622,9 @@ impl EventFormatter { } } - if spans.extract.has_values() { + if spans.extracted.has_values() { // TODO: add fields from event, too? 
- serializer.serialize_entry("extract", &spans.extract)?; + serializer.serialize_entry("extract", &spans.extracted)?; } serializer.end() @@ -635,15 +637,15 @@ impl EventFormatter { } /// Extracts the message field that's mixed will other fields. -struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> { +struct MessageFieldExtractor { serializer: S, - skipped_field_indices: Option<&'a SkippedFieldIndices>, + skipped_field_indices: SkippedFieldIndices, state: Option>, } -impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { +impl MessageFieldExtractor { #[inline] - fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, @@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { fn accept_field(&self, field: &tracing::field::Field) -> bool { self.state.is_none() && field.name() == MESSAGE_FIELD - && !self - .skipped_field_indices - .is_some_and(|i| i.contains(field.index())) + && !self.skipped_field_indices.contains(field.index()) } } -impl tracing::field::Visit for MessageFieldExtractor<'_, S> { +impl tracing::field::Visit for MessageFieldExtractor { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { if self.accept_field(field) { @@ -751,14 +751,14 @@ impl tracing::field::Visit for MessageFieldExtracto /// can be skipped. // This is entirely optional and only cosmetic, though maybe helps a // bit during log parsing in dashboards when there's no field with empty object. -struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>); +struct FieldsPresent(pub bool, SkippedFieldIndices); // Even though some methods have an overhead (error, bytes) it is assumed the // compiler won't include this since we ignore the value entirely. -impl tracing::field::Visit for FieldsPresent<'_> { +impl tracing::field::Visit for FieldsPresent { #[inline] fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) { - if !self.1.is_some_and(|i| i.contains(field.index())) + if !self.1.contains(field.index()) && field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") { @@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> { } /// Serializes the fields directly supplied with a log event. -struct SerializableEventFields<'a, 'event>( - &'a tracing::Event<'event>, - Option<&'a SkippedFieldIndices>, -); +struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices); impl serde::ser::Serialize for SerializableEventFields<'_, '_> { fn serialize(&self, serializer: S) -> Result @@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> { } /// A tracing field visitor that skips the message field. 
-struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> { +struct MessageFieldSkipper { serializer: S, - skipped_field_indices: Option<&'a SkippedFieldIndices>, + skipped_field_indices: SkippedFieldIndices, state: Result<(), S::Error>, } -impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { +impl MessageFieldSkipper { #[inline] - fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, @@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { self.state.is_ok() && field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") - && !self - .skipped_field_indices - .is_some_and(|i| i.contains(field.index())) + && !self.skipped_field_indices.contains(field.index()) } #[inline] @@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { } } -impl tracing::field::Visit for MessageFieldSkipper<'_, S> { +impl tracing::field::Visit for MessageFieldSkipper { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { if self.accept_field(field) { @@ -905,18 +900,17 @@ impl tracing::field::Visit for MessageFieldSkipper< /// with the span names as keys. To prevent collision we append a numberic value /// to the name. Also, collects any span fields we're interested in. Last one /// wins. -struct SerializableSpans<'a, 'ctx, Span, const F: usize> +struct SerializableSpans<'ctx, S> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { - ctx: &'a Context<'ctx, Span>, - callsite_ids: &'a papaya::HashMap, - extract: ExtractedSpanFields<'a, F>, + spans: Vec>, + extracted: ExtractedSpanFields, } -impl serde::ser::Serialize for SerializableSpans<'_, '_, Span, F> +impl serde::ser::Serialize for SerializableSpans<'_, S> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { fn serialize(&self, serializer: Ser) -> Result where @@ -924,25 +918,22 @@ where { let mut serializer = serializer.serialize_map(None)?; - if let Some(leaf_span) = self.ctx.lookup_current() { - for span in leaf_span.scope().from_root() { - // Append a numeric callsite ID to the span name to keep the name unique - // in the JSON object. - let cid = self - .callsite_ids - .pin() - .get(&span.metadata().callsite()) - .copied() - .unwrap_or_default(); + for span in self.spans.iter().rev() { + let ext = span.extensions(); - // Loki turns the # into an underscore during field name concatenation. - serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?; + // all spans should have this extension. + let Some(fields) = ext.get() else { continue }; - serializer.serialize_value(&SerializableSpanFields { - span: &span, - extract: &self.extract, - })?; - } + self.extracted.layer_span(fields); + + let SpanFields { values, span_info } = fields; + serializer.serialize_entry( + &*span_info.normalized_name, + &SerializableSpanFields { + fields: span.metadata().fields(), + values, + }, + )?; } serializer.end() @@ -950,80 +941,77 @@ where } /// Serializes the span fields as object. 
@@ -950,80 +941,77 @@ where
 }

 /// Serializes the span fields as object.
-struct SerializableSpanFields<'a, 'span, Span, const F: usize>
-where
-    Span: for<'lookup> LookupSpan<'lookup>,
-{
-    span: &'a SpanRef<'span, Span>,
-    extract: &'a ExtractedSpanFields<'a, F>,
+struct SerializableSpanFields<'span> {
+    fields: &'span tracing::field::FieldSet,
+    values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
 }

-impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
-where
-    Span: for<'lookup> LookupSpan<'lookup>,
-{
+impl serde::ser::Serialize for SerializableSpanFields<'_> {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: serde::ser::Serializer,
     {
         let mut serializer = serializer.serialize_map(None)?;

-        let ext = self.span.extensions();
-        if let Some(data) = ext.get::<SpanFields>() {
-            for (name, value) in &data.fields.pin() {
-                serializer.serialize_entry(name, value)?;
-                // TODO: replace clone with reference, if possible.
-                self.extract.set(name, value.clone());
+        for (field, value) in std::iter::zip(self.fields, self.values) {
+            if value.is_null() {
+                continue;
             }
+            serializer.serialize_entry(field.name(), value)?;
         }

         serializer.end()
     }
 }

-struct ExtractedSpanFields<'a, const F: usize> {
-    names: &'a IndexSet<&'static str>,
-    // TODO: replace TryLock with something local thread and interior mutability.
-    // serde API doesn't let us use `mut`.
-    values: TryLock<([Option<serde_json::Value>; F], bool)>,
+struct ExtractedSpanFields {
+    names: &'static [&'static str],
+    values: RefCell<Vec<serde_json::Value>>,
 }

-impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
-    fn new(names: &'a IndexSet<&'static str>) -> Self {
+impl ExtractedSpanFields {
+    fn new(names: &'static [&'static str]) -> Self {
         ExtractedSpanFields {
             names,
-            values: TryLock::new((array::from_fn(|_| Option::default()), false)),
+            values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
         }
     }

-    #[inline]
-    fn set(&self, name: &'static str, value: serde_json::Value) {
-        if let Some((index, _)) = self.names.get_full(name) {
-            let mut fields = self.values.try_lock().expect("thread-local use");
-            fields.0[index] = Some(value);
-            fields.1 = true;
+    fn layer_span(&self, fields: &SpanFields) {
+        let mut v = self.values.borrow_mut();
+        let SpanFields { values, span_info } = fields;
+
+        // extract the fields
+        for (i, &j) in span_info.extract.iter().enumerate() {
+            let Some(value) = values.get(j) else { continue };
+
+            if !value.is_null() {
+                // TODO: replace clone with reference, if possible.
+                v[i] = value.clone();
+            }
         }
     }

     #[inline]
     fn has_values(&self) -> bool {
-        self.values.try_lock().expect("thread-local use").1
+        self.values.borrow().iter().any(|v| !v.is_null())
     }
 }
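Worth noting why `RefCell` works here: `serde::Serialize::serialize` only receives `&self`, so any bookkeeping performed during serialization needs interior mutability. Serialization of one event happens on one thread, so a plain `RefCell` borrow replaces the old `TryLock` and its `expect`. A minimal illustration of the same trick, with a hypothetical counter:

```rust
use std::cell::RefCell;

struct CountingList {
    items: Vec<u32>,
    // Mutated from &self during serialize; single-threaded by construction.
    serialized: RefCell<usize>,
}

impl serde::Serialize for CountingList {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // Record a side effect despite only having &self.
        *self.serialized.borrow_mut() += self.items.len();
        serializer.collect_seq(self.items.iter())
    }
}
```

`ExtractedSpanFields` uses the same shape: `layer_span` borrows the vec mutably while spans serialize, and `has_values` takes a shared borrow afterwards.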
-impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
+impl serde::ser::Serialize for ExtractedSpanFields {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: serde::ser::Serializer,
     {
         let mut serializer = serializer.serialize_map(None)?;

-        let values = self.values.try_lock().expect("thread-local use");
-        for (i, value) in values.0.iter().enumerate() {
-            if let Some(value) = value {
-                let key = self.names[i];
-                serializer.serialize_entry(key, value)?;
+        let values = self.values.borrow();
+        for (key, value) in std::iter::zip(self.names, &*values) {
+            if value.is_null() {
+                continue;
             }
+
+            serializer.serialize_entry(key, value)?;
         }

         serializer.end()
@@ -1032,7 +1020,6 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {

 #[cfg(test)]
 mod tests {
-    use std::marker::PhantomData;
     use std::sync::{Arc, Mutex, MutexGuard};

     use assert_json_diff::assert_json_eq;
@@ -1081,10 +1068,9 @@ mod tests {
         let log_layer = JsonLoggingLayer {
             clock: clock.clone(),
             skipped_field_indices: papaya::HashMap::default(),
-            callsite_ids: papaya::HashMap::default(),
+            span_info: papaya::HashMap::default(),
             writer: buffer.clone(),
-            extract_fields: IndexSet::from_iter(["x"]),
-            _marker: PhantomData::<[&'static str; 1]>,
+            extract_fields: &["x"],
         };

         let registry = tracing_subscriber::Registry::default().with(log_layer);
diff --git a/proxy/src/scram/pbkdf2.rs b/proxy/src/scram/pbkdf2.rs
index 9c559e9082..7f48e00c41 100644
--- a/proxy/src/scram/pbkdf2.rs
+++ b/proxy/src/scram/pbkdf2.rs
@@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 {
 // inspired from
 impl Pbkdf2 {
     pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self {
-        let hmac =
+        // key the HMAC and derive the first block in-place
+        let mut hmac =
             Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
-
-        let prev = hmac
-            .clone()
-            .chain_update(salt)
-            .chain_update(1u32.to_be_bytes())
-            .finalize()
-            .into_bytes();
+        hmac.update(salt);
+        hmac.update(&1u32.to_be_bytes());
+        let init_block = hmac.finalize_reset().into_bytes();

         Self {
             hmac,
-            // one consumed for the hash above
+            // one iteration spent above
             iterations: iterations - 1,
-            hi: prev,
-            prev,
+            hi: init_block,
+            prev: init_block,
         }
     }

@@ -44,14 +41,17 @@ impl Pbkdf2 {
             iterations,
         } = self;

-        // only do 4096 iterations per turn before sharing the thread for fairness
+        // only do up to 4096 iterations per turn for fairness
         let n = (*iterations).clamp(0, 4096);

         for _ in 0..n {
-            *prev = hmac.clone().chain_update(*prev).finalize().into_bytes();
+            hmac.update(prev);
+            let block = hmac.finalize_reset().into_bytes();

-            for (hi, prev) in hi.iter_mut().zip(*prev) {
-                *hi ^= prev;
+            for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) {
+                *hi_byte ^= b;
             }
+
+            *prev = block;
         }

         *iterations -= n;
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index 3616467c00..3eb6b7193c 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
             "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
         },
         "rel_size_v2_enabled": True,
+        "relsize_snapshot_cache_capacity": 10000,
         "gc_compaction_enabled": True,
         "gc_compaction_verification": False,
         "gc_compaction_initial_threshold_kb": 1024000,
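On the pbkdf2.rs hunk: PBKDF2 defines each output block as U1 ^ U2 ^ ... ^ Uc, with U1 = HMAC(P, salt || INT(1)) and U_{j+1} = HMAC(P, U_j), so the keyed HMAC state can be reused across iterations. `finalize_reset()` hands back the MAC and re-arms the key, which is what lets the new code drop the per-iteration `clone()` of the key schedule. A one-shot, single-block sketch of the same loop; the `pbkdf2_block` helper is hypothetical, not the proxy's resumable API:

```rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

/// Computes the first PBKDF2-HMAC-SHA256 output block (RFC 2898, block index 1).
fn pbkdf2_block(password: &[u8], salt: &[u8], iterations: u32) -> [u8; 32] {
    assert!(iterations >= 1);
    let mut hmac =
        Hmac::<Sha256>::new_from_slice(password).expect("HMAC accepts any key size");

    // U1 = HMAC(P, salt || INT(1)); finalize_reset re-arms the keyed state.
    hmac.update(salt);
    hmac.update(&1u32.to_be_bytes());
    let mut prev = hmac.finalize_reset().into_bytes();

    let mut hi: [u8; 32] = prev.into();
    for _ in 1..iterations {
        // U_{j+1} = HMAC(P, U_j), XORed into the accumulator.
        hmac.update(&prev);
        prev = hmac.finalize_reset().into_bytes();
        for (acc, b) in hi.iter_mut().zip(prev.iter()) {
            *acc ^= b;
        }
    }
    hi
}
```

The resumable version in the diff is the same loop split into 4096-iteration slices, so a long SCRAM computation does not monopolize an executor thread.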
diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py
index b66b326360..6ee6837cd2 100644
--- a/test_runner/regress/test_compute_catalog.py
+++ b/test_runner/regress/test_compute_catalog.py
@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
     {"name": "role$"},
     {"name": "role$$"},
     {"name": "role$x$"},
+    {"name": "x"},
+    {"name": "xx"},
+    {"name": "$x"},
+    {"name": "x$"},
+    {"name": "$x$"},
+    {"name": "xx$"},
+    {"name": "$xx"},
+    {"name": "$xx$"},
+    # 63 bytes is the limit for role/DB names in Postgres
+    {"name": "x" * 63},
 ]

 TEST_DB_NAMES = [
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
         "name": "db name$x$",
         "owner": "role$x$",
     },
+    {
+        "name": "x",
+        "owner": "x",
+    },
+    {
+        "name": "xx",
+        "owner": "xx",
+    },
+    {
+        "name": "$x",
+        "owner": "$x",
+    },
+    {
+        "name": "x$",
+        "owner": "x$",
+    },
+    {
+        "name": "$x$",
+        "owner": "$x$",
+    },
+    {
+        "name": "xx$",
+        "owner": "xx$",
+    },
+    {
+        "name": "$xx",
+        "owner": "$xx",
+    },
+    {
+        "name": "$xx$",
+        "owner": "$xx$",
+    },
+    # 63 bytes is the limit for role/DB names in Postgres
+    {
+        "name": "x" * 63,
+        "owner": "x" * 63,
+    },
 ]


@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
     """
     Test that compute_ctl can create and work with databases and roles
     with special characters (whitespaces, %, tabs, etc.) in the name.
+    Also use `drop_subscriptions_before_start: true`. We do not actually
+    have any subscriptions in this test, so it should be a no-op, but it
+    i) simulates the case when we create a second dev branch together with
+    a new project creation, and ii) just generally stresses more code paths.
     """
     env = neon_simple_env

@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
         **{
             "spec": {
                 "skip_pg_catalog_updates": False,
+                "drop_subscriptions_before_start": True,
                 "cluster": {
                     "roles": TEST_ROLE_NAMES,
                     "databases": TEST_DB_NAMES,
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
         **{
             "spec": {
                 "skip_pg_catalog_updates": False,
+                "drop_subscriptions_before_start": True,
                 "cluster": {
                     "roles": [],
                     "databases": [],
diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py
index e2a22cc769..c88bc7aace 100644
--- a/test_runner/regress/test_replica_start.py
+++ b/test_runner/regress/test_replica_start.py
@@ -27,8 +27,9 @@ from contextlib import closing

 import psycopg2
 import pytest
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
+from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
 from fixtures.pg_version import PgVersion
 from fixtures.utils import query_scalar, skip_on_postgres, wait_until

@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
     with secondary.cursor() as secondary_cur:
         secondary_cur.execute("select count(*) from t")
         assert secondary_cur.fetchone() == (n_restarts,)
+
+
+def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
+    env = neon_simple_env
+    endpoint = env.endpoints.create_start("main")
+
+    sql = """
+CREATE TABLE CHAR_TBL(f1 char(4));
+CREATE TABLE FLOAT8_TBL(f1 float8);
+CREATE TABLE INT2_TBL(f1 int2);
+CREATE TABLE INT4_TBL(f1 int4);
+CREATE TABLE INT8_TBL(q1 int8, q2 int8);
+CREATE TABLE POINT_TBL(f1 point);
+CREATE TABLE TEXT_TBL (f1 text);
+CREATE TABLE VARCHAR_TBL(f1 varchar(4));
+CREATE TABLE onek (unique1 int4);
+CREATE TABLE onek2 AS SELECT * FROM onek;
+CREATE TABLE tenk1 (unique1 int4);
+CREATE TABLE tenk2 AS SELECT * FROM tenk1;
+CREATE TABLE person (name text, age int4,location point);
+CREATE TABLE emp (salary int4, manager name) INHERITS (person);
+CREATE TABLE student (gpa float8) INHERITS (person);
+CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
+CREATE TABLE road (name text,thepath path);
+CREATE TABLE ihighway () INHERITS (road);
+CREATE TABLE shighway(surface text) INHERITS (road);
+CREATE TABLE BOOLTBL3 (d text, b bool, o int);
+CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
+DROP TABLE BOOLTBL3;
+DROP TABLE BOOLTBL4;
+CREATE TABLE ceil_floor_round (a numeric);
+DROP TABLE ceil_floor_round;
+CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
+DROP TABLE width_bucket_test;
+CREATE TABLE num_input_test (n1 numeric);
+CREATE TABLE num_variance (a numeric);
+INSERT INTO num_variance VALUES (0);
+CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
+CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
+CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
+CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
+CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
+TRUNCATE guid1;
+DROP TABLE guid1;
+DROP TABLE guid2 CASCADE;
+CREATE TABLE numrange_test (nr NUMRANGE);
+CREATE INDEX numrange_test_btree on numrange_test(nr);
+CREATE TABLE numrange_test2(nr numrange);
+CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
+INSERT INTO numrange_test2 VALUES('[, 5)');
+CREATE TABLE textrange_test (tr text);
+CREATE INDEX textrange_test_btree on textrange_test(tr);
+CREATE TABLE test_range_gist(ir int4range);
+CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
+DROP INDEX test_range_gist_idx;
+CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
+CREATE TABLE test_range_spgist(ir int4range);
+CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
+DROP INDEX test_range_spgist_idx;
+CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
+CREATE TABLE test_range_elem(i int4);
+CREATE INDEX test_range_elem_idx on test_range_elem (i);
+CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
+DROP TABLE test_range_elem;
+CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
+CREATE TABLE f_test(f text, i int);
+CREATE TABLE i8r_array (f1 int, f2 text);
+CREATE TYPE arrayrange as range (subtype=int4[]);
+CREATE TYPE two_ints as (a int, b int);
+DROP TYPE two_ints cascade;
+CREATE TABLE text_support_test (t text);
+CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
+CREATE TABLE TEMP_INT4 (f1 INT4);
+CREATE TABLE TEMP_INT2 (f1 INT2);
+CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
+CREATE TABLE POLYGON_TBL(f1 polygon);
+CREATE TABLE quad_poly_tbl (id int, p polygon);
+INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
+CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
+CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
+"""
+
+    with endpoint.cursor() as cur:
+        lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+        env.endpoints.create_start(branch_name="main", lsn=lsn)
+        log.info(f"lsn: {lsn}")
+
+        for line in sql.split("\n"):
+            if len(line.strip()) == 0 or line.startswith("--"):
+                continue
+            cur.execute(line)
+
+        lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+        env.endpoints.create_start(branch_name="main", lsn=lsn)
+        log.info(f"lsn: {lsn}")
+
+        cur.execute("VACUUM FULL pg_class;")
+
+    for ep in env.endpoints.endpoints:
+        log.info(f"{ep.endpoint_id} / {ep.pg_port}")
+        pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
+        env_vars = {
+            "PGPORT": str(ep.pg_port),
+            "PGUSER": endpoint.default_options["user"],
+            "PGHOST": endpoint.default_options["host"],
+        }
+        pg_bin.run_capture(pg_dump_command, env=env_vars)