From e94acbc816cf9eb453938353f301a648c6ce036c Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Tue, 20 May 2025 11:03:36 +0200 Subject: [PATCH 1/4] fix(compute_ctl): Dollar escaping and tests (#11969) ## Problem In the escaping path we were checking that `${tag}$` or `${outer_tag}$` are present in the string, but that's not enough, as original string surrounded by `$` can also form a 'tag', like `$x$xx$x$`, which is fine on it's own, but cannot be used in the string escaped with `$xx$`. ## Summary of changes Remove `$` from the checks, just check if `{tag}` or `{outer_tag}` are present. Add more test cases and change the catalog test to stress the `drop_subscriptions_before_start: true` path as well. Fixes https://github.com/neondatabase/cloud/issues/29198 --- compute_tools/src/pg_helpers.rs | 6 ++- compute_tools/tests/pg_helpers_tests.rs | 8 ++++ test_runner/regress/test_compute_catalog.py | 53 +++++++++++++++++++++ 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 10d8f2c878..94467a0d2f 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -213,8 +213,10 @@ impl Escaping for PgIdent { // Find the first suitable tag that is not present in the string. // Postgres' max role/DB name length is 63 bytes, so even in the - // worst case it won't take long. - while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) { + // worst case it won't take long. Outer tag is always `tag + "x"`, + // so if `tag` is not present in the string, `outer_tag` is not + // present in the string either. + while self.contains(&tag.to_string()) { tag += "x"; outer_tag = tag.clone() + "x"; } diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index 53f2ddad84..04b6ed2256 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor ("name$$$", ("$x$name$$$$x$", "xx")), ("name$$$$", ("$x$name$$$$$x$", "xx")), ("name$x$", ("$xx$name$x$$xx$", "xxx")), + ("x", ("$xx$x$xx$", "xxx")), + ("xx", ("$xxx$xx$xxx$", "xxxx")), + ("$x", ("$xx$$x$xx$", "xxx")), + ("x$", ("$xx$x$$xx$", "xxx")), + ("$x$", ("$xx$$x$$xx$", "xxx")), + ("xx$", ("$xxx$xx$$xxx$", "xxxx")), + ("$xx", ("$xxx$$xx$xxx$", "xxxx")), + ("$xx$", ("$xxx$$xx$$xxx$", "xxxx")), ]; for (input, expected) in test_cases { diff --git a/test_runner/regress/test_compute_catalog.py b/test_runner/regress/test_compute_catalog.py index b66b326360..6ee6837cd2 100644 --- a/test_runner/regress/test_compute_catalog.py +++ b/test_runner/regress/test_compute_catalog.py @@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [ {"name": "role$"}, {"name": "role$$"}, {"name": "role$x$"}, + {"name": "x"}, + {"name": "xx"}, + {"name": "$x"}, + {"name": "x$"}, + {"name": "$x$"}, + {"name": "xx$"}, + {"name": "$xx"}, + {"name": "$xx$"}, + # 63 bytes is the limit for role/DB names in Postgres + {"name": "x" * 63}, ] TEST_DB_NAMES = [ @@ -74,6 +84,43 @@ TEST_DB_NAMES = [ "name": "db name$x$", "owner": "role$x$", }, + { + "name": "x", + "owner": "x", + }, + { + "name": "xx", + "owner": "xx", + }, + { + "name": "$x", + "owner": "$x", + }, + { + "name": "x$", + "owner": "x$", + }, + { + "name": "$x$", + "owner": "$x$", + }, + { + "name": "xx$", + "owner": "xx$", + }, + { + "name": "$xx", + "owner": "$xx", + }, + { + "name": "$xx$", + "owner": "$xx$", + }, + # 63 bytes is the limit for role/DB 
names in Postgres + { + "name": "x" * 63, + "owner": "x" * 63, + }, ] @@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv): """ Test that compute_ctl can create and work with databases and roles with special characters (whitespaces, %, tabs, etc.) in the name. + Also use `drop_subscriptions_before_start: true`. We do not actually + have any subscriptions in this test, so it should be no-op, but it + i) simulates the case when we create a second dev branch together with + a new project creation, and ii) just generally stresses more code paths. """ env = neon_simple_env @@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv): **{ "spec": { "skip_pg_catalog_updates": False, + "drop_subscriptions_before_start": True, "cluster": { "roles": TEST_ROLE_NAMES, "databases": TEST_DB_NAMES, @@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv): **{ "spec": { "skip_pg_catalog_updates": False, + "drop_subscriptions_before_start": True, "cluster": { "roles": [], "databases": [], From 568779fa8a601b8f790a477dbe1a5b3caa9d6dad Mon Sep 17 00:00:00 2001 From: Konstantin Merenkov Date: Tue, 20 May 2025 17:23:54 +0200 Subject: [PATCH 2/4] proxy/scram: avoid memory copy to improve performance (#11980) Touches #11941 ## Problem Performance of our PBKDF2 was worse than reference. ## Summary of changes Avoided memory copy when HMACing in a tight loop. --- proxy/src/scram/pbkdf2.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/proxy/src/scram/pbkdf2.rs b/proxy/src/scram/pbkdf2.rs index 9c559e9082..7f48e00c41 100644 --- a/proxy/src/scram/pbkdf2.rs +++ b/proxy/src/scram/pbkdf2.rs @@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 { // inspired from impl Pbkdf2 { pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self { - let hmac = + // key the HMAC and derive the first block in-place + let mut hmac = Hmac::::new_from_slice(str).expect("HMAC is able to accept all key sizes"); - - let prev = hmac - .clone() - .chain_update(salt) - .chain_update(1u32.to_be_bytes()) - .finalize() - .into_bytes(); + hmac.update(salt); + hmac.update(&1u32.to_be_bytes()); + let init_block = hmac.finalize_reset().into_bytes(); Self { hmac, - // one consumed for the hash above + // one iteration spent above iterations: iterations - 1, - hi: prev, - prev, + hi: init_block, + prev: init_block, } } @@ -44,14 +41,17 @@ impl Pbkdf2 { iterations, } = self; - // only do 4096 iterations per turn before sharing the thread for fairness + // only do up to 4096 iterations per turn for fairness let n = (*iterations).clamp(0, 4096); for _ in 0..n { - *prev = hmac.clone().chain_update(*prev).finalize().into_bytes(); + hmac.update(prev); + let block = hmac.finalize_reset().into_bytes(); - for (hi, prev) in hi.iter_mut().zip(*prev) { - *hi ^= prev; + for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) { + *hi_byte ^= b; } + + *prev = block; } *iterations -= n; From 2e3dc9a8c203ad3a73fc97683a11a928b187bf7f Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 20 May 2025 18:38:27 +0300 Subject: [PATCH 3/4] Add rel_size_replica_cache (#11889) ## Problem See Discussion: https://neondb.slack.com/archives/C033RQ5SPDH/p1746645666075799 Issue: https://github.com/neondatabase/cloud/issues/28609 Relation size cache is not correctly updated at PS in case of replicas. ## Summary of changes 1. Have two caches for relation size in timeline: `rel_size_primary_cache` and `rel_size_replica_cache`. 2. 
`rel_size_primary_cache` is actually what we have now. The only difference is that it is not updated in `get_rel_size`, only by WAL ingestion 3. `rel_size_replica_cache` has limited size (LruCache) and it's key is `(Lsn,RelTag)` . It is updated in `get_rel_size`. Only strict LSN matches are accepted as cache hit. --------- Co-authored-by: Konstantin Knizhnik --- Cargo.lock | 1 + control_plane/src/pageserver.rs | 5 + libs/pageserver_api/src/config.rs | 4 + libs/pageserver_api/src/models.rs | 13 ++ pageserver/Cargo.toml | 1 + pageserver/src/basebackup.rs | 10 +- pageserver/src/metrics.rs | 45 ++++- pageserver/src/page_service.rs | 59 ++++-- pageserver/src/pgdatadir_mapping.rs | 169 ++++++++++++------ pageserver/src/tenant/timeline.rs | 35 ++-- pageserver/src/walingest.rs | 90 +++++----- .../regress/test_attach_tenant_config.py | 1 + test_runner/regress/test_replica_start.py | 110 +++++++++++- 13 files changed, 395 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d919537818..9f4d537b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4286,6 +4286,7 @@ dependencies = [ "enumset", "fail", "futures", + "hashlink", "hex", "hex-literal", "http-utils", diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 79e87eba9b..587f3774d4 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -546,6 +546,11 @@ impl PageServerNode { .map(serde_json::from_str) .transpose() .context("Falied to parse 'sampling_ratio'")?, + relsize_snapshot_cache_capacity: settings + .remove("relsize snapshot cache capacity") + .map(|x| x.parse::()) + .transpose() + .context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 2618366469..73b6eee554 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -491,6 +491,8 @@ pub struct TenantConfigToml { /// Tenant level performance sampling ratio override. Controls the ratio of get page requests /// that will get perf sampling for the tenant. pub sampling_ratio: Option, + /// Capacity of relsize snapshot cache (used by replicas). 
+ pub relsize_snapshot_cache_capacity: usize, } pub mod defaults { @@ -730,6 +732,7 @@ pub mod tenant_conf_defaults { pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true; pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100; + pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000; } impl Default for TenantConfigToml { @@ -787,6 +790,7 @@ impl Default for TenantConfigToml { gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB, gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT, sampling_ratio: None, + relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY, } } } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e9b37c8ca6..ca26286b86 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -630,6 +630,8 @@ pub struct TenantConfigPatch { pub gc_compaction_ratio_percent: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub sampling_ratio: FieldPatch>, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub relsize_snapshot_cache_capacity: FieldPatch, } /// Like [`crate::config::TenantConfigToml`], but preserves the information @@ -759,6 +761,9 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub sampling_ratio: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub relsize_snapshot_cache_capacity: Option, } impl TenantConfig { @@ -804,6 +809,7 @@ impl TenantConfig { mut gc_compaction_initial_threshold_kb, mut gc_compaction_ratio_percent, mut sampling_ratio, + mut relsize_snapshot_cache_capacity, } = self; patch.checkpoint_distance.apply(&mut checkpoint_distance); @@ -905,6 +911,9 @@ impl TenantConfig { .gc_compaction_ratio_percent .apply(&mut gc_compaction_ratio_percent); patch.sampling_ratio.apply(&mut sampling_ratio); + patch + .relsize_snapshot_cache_capacity + .apply(&mut relsize_snapshot_cache_capacity); Ok(Self { checkpoint_distance, @@ -944,6 +953,7 @@ impl TenantConfig { gc_compaction_initial_threshold_kb, gc_compaction_ratio_percent, sampling_ratio, + relsize_snapshot_cache_capacity, }) } @@ -1052,6 +1062,9 @@ impl TenantConfig { .gc_compaction_ratio_percent .unwrap_or(global_conf.gc_compaction_ratio_percent), sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio), + relsize_snapshot_cache_capacity: self + .relsize_snapshot_cache_capacity + .unwrap_or(global_conf.relsize_snapshot_cache_capacity), } } } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index b7b3e0eaf1..6a9a5a292a 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -30,6 +30,7 @@ crc32c.workspace = true either.workspace = true fail.workspace = true futures.workspace = true +hashlink.workspace = true hex.workspace = true humantime.workspace = true humantime-serde.workspace = true diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index b49021461e..e89baa0bce 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -343,7 +343,7 @@ where // Gather non-relational files from object storage pages. let slru_partitions = self .timeline - .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx) + .get_slru_keyspace(Version::at(self.lsn), self.ctx) .await? .partition( self.timeline.get_shard_identity(), @@ -378,7 +378,7 @@ where // Otherwise only include init forks of unlogged relations. 
let rels = self .timeline - .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await?; for &rel in rels.iter() { // Send init fork as main fork to provide well formed empty @@ -517,7 +517,7 @@ where async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> { let nblocks = self .timeline - .get_rel_size(src, Version::Lsn(self.lsn), self.ctx) + .get_rel_size(src, Version::at(self.lsn), self.ctx) .await?; // If the relation is empty, create an empty file @@ -577,7 +577,7 @@ where let relmap_img = if has_relmap_file { let img = self .timeline - .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await?; if img.len() @@ -631,7 +631,7 @@ where if !has_relmap_file && self .timeline - .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) + .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx) .await? .is_empty() { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 8e4dbd6c3e..c50f730f41 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy = Lazy::new(| .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy = Lazy::new(|| { +pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy = Lazy::new(|| { register_uint_gauge!( - "pageserver_relsize_cache_entries", - "Number of entries in the relation size cache", + "pageserver_relsize_latest_cache_entries", + "Number of entries in the latest relation size cache", ) .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_HITS: Lazy = Lazy::new(|| { - register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",) - .expect("failed to define a metric") +pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_latest_cache_hits", + "Latest relation size cache hits", + ) + .expect("failed to define a metric") }); -pub(crate) static RELSIZE_CACHE_MISSES: Lazy = Lazy::new(|| { +pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_relsize_cache_misses", - "Relation size cache misses", + "pageserver_relsize_latest_cache_misses", + "Relation size latest cache misses", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy = Lazy::new(|| { + register_uint_gauge!( + "pageserver_relsize_snapshot_cache_entries", + "Number of entries in the pitr relation size cache", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_snapshot_cache_hits", + "Pitr relation size cache hits", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_snapshot_cache_misses", + "Relation size snapshot cache misses", ) .expect("failed to define a metric") }); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 83d9191240..e46ba8d3a1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -62,7 +62,7 @@ use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS, SmgrOpTimer, TimelineMetrics, }; -use 
crate::pgdatadir_mapping::Version; +use crate::pgdatadir_mapping::{LsnRange, Version}; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, @@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError { struct BatchedGetPageRequest { req: PagestreamGetPageRequest, timer: SmgrOpTimer, - effective_request_lsn: Lsn, + lsn_range: LsnRange, ctx: RequestContext, } @@ -764,12 +764,12 @@ impl BatchedFeMessage { match batching_strategy { PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => { if let Some(last_in_batch) = accum_pages.last() { - if last_in_batch.effective_request_lsn - != this_pages[0].effective_request_lsn + if last_in_batch.lsn_range.effective_lsn + != this_pages[0].lsn_range.effective_lsn { trace!( - accum_lsn = %last_in_batch.effective_request_lsn, - this_lsn = %this_pages[0].effective_request_lsn, + accum_lsn = %last_in_batch.lsn_range.effective_lsn, + this_lsn = %this_pages[0].lsn_range.effective_lsn, "stopping batching because LSN changed" ); @@ -784,15 +784,15 @@ impl BatchedFeMessage { let same_page_different_lsn = accum_pages.iter().any(|batched| { batched.req.rel == this_pages[0].req.rel && batched.req.blkno == this_pages[0].req.blkno - && batched.effective_request_lsn - != this_pages[0].effective_request_lsn + && batched.lsn_range.effective_lsn + != this_pages[0].lsn_range.effective_lsn }); if same_page_different_lsn { trace!( rel=%this_pages[0].req.rel, blkno=%this_pages[0].req.blkno, - lsn=%this_pages[0].effective_request_lsn, + lsn=%this_pages[0].lsn_range.effective_lsn, "stopping batching because same page was requested at different LSNs" ); @@ -1158,7 +1158,7 @@ impl PageServerHandler { .await?; // We're holding the Handle - let effective_request_lsn = match Self::effective_request_lsn( + let effective_lsn = match Self::effective_request_lsn( &shard, shard.get_last_record_lsn(), req.hdr.request_lsn, @@ -1177,7 +1177,10 @@ impl PageServerHandler { pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, - effective_request_lsn, + lsn_range: LsnRange { + effective_lsn, + request_lsn: req.hdr.request_lsn + }, ctx, }], // The executor grabs the batch when it becomes idle. @@ -2127,7 +2130,14 @@ impl PageServerHandler { .await?; let exists = timeline - .get_rel_exists(req.rel, Version::Lsn(lsn), ctx) + .get_rel_exists( + req.rel, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { @@ -2154,7 +2164,14 @@ impl PageServerHandler { .await?; let n_blocks = timeline - .get_rel_size(req.rel, Version::Lsn(lsn), ctx) + .get_rel_size( + req.rel, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { @@ -2181,7 +2198,15 @@ impl PageServerHandler { .await?; let total_blocks = timeline - .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx) + .get_db_size( + DEFAULTTABLESPACE_OID, + req.dbnode, + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: req.hdr.request_lsn, + }), + ctx, + ) .await?; let db_size = total_blocks as i64 * BLCKSZ as i64; @@ -2214,7 +2239,7 @@ impl PageServerHandler { // Ignore error (trace buffer may be full or tracer may have disconnected). 
_ = page_trace.try_send(PageTraceEvent { key, - effective_lsn: batch.effective_request_lsn, + effective_lsn: batch.lsn_range.effective_lsn, time, }); } @@ -2229,7 +2254,7 @@ impl PageServerHandler { perf_instrument = true; } - req.effective_request_lsn + req.lsn_range.effective_lsn }) .max() .expect("batch is never empty"); @@ -2283,7 +2308,7 @@ impl PageServerHandler { ( &p.req.rel, &p.req.blkno, - p.effective_request_lsn, + p.lsn_range, p.ctx.attached_child(), ) }), diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0f9bfd19a7..c6f3929257 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,7 +43,9 @@ use crate::aux_file; use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder}; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::metrics::{ - RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD, + RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS, + RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS, + RELSIZE_SNAPSHOT_CACHE_MISSES, }; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, @@ -90,6 +92,28 @@ pub enum LsnForTimestamp { NoData(Lsn), } +/// Each request to page server contains LSN range: `not_modified_since..request_lsn`. +/// See comments libs/pageserver_api/src/models.rs. +/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`. +/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`. +#[derive(Debug, Clone, Copy, Default)] +pub struct LsnRange { + pub effective_lsn: Lsn, + pub request_lsn: Lsn, +} + +impl LsnRange { + pub fn at(lsn: Lsn) -> LsnRange { + LsnRange { + effective_lsn: lsn, + request_lsn: lsn, + } + } + pub fn is_latest(&self) -> bool { + self.request_lsn == Lsn::MAX + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum CalculateLogicalSizeError { #[error("cancelled")] @@ -202,13 +226,13 @@ impl Timeline { io_concurrency: IoConcurrency, ) -> Result { match version { - Version::Lsn(effective_lsn) => { + Version::LsnRange(lsns) => { let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)]; let res = self .get_rel_page_at_lsn_batched( - pages.iter().map(|(tag, blknum)| { - (tag, blknum, effective_lsn, ctx.attached_child()) - }), + pages + .iter() + .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())), io_concurrency.clone(), ctx, ) @@ -246,7 +270,7 @@ impl Timeline { /// The ordering of the returned vec corresponds to the ordering of `pages`. 
pub(crate) async fn get_rel_page_at_lsn_batched( &self, - pages: impl ExactSizeIterator, + pages: impl ExactSizeIterator, io_concurrency: IoConcurrency, ctx: &RequestContext, ) -> Vec> { @@ -265,7 +289,7 @@ impl Timeline { let mut req_keyspaces: HashMap = HashMap::with_capacity(pages.len()); - for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() { + for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() { if tag.relnode == 0 { result_slots[response_slot_idx].write(Err(PageReconstructError::Other( RelationError::InvalidRelnode.into(), @@ -274,7 +298,7 @@ impl Timeline { slots_filled += 1; continue; } - + let lsn = lsns.effective_lsn; let nblocks = { let ctx = RequestContextBuilder::from(&ctx) .perf_span(|crnt_perf_span| { @@ -289,7 +313,7 @@ impl Timeline { .attached_child(); match self - .get_rel_size(*tag, Version::Lsn(lsn), &ctx) + .get_rel_size(*tag, Version::LsnRange(lsns), &ctx) .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone()) .await { @@ -470,7 +494,7 @@ impl Timeline { )); } - if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { + if let Some(nblocks) = self.get_cached_rel_size(&tag, version) { return Ok(nblocks); } @@ -488,7 +512,7 @@ impl Timeline { let mut buf = version.get(self, key, ctx).await?; let nblocks = buf.get_u32_le(); - self.update_cached_rel_size(tag, version.get_lsn(), nblocks); + self.update_cached_rel_size(tag, version, nblocks); Ok(nblocks) } @@ -510,7 +534,7 @@ impl Timeline { } // first try to lookup relation in cache - if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { + if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) { return Ok(true); } // then check if the database was already initialized. @@ -632,7 +656,7 @@ impl Timeline { ) -> Result { assert!(self.tenant_shard_id.is_shard_zero()); let n_blocks = self - .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) + .get_slru_segment_size(kind, segno, Version::at(lsn), ctx) .await?; let keyspace = KeySpace::single( @@ -867,11 +891,11 @@ impl Timeline { mut f: impl FnMut(TimestampTz) -> ControlFlow, ) -> Result { for segno in self - .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx) + .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx) .await? { let nblocks = self - .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx) + .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx) .await?; let keyspace = KeySpace::single( @@ -1137,7 +1161,7 @@ impl Timeline { let mut total_size: u64 = 0; for (spcnode, dbnode) in dbdir.dbdirs.keys() { for rel in self - .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx) + .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx) .await? { if self.cancel.is_cancelled() { @@ -1212,7 +1236,7 @@ impl Timeline { result.add_key(rel_dir_to_key(spcnode, dbnode)); let mut rels: Vec = self - .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx) + .list_rels(spcnode, dbnode, Version::at(lsn), ctx) .await? .into_iter() .collect(); @@ -1329,59 +1353,75 @@ impl Timeline { Ok((dense_keyspace, sparse_keyspace)) } - /// Get cached size of relation if it not updated after specified LSN - pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option { - let rel_size_cache = self.rel_size_cache.read().unwrap(); - if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) { - if lsn >= *cached_lsn { - RELSIZE_CACHE_HITS.inc(); - return Some(*nblocks); + /// Get cached size of relation. 
There are two caches: one for primary updates, it captures the latest state of + /// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size + /// at the particular LSN (snapshot). + pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option { + let lsn = version.get_lsn(); + { + let rel_size_cache = self.rel_size_latest_cache.read().unwrap(); + if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) { + if lsn >= *cached_lsn { + RELSIZE_LATEST_CACHE_HITS.inc(); + return Some(*nblocks); + } + RELSIZE_CACHE_MISSES_OLD.inc(); } - RELSIZE_CACHE_MISSES_OLD.inc(); } - RELSIZE_CACHE_MISSES.inc(); + { + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) { + RELSIZE_SNAPSHOT_CACHE_HITS.inc(); + return Some(*nblock); + } + } + if version.is_latest() { + RELSIZE_LATEST_CACHE_MISSES.inc(); + } else { + RELSIZE_SNAPSHOT_CACHE_MISSES.inc(); + } None } /// Update cached relation size if there is no more recent update - pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - - if lsn < rel_size_cache.complete_as_of { - // Do not cache old values. It's safe to cache the size on read, as long as - // the read was at an LSN since we started the WAL ingestion. Reasoning: we - // never evict values from the cache, so if the relation size changed after - // 'lsn', the new value is already in the cache. - return; - } - - match rel_size_cache.map.entry(tag) { - hash_map::Entry::Occupied(mut entry) => { - let cached_lsn = entry.get_mut(); - if lsn >= cached_lsn.0 { - *cached_lsn = (lsn, nblocks); + pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) { + let lsn = version.get_lsn(); + if version.is_latest() { + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + match rel_size_cache.entry(tag) { + hash_map::Entry::Occupied(mut entry) => { + let cached_lsn = entry.get_mut(); + if lsn >= cached_lsn.0 { + *cached_lsn = (lsn, nblocks); + } + } + hash_map::Entry::Vacant(entry) => { + entry.insert((lsn, nblocks)); + RELSIZE_LATEST_CACHE_ENTRIES.inc(); } } - hash_map::Entry::Vacant(entry) => { - entry.insert((lsn, nblocks)); - RELSIZE_CACHE_ENTRIES.inc(); + } else { + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if rel_size_cache.capacity() != 0 { + rel_size_cache.insert((lsn, tag), nblocks); + RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64); } } } /// Store cached relation size pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() { - RELSIZE_CACHE_ENTRIES.inc(); + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() { + RELSIZE_LATEST_CACHE_ENTRIES.inc(); } } /// Remove cached relation size pub fn remove_cached_rel_size(&self, tag: &RelTag) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - if rel_size_cache.map.remove(tag).is_some() { - RELSIZE_CACHE_ENTRIES.dec(); + let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap(); + if rel_size_cache.remove(tag).is_some() { + RELSIZE_LATEST_CACHE_ENTRIES.dec(); } } } @@ -1585,7 +1625,10 @@ impl DatadirModification<'_> { // check the cache too. 
This is because eagerly checking the cache results in // less work overall and 10% better performance. It's more work on cache miss // but cache miss is rare. - if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) { + if let Some(nblocks) = self + .tline + .get_cached_rel_size(&rel, Version::Modified(self)) + { Ok(nblocks) } else if !self .tline @@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats { /// timeline to not miss the latest updates. #[derive(Clone, Copy)] pub enum Version<'a> { - Lsn(Lsn), + LsnRange(LsnRange), Modified(&'a DatadirModification<'a>), } @@ -2679,7 +2722,7 @@ impl Version<'_> { ctx: &RequestContext, ) -> Result { match self { - Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await, + Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await, Version::Modified(modification) => modification.get(key, ctx).await, } } @@ -2701,12 +2744,26 @@ impl Version<'_> { } } - fn get_lsn(&self) -> Lsn { + pub fn is_latest(&self) -> bool { match self { - Version::Lsn(lsn) => *lsn, + Version::LsnRange(lsns) => lsns.is_latest(), + Version::Modified(_) => true, + } + } + + pub fn get_lsn(&self) -> Lsn { + match self { + Version::LsnRange(lsns) => lsns.effective_lsn, Version::Modified(modification) => modification.lsn, } } + + pub fn at(lsn: Lsn) -> Self { + Version::LsnRange(LsnRange { + effective_lsn: lsn, + request_lsn: lsn, + }) + } } //--- Metadata structs stored in key-value pairs in the repository. diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d3c92ab47a..da2e56d80a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -14,6 +14,7 @@ pub mod span; pub mod uninit; mod walreceiver; +use hashlink::LruCache; use std::array; use std::cmp::{max, min}; use std::collections::btree_map::Entry; @@ -197,16 +198,6 @@ pub struct TimelineResources { pub l0_flush_global_state: l0_flush::L0FlushGlobalState, } -/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL -/// ingestion considerably, because WAL ingestion needs to check on most records if the record -/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end -/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the -/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`]. 
-pub(crate) struct RelSizeCache { - pub(crate) complete_as_of: Lsn, - pub(crate) map: HashMap, -} - pub struct Timeline { pub(crate) conf: &'static PageServerConf, tenant_conf: Arc>, @@ -365,7 +356,8 @@ pub struct Timeline { pub walreceiver: Mutex>, /// Relation size cache - pub(crate) rel_size_cache: RwLock, + pub(crate) rel_size_latest_cache: RwLock>, + pub(crate) rel_size_snapshot_cache: Mutex>, download_all_remote_layers_task_info: RwLock>, @@ -2820,6 +2812,13 @@ impl Timeline { self.remote_client.update_config(&new_conf.location); + let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap(); + if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity { + if new_capacity != rel_size_cache.capacity() { + rel_size_cache.set_capacity(new_capacity); + } + } + self.metrics .evictions_with_low_residence_duration .write() @@ -2878,6 +2877,14 @@ impl Timeline { ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded); } + let relsize_snapshot_cache_capacity = { + let loaded_tenant_conf = tenant_conf.load(); + loaded_tenant_conf + .tenant_conf + .relsize_snapshot_cache_capacity + .unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity) + }; + Arc::new_cyclic(|myself| { let metrics = Arc::new(TimelineMetrics::new( &tenant_shard_id, @@ -2969,10 +2976,8 @@ impl Timeline { last_image_layer_creation_check_instant: Mutex::new(None), last_received_wal: Mutex::new(None), - rel_size_cache: RwLock::new(RelSizeCache { - complete_as_of: disk_consistent_lsn, - map: HashMap::new(), - }), + rel_size_latest_cache: RwLock::new(HashMap::new()), + rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), download_all_remote_layers_task_info: RwLock::new(None), diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index e60c590f87..c7a6655052 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1684,31 +1684,31 @@ mod tests { // The relation was created at LSN 2, not visible at LSN 1 yet. 
assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await?, false ); assert!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await .is_err() ); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, 1 ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, 3 ); @@ -1719,7 +1719,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x20)), + Version::at(Lsn(0x20)), &ctx, io_concurrency.clone() ) @@ -1733,7 +1733,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x30)), + Version::at(Lsn(0x30)), &ctx, io_concurrency.clone() ) @@ -1747,7 +1747,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x40)), + Version::at(Lsn(0x40)), &ctx, io_concurrency.clone() ) @@ -1760,7 +1760,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x40)), + Version::at(Lsn(0x40)), &ctx, io_concurrency.clone() ) @@ -1774,7 +1774,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1787,7 +1787,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1800,7 +1800,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 2, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1820,7 +1820,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx) .await?, 2 ); @@ -1829,7 +1829,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -1842,7 +1842,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -1854,7 +1854,7 @@ mod tests { // should still see the truncated block with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, 3 ); @@ -1863,7 +1863,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 2, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -1880,7 +1880,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx) .await?, 0 ); @@ -1893,7 +1893,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx) .await?, 2 ); @@ -1902,7 +1902,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 0, - Version::Lsn(Lsn(0x70)), + Version::at(Lsn(0x70)), &ctx, io_concurrency.clone() ) @@ -1915,7 +1915,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1, - Version::Lsn(Lsn(0x70)), + Version::at(Lsn(0x70)), &ctx, io_concurrency.clone() ) @@ -1932,7 +1932,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline 
- .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, 1501 ); @@ -1942,7 +1942,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blk, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -1956,7 +1956,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, 1500, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), &ctx, io_concurrency.clone() ) @@ -1990,13 +1990,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, 1 ); @@ -2011,7 +2011,7 @@ mod tests { // Check that rel is not visible anymore assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx) .await?, false ); @@ -2029,13 +2029,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx) .await?, 1 ); @@ -2077,26 +2077,26 @@ mod tests { // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await?, false ); assert!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx) .await .is_err() ); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx) .await?, relsize ); @@ -2110,7 +2110,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(lsn), + Version::at(lsn), &ctx, io_concurrency.clone() ) @@ -2131,7 +2131,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx) .await?, 1 ); @@ -2144,7 +2144,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x60)), + Version::at(Lsn(0x60)), &ctx, io_concurrency.clone() ) @@ -2157,7 +2157,7 @@ mod tests { // should still see all blocks with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx) .await?, relsize ); @@ -2169,7 +2169,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x50)), + Version::at(Lsn(0x50)), &ctx, io_concurrency.clone() ) @@ -2193,13 +2193,13 @@ mod tests { assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx) .await?, relsize ); @@ -2212,7 +2212,7 @@ mod tests { .get_rel_page_at_lsn( TESTREL_A, blkno, - Version::Lsn(Lsn(0x80)), + Version::at(Lsn(0x80)), 
&ctx, io_concurrency.clone() ) @@ -2250,7 +2250,7 @@ mod tests { assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE + 1 ); @@ -2264,7 +2264,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE ); @@ -2279,7 +2279,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, RELSEG_SIZE - 1 ); @@ -2297,7 +2297,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx) + .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx) .await?, size as BlockNumber ); diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 3616467c00..3eb6b7193c 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}}, }, "rel_size_v2_enabled": True, + "relsize_snapshot_cache_capacity": 10000, "gc_compaction_enabled": True, "gc_compaction_verification": False, "gc_compaction_initial_threshold_kb": 1024000, diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py index e2a22cc769..c88bc7aace 100644 --- a/test_runner/regress/test_replica_start.py +++ b/test_runner/regress/test_replica_start.py @@ -27,8 +27,9 @@ from contextlib import closing import psycopg2 import pytest +from fixtures.common_types import Lsn from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup +from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup from fixtures.pg_version import PgVersion from fixtures.utils import query_scalar, skip_on_postgres, wait_until @@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv): with secondary.cursor() as secondary_cur: secondary_cur.execute("select count(*) from t") assert secondary_cur.fetchone() == (n_restarts,) + + +def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin): + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + + sql = """ +CREATE TABLE CHAR_TBL(f1 char(4)); +CREATE TABLE FLOAT8_TBL(f1 float8); +CREATE TABLE INT2_TBL(f1 int2); +CREATE TABLE INT4_TBL(f1 int4); +CREATE TABLE INT8_TBL(q1 int8, q2 int8); +CREATE TABLE POINT_TBL(f1 point); +CREATE TABLE TEXT_TBL (f1 text); +CREATE TABLE VARCHAR_TBL(f1 varchar(4)); +CREATE TABLE onek (unique1 int4); +CREATE TABLE onek2 AS SELECT * FROM onek; +CREATE TABLE tenk1 (unique1 int4); +CREATE TABLE tenk2 AS SELECT * FROM tenk1; +CREATE TABLE person (name text, age int4,location point); +CREATE TABLE emp (salary int4, manager name) INHERITS (person); +CREATE TABLE student (gpa float8) INHERITS (person); +CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student); +CREATE TABLE road (name text,thepath path); +CREATE TABLE ihighway () INHERITS (road); +CREATE TABLE shighway(surface text) INHERITS (road); +CREATE TABLE BOOLTBL3 (d text, b bool, o int); +CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool); +DROP TABLE BOOLTBL3; +DROP TABLE BOOLTBL4; 
+CREATE TABLE ceil_floor_round (a numeric); +DROP TABLE ceil_floor_round; +CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8); +DROP TABLE width_bucket_test; +CREATE TABLE num_input_test (n1 numeric); +CREATE TABLE num_variance (a numeric); +INSERT INTO num_variance VALUES (0); +CREATE TABLE snapshot_test (nr integer, snap txid_snapshot); +CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now())); +CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now())); +CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field); +CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field); +TRUNCATE guid1; +DROP TABLE guid1; +DROP TABLE guid2 CASCADE; +CREATE TABLE numrange_test (nr NUMRANGE); +CREATE INDEX numrange_test_btree on numrange_test(nr); +CREATE TABLE numrange_test2(nr numrange); +CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr); +INSERT INTO numrange_test2 VALUES('[, 5)'); +CREATE TABLE textrange_test (tr text); +CREATE INDEX textrange_test_btree on textrange_test(tr); +CREATE TABLE test_range_gist(ir int4range); +CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir); +DROP INDEX test_range_gist_idx; +CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir); +CREATE TABLE test_range_spgist(ir int4range); +CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir); +DROP INDEX test_range_spgist_idx; +CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir); +CREATE TABLE test_range_elem(i int4); +CREATE INDEX test_range_elem_idx on test_range_elem (i); +CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10)); +DROP TABLE test_range_elem; +CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&)); +CREATE TABLE f_test(f text, i int); +CREATE TABLE i8r_array (f1 int, f2 text); +CREATE TYPE arrayrange as range (subtype=int4[]); +CREATE TYPE two_ints as (a int, b int); +DROP TYPE two_ints cascade; +CREATE TABLE text_support_test (t text); +CREATE TABLE TEMP_FLOAT (f1 FLOAT8); +CREATE TABLE TEMP_INT4 (f1 INT4); +CREATE TABLE TEMP_INT2 (f1 INT2); +CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8); +CREATE TABLE POLYGON_TBL(f1 polygon); +CREATE TABLE quad_poly_tbl (id int, p polygon); +INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y; +CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl; +CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl; +""" + + with endpoint.cursor() as cur: + lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + env.endpoints.create_start(branch_name="main", lsn=lsn) + log.info(f"lsn: {lsn}") + + for line in sql.split("\n"): + if len(line.strip()) == 0 or line.startswith("--"): + continue + cur.execute(line) + + lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + env.endpoints.create_start(branch_name="main", lsn=lsn) + log.info(f"lsn: {lsn}") + + cur.execute("VACUUM FULL pg_class;") + + for ep in env.endpoints.endpoints: + log.info(f"{ep.endpoint_id} / {ep.pg_port}") + pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"] + env_vars = { + "PGPORT": str(ep.pg_port), + "PGUSER": endpoint.default_options["user"], + "PGHOST": endpoint.default_options["host"], + } + pg_bin.run_capture(pg_dump_command, env=env_vars) From 
f3c9d0adf437f4d8ce2de3a933cda0fba7bb3cc9 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 20 May 2025 19:57:59 +0200 Subject: [PATCH 4/4] proxy(logging): significant changes to json logging internals for performance. (#11974) #11962 Please review each commit separately. Each commit is rather small in goal. The overall goal of this PR is to keep the behaviour identical, but shave away small inefficiencies here and there. --- proxy/src/logging.rs | 484 +++++++++++++++++++++---------------------- 1 file changed, 235 insertions(+), 249 deletions(-) diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index efa3c0b514..a58b55a704 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,13 +1,11 @@ -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::HashMap; -use std::hash::BuildHasher; +use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; -use std::{array, env, fmt, io}; +use std::{env, io}; use chrono::{DateTime, Utc}; -use indexmap::IndexSet; use opentelemetry::trace::TraceContextExt; -use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; use tracing::subscriber::Interest; use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; @@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::{LookupSpan, SpanRef}; -use try_lock::TryLock; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result { StderrWriter { stderr: std::io::stderr(), }, - ["request_id", "session_id", "conn_id"], + &["request_id", "session_id", "conn_id"], )) } else { None @@ -183,50 +180,65 @@ impl Clock for RealClock { /// Name of the field used by tracing crate to store the event message. const MESSAGE_FIELD: &str = "message"; +/// Tracing used to enforce that spans/events have no more than 32 fields. +/// It seems this is no longer the case, but it's still documented in some places. +/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and +/// rely on it for some (minor) performance gains. +const MAX_TRACING_FIELDS: usize = 32; + thread_local! { - /// Protects against deadlocks and double panics during log writing. - /// The current panic handler will use tracing to log panic information. - static REENTRANCY_GUARD: Cell = const { Cell::new(false) }; /// Thread-local instance with per-thread buffer for log writing. - static EVENT_FORMATTER: RefCell = RefCell::new(EventFormatter::new()); + static EVENT_FORMATTER: RefCell = const { RefCell::new(EventFormatter::new()) }; /// Cached OS thread ID. static THREAD_ID: u64 = gettid::gettid(); } +/// Map for values fixed at callsite registration. +// We use papaya here because registration rarely happens post-startup. +// papaya is good for read-heavy workloads. +// +// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy, +// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy. +type CallsiteMap = + papaya::HashMap>; + /// Implements tracing layer to handle events specific to logging. -struct JsonLoggingLayer { +struct JsonLoggingLayer { clock: C, - skipped_field_indices: papaya::HashMap, - callsite_ids: papaya::HashMap, writer: W, - // We use a const generic and arrays to bypass one heap allocation. 
- extract_fields: IndexSet<&'static str>, - _marker: std::marker::PhantomData<[&'static str; F]>, + + /// tracks which fields of each **event** are duplicates + skipped_field_indices: CallsiteMap, + + span_info: CallsiteMap, + + /// Fields we want to keep track of in a separate json object. + extract_fields: &'static [&'static str], } -impl JsonLoggingLayer { - fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self { +impl JsonLoggingLayer { + fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self { JsonLoggingLayer { clock, - skipped_field_indices: papaya::HashMap::default(), - callsite_ids: papaya::HashMap::default(), + skipped_field_indices: CallsiteMap::default(), + span_info: CallsiteMap::default(), writer, - extract_fields: IndexSet::from_iter(extract_fields), - _marker: std::marker::PhantomData, + extract_fields, } } #[inline] - fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId { - *self - .callsite_ids + fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo { + self.span_info .pin() - .get_or_insert_with(cs, CallsiteId::next) + .get_or_insert_with(metadata.callsite(), || { + CallsiteSpanInfo::new(metadata, self.extract_fields) + }) + .clone() } } -impl Layer - for JsonLoggingLayer +impl Layer for JsonLoggingLayer where S: Subscriber + for<'a> LookupSpan<'a>, { @@ -237,35 +249,25 @@ where // early, before OTel machinery, and add as event extension. let now = self.clock.now(); - let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| { - if entered.get() { - let mut formatter = EventFormatter::new(); - formatter.format::( - now, - event, - &ctx, - &self.skipped_field_indices, - &self.callsite_ids, - &self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - } else { - entered.set(true); - defer!(entered.set(false);); + let res: io::Result<()> = EVENT_FORMATTER.with(|f| { + let mut borrow = f.try_borrow_mut(); + let formatter = match borrow.as_deref_mut() { + Ok(formatter) => formatter, + // If the thread local formatter is borrowed, + // then we likely hit an edge case were we panicked during formatting. + // We allow the logging to proceed with an uncached formatter. + Err(_) => &mut EventFormatter::new(), + }; - EVENT_FORMATTER.with_borrow_mut(move |formatter| { - formatter.reset(); - formatter.format::( - now, - event, - &ctx, - &self.skipped_field_indices, - &self.callsite_ids, - &self.extract_fields, - )?; - self.writer.make_writer().write_all(formatter.buffer()) - }) - } + formatter.reset(); + formatter.format( + now, + event, + &ctx, + &self.skipped_field_indices, + self.extract_fields, + )?; + self.writer.make_writer().write_all(formatter.buffer()) }); // In case logging fails we generate a simpler JSON object. @@ -287,50 +289,48 @@ where /// Registers a SpanFields instance as span extension. fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let fields = SpanFields::default(); - fields.record_fields(attrs); - // This could deadlock when there's a panic somewhere in the tracing - // event handling and a read or write guard is still held. This includes - // the OTel subscriber. - let mut exts = span.extensions_mut(); + let mut fields = SpanFields::new(self.span_info(span.metadata())); + attrs.record(&mut fields); - exts.insert(fields); + // This is a new span: the extensions should not be locked + // unless some layer spawned a thread to process this span. 
+ // I don't think any layers do that. + span.extensions_mut().insert(fields); } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { let span = ctx.span(id).expect("span must exist"); - let ext = span.extensions(); - if let Some(data) = ext.get::() { - data.record_fields(values); + + // assumption: `on_record` is rarely called. + // assumption: a span being updated by one thread, + // and formatted by another thread is even rarer. + let mut ext = span.extensions_mut(); + if let Some(fields) = ext.get_mut::() { + values.record(fields); } } - /// Called (lazily) whenever a new log call is executed. We quickly check - /// for duplicate field names and record duplicates as skippable. Last one - /// wins. + /// Called (lazily) roughly once per event/span instance. We quickly check + /// for duplicate field names and record duplicates as skippable. Last field wins. fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + debug_assert!( + metadata.fields().len() <= MAX_TRACING_FIELDS, + "callsite {metadata:?} has too many fields." + ); + if !metadata.is_event() { - self.callsite_id(metadata.callsite()); + // register the span info. + self.span_info(metadata); // Must not be never because we wouldn't get trace and span data. return Interest::always(); } let mut field_indices = SkippedFieldIndices::default(); - let mut seen_fields = HashMap::<&'static str, usize>::new(); + let mut seen_fields = HashMap::new(); for field in metadata.fields() { - use std::collections::hash_map::Entry; - match seen_fields.entry(field.name()) { - Entry::Vacant(entry) => { - // field not seen yet - entry.insert(field.index()); - } - Entry::Occupied(mut entry) => { - // replace currently stored index - let old_index = entry.insert(field.index()); - // ... and append it to list of skippable indices - field_indices.push(old_index); - } + if let Some(old_index) = seen_fields.insert(field.name(), field.index()) { + field_indices.set(old_index); } } @@ -344,110 +344,113 @@ where } } -#[derive(Copy, Clone, Debug, Default)] -#[repr(transparent)] -struct CallsiteId(u32); +/// Any span info that is fixed to a particular callsite. Not variable between span instances. +#[derive(Clone)] +struct CallsiteSpanInfo { + /// index of each field to extract. usize::MAX if not found. + extract: Arc<[usize]>, -impl CallsiteId { - #[inline] - fn next() -> Self { - // Start at 1 to reserve 0 for default. - static COUNTER: AtomicU32 = AtomicU32::new(1); - CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed)) - } + /// tracks the fixed "callsite ID" for each span. + /// note: this is not stable between runs. + normalized_name: Arc, } -impl fmt::Display for CallsiteId { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) +impl CallsiteSpanInfo { + fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self { + // Start at 1 to reserve 0 for default. + static COUNTER: AtomicU32 = AtomicU32::new(1); + + let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect(); + + // get all the indices of span fields we want to focus + let extract = extract_fields + .iter() + // use rposition, since we want last match wins. + .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX)) + .collect(); + + // normalized_name is unique for each callsite, but it is not + // unified across separate proxy instances. + // todo: can we do better here? 
+ let cid = COUNTER.fetch_add(1, Ordering::Relaxed); + let normalized_name = format!("{}#{cid}", metadata.name()).into(); + + Self { + extract, + normalized_name, + } } } /// Stores span field values recorded during the spans lifetime. -#[derive(Default)] struct SpanFields { - // TODO: Switch to custom enum with lasso::Spur for Strings? - fields: papaya::HashMap<&'static str, serde_json::Value>, + values: [serde_json::Value; MAX_TRACING_FIELDS], + + /// cached span info so we can avoid extra hashmap lookups in the hot path. + span_info: CallsiteSpanInfo, } impl SpanFields { - #[inline] - fn record_fields(&self, fields: R) { - fields.record(&mut SpanFieldsRecorder { - fields: self.fields.pin(), - }); + fn new(span_info: CallsiteSpanInfo) -> Self { + Self { + span_info, + values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS], + } } } -/// Implements a tracing field visitor to convert and store values. -struct SpanFieldsRecorder<'m, S, G> { - fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>, -} - -impl tracing::field::Visit for SpanFieldsRecorder<'_, S, G> { +impl tracing::field::Visit for SpanFields { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { if let Ok(value) = i64::try_from(value) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } else { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } #[inline] fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { if let Ok(value) = u64::try_from(value) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } else { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value}"))); + self.values[field.index()] = serde_json::Value::from(format!("{value}")); } } #[inline] fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - self.fields - .insert(field.name(), serde_json::Value::from(value)); + self.values[field.index()] = serde_json::Value::from(value); } #[inline] fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.fields - .insert(field.name(), serde_json::Value::from(format!("{value:?}"))); + self.values[field.index()] = 
serde_json::Value::from(format!("{value:?}"));
     }
 
     #[inline]
@@ -456,38 +459,33 @@ impl tracing::field::Visit for SpanFieldsRecorder<'_, S, G>
     fn record_error(
         &mut self,
         field: &tracing::field::Field,
         value: &(dyn std::error::Error + 'static),
     ) {
-        self.fields
-            .insert(field.name(), serde_json::Value::from(format!("{value}")));
+        self.values[field.index()] = serde_json::Value::from(format!("{value}"));
     }
 }
 
 /// List of field indices skipped during logging. Can list duplicate fields or
 /// metafields not meant to be logged.
-#[derive(Clone, Default)]
+#[derive(Copy, Clone, Default)]
 struct SkippedFieldIndices {
-    bits: u64,
+    // 32 bits is large enough for `MAX_TRACING_FIELDS`
+    bits: u32,
 }
 
 impl SkippedFieldIndices {
     #[inline]
-    fn is_empty(&self) -> bool {
+    fn is_empty(self) -> bool {
         self.bits == 0
     }
 
     #[inline]
-    fn push(&mut self, index: usize) {
-        self.bits |= 1u64
-            .checked_shl(index as u32)
-            .expect("field index too large");
+    fn set(&mut self, index: usize) {
+        debug_assert!(index < 32, "index out of bounds of 32-bit set");
+        self.bits |= 1 << index;
     }
 
     #[inline]
-    fn contains(&self, index: usize) -> bool {
-        self.bits
-            & 1u64
-                .checked_shl(index as u32)
-                .expect("field index too large")
-            != 0
+    fn contains(self, index: usize) -> bool {
+        self.bits & (1 << index) != 0
     }
 }
 
@@ -499,7 +497,7 @@ struct EventFormatter {
 
 impl EventFormatter {
     #[inline]
-    fn new() -> Self {
+    const fn new() -> Self {
         EventFormatter {
             logline_buffer: Vec::new(),
         }
@@ -515,14 +513,13 @@ impl EventFormatter {
         self.logline_buffer.clear();
     }
 
-    fn format<S, const F: usize>(
+    fn format<S>(
         &mut self,
         now: DateTime<Utc>,
         event: &Event<'_>,
         ctx: &Context<'_, S>,
-        skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
-        callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
-        extract_fields: &IndexSet<&'static str>,
+        skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
+        extract_fields: &'static [&'static str],
     ) -> io::Result<()>
     where
        S: Subscriber + for<'a> LookupSpan<'a>,
@@ -533,8 +530,11 @@ impl EventFormatter {
         let normalized_meta = event.normalized_metadata();
         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
 
-        let skipped_field_indices = skipped_field_indices.pin();
-        let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
+        let skipped_field_indices = skipped_field_indices
+            .pin()
+            .get(&meta.callsite())
+            .copied()
+            .unwrap_or_default();
 
         let mut serialize = || {
             let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
@@ -565,9 +565,11 @@ impl EventFormatter {
             }
 
             let spans = SerializableSpans {
-                ctx,
-                callsite_ids,
-                extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
+                // collect all spans from parent to root.
+                spans: ctx
+                    .event_span(event)
+                    .map_or(vec![], |parent| parent.scope().collect()),
+                extracted: ExtractedSpanFields::new(extract_fields),
             };
             serializer.serialize_entry("spans", &spans)?;
 
@@ -620,9 +622,9 @@ impl EventFormatter {
             }
         }
 
-        if spans.extract.has_values() {
+        if spans.extracted.has_values() {
             // TODO: add fields from event, too?
-            serializer.serialize_entry("extract", &spans.extract)?;
+            serializer.serialize_entry("extract", &spans.extracted)?;
         }
 
         serializer.end()
@@ -635,15 +637,15 @@ impl EventFormatter {
 }
 
 /// Extracts the message field that's mixed with other fields.
-struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> { +struct MessageFieldExtractor { serializer: S, - skipped_field_indices: Option<&'a SkippedFieldIndices>, + skipped_field_indices: SkippedFieldIndices, state: Option>, } -impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { +impl MessageFieldExtractor { #[inline] - fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, @@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { fn accept_field(&self, field: &tracing::field::Field) -> bool { self.state.is_none() && field.name() == MESSAGE_FIELD - && !self - .skipped_field_indices - .is_some_and(|i| i.contains(field.index())) + && !self.skipped_field_indices.contains(field.index()) } } -impl tracing::field::Visit for MessageFieldExtractor<'_, S> { +impl tracing::field::Visit for MessageFieldExtractor { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { if self.accept_field(field) { @@ -751,14 +751,14 @@ impl tracing::field::Visit for MessageFieldExtracto /// can be skipped. // This is entirely optional and only cosmetic, though maybe helps a // bit during log parsing in dashboards when there's no field with empty object. -struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>); +struct FieldsPresent(pub bool, SkippedFieldIndices); // Even though some methods have an overhead (error, bytes) it is assumed the // compiler won't include this since we ignore the value entirely. -impl tracing::field::Visit for FieldsPresent<'_> { +impl tracing::field::Visit for FieldsPresent { #[inline] fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) { - if !self.1.is_some_and(|i| i.contains(field.index())) + if !self.1.contains(field.index()) && field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") { @@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> { } /// Serializes the fields directly supplied with a log event. -struct SerializableEventFields<'a, 'event>( - &'a tracing::Event<'event>, - Option<&'a SkippedFieldIndices>, -); +struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices); impl serde::ser::Serialize for SerializableEventFields<'_, '_> { fn serialize(&self, serializer: S) -> Result @@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> { } /// A tracing field visitor that skips the message field. 
-struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> { +struct MessageFieldSkipper { serializer: S, - skipped_field_indices: Option<&'a SkippedFieldIndices>, + skipped_field_indices: SkippedFieldIndices, state: Result<(), S::Error>, } -impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { +impl MessageFieldSkipper { #[inline] - fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self { Self { serializer, skipped_field_indices, @@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { self.state.is_ok() && field.name() != MESSAGE_FIELD && !field.name().starts_with("log.") - && !self - .skipped_field_indices - .is_some_and(|i| i.contains(field.index())) + && !self.skipped_field_indices.contains(field.index()) } #[inline] @@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { } } -impl tracing::field::Visit for MessageFieldSkipper<'_, S> { +impl tracing::field::Visit for MessageFieldSkipper { #[inline] fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { if self.accept_field(field) { @@ -905,18 +900,17 @@ impl tracing::field::Visit for MessageFieldSkipper< /// with the span names as keys. To prevent collision we append a numberic value /// to the name. Also, collects any span fields we're interested in. Last one /// wins. -struct SerializableSpans<'a, 'ctx, Span, const F: usize> +struct SerializableSpans<'ctx, S> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { - ctx: &'a Context<'ctx, Span>, - callsite_ids: &'a papaya::HashMap, - extract: ExtractedSpanFields<'a, F>, + spans: Vec>, + extracted: ExtractedSpanFields, } -impl serde::ser::Serialize for SerializableSpans<'_, '_, Span, F> +impl serde::ser::Serialize for SerializableSpans<'_, S> where - Span: Subscriber + for<'lookup> LookupSpan<'lookup>, + S: for<'lookup> LookupSpan<'lookup>, { fn serialize(&self, serializer: Ser) -> Result where @@ -924,25 +918,22 @@ where { let mut serializer = serializer.serialize_map(None)?; - if let Some(leaf_span) = self.ctx.lookup_current() { - for span in leaf_span.scope().from_root() { - // Append a numeric callsite ID to the span name to keep the name unique - // in the JSON object. - let cid = self - .callsite_ids - .pin() - .get(&span.metadata().callsite()) - .copied() - .unwrap_or_default(); + for span in self.spans.iter().rev() { + let ext = span.extensions(); - // Loki turns the # into an underscore during field name concatenation. - serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?; + // all spans should have this extension. + let Some(fields) = ext.get() else { continue }; - serializer.serialize_value(&SerializableSpanFields { - span: &span, - extract: &self.extract, - })?; - } + self.extracted.layer_span(fields); + + let SpanFields { values, span_info } = fields; + serializer.serialize_entry( + &*span_info.normalized_name, + &SerializableSpanFields { + fields: span.metadata().fields(), + values, + }, + )?; } serializer.end() @@ -950,80 +941,77 @@ where } /// Serializes the span fields as object. 
-struct SerializableSpanFields<'a, 'span, Span, const F: usize> -where - Span: for<'lookup> LookupSpan<'lookup>, -{ - span: &'a SpanRef<'span, Span>, - extract: &'a ExtractedSpanFields<'a, F>, +struct SerializableSpanFields<'span> { + fields: &'span tracing::field::FieldSet, + values: &'span [serde_json::Value; MAX_TRACING_FIELDS], } -impl serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F> -where - Span: for<'lookup> LookupSpan<'lookup>, -{ +impl serde::ser::Serialize for SerializableSpanFields<'_> { fn serialize(&self, serializer: S) -> Result where S: serde::ser::Serializer, { let mut serializer = serializer.serialize_map(None)?; - let ext = self.span.extensions(); - if let Some(data) = ext.get::() { - for (name, value) in &data.fields.pin() { - serializer.serialize_entry(name, value)?; - // TODO: replace clone with reference, if possible. - self.extract.set(name, value.clone()); + for (field, value) in std::iter::zip(self.fields, self.values) { + if value.is_null() { + continue; } + serializer.serialize_entry(field.name(), value)?; } serializer.end() } } -struct ExtractedSpanFields<'a, const F: usize> { - names: &'a IndexSet<&'static str>, - // TODO: replace TryLock with something local thread and interior mutability. - // serde API doesn't let us use `mut`. - values: TryLock<([Option; F], bool)>, +struct ExtractedSpanFields { + names: &'static [&'static str], + values: RefCell>, } -impl<'a, const F: usize> ExtractedSpanFields<'a, F> { - fn new(names: &'a IndexSet<&'static str>) -> Self { +impl ExtractedSpanFields { + fn new(names: &'static [&'static str]) -> Self { ExtractedSpanFields { names, - values: TryLock::new((array::from_fn(|_| Option::default()), false)), + values: RefCell::new(vec![serde_json::Value::Null; names.len()]), } } - #[inline] - fn set(&self, name: &'static str, value: serde_json::Value) { - if let Some((index, _)) = self.names.get_full(name) { - let mut fields = self.values.try_lock().expect("thread-local use"); - fields.0[index] = Some(value); - fields.1 = true; + fn layer_span(&self, fields: &SpanFields) { + let mut v = self.values.borrow_mut(); + let SpanFields { values, span_info } = fields; + + // extract the fields + for (i, &j) in span_info.extract.iter().enumerate() { + let Some(value) = values.get(j) else { continue }; + + if !value.is_null() { + // TODO: replace clone with reference, if possible. 
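+                    // Spans are visited root -> leaf here, so if several spans
+                    // set the same extracted field, the innermost value wins.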
+ v[i] = value.clone(); + } } } #[inline] fn has_values(&self) -> bool { - self.values.try_lock().expect("thread-local use").1 + self.values.borrow().iter().any(|v| !v.is_null()) } } -impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { +impl serde::ser::Serialize for ExtractedSpanFields { fn serialize(&self, serializer: S) -> Result where S: serde::ser::Serializer, { let mut serializer = serializer.serialize_map(None)?; - let values = self.values.try_lock().expect("thread-local use"); - for (i, value) in values.0.iter().enumerate() { - if let Some(value) = value { - let key = self.names[i]; - serializer.serialize_entry(key, value)?; + let values = self.values.borrow(); + for (key, value) in std::iter::zip(self.names, &*values) { + if value.is_null() { + continue; } + + serializer.serialize_entry(key, value)?; } serializer.end() @@ -1032,7 +1020,6 @@ impl serde::ser::Serialize for ExtractedSpanFields<'_, F> { #[cfg(test)] mod tests { - use std::marker::PhantomData; use std::sync::{Arc, Mutex, MutexGuard}; use assert_json_diff::assert_json_eq; @@ -1081,10 +1068,9 @@ mod tests { let log_layer = JsonLoggingLayer { clock: clock.clone(), skipped_field_indices: papaya::HashMap::default(), - callsite_ids: papaya::HashMap::default(), + span_info: papaya::HashMap::default(), writer: buffer.clone(), - extract_fields: IndexSet::from_iter(["x"]), - _marker: PhantomData::<[&'static str; 1]>, + extract_fields: &["x"], }; let registry = tracing_subscriber::Registry::default().with(log_layer);
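// A minimal, dependency-free sketch (illustrative only, not part of this patch)
// of the extraction strategy above: indices of the interesting fields are
// precomputed once per callsite with `rposition` ("last match wins"), and the
// hot path then only performs array indexing instead of hash-map lookups.
// The names `extract_indices` and `extract_values` are made up for the example.

/// Once per callsite: map each wanted field name to its position in the
/// span's field list, or `usize::MAX` if the span does not have it.
fn extract_indices(span_fields: &[&str], wanted: &[&str]) -> Vec<usize> {
    wanted
        .iter()
        .map(|w| {
            span_fields
                .iter()
                .rposition(|f| f == w)
                .unwrap_or(usize::MAX)
        })
        .collect()
}

/// Per event: pull the wanted values out of the values recorded for a span.
/// An out-of-range index (`usize::MAX`) simply yields `None`.
fn extract_values<'a>(values: &'a [&'a str], indices: &[usize]) -> Vec<Option<&'a str>> {
    indices.iter().map(|&i| values.get(i).copied()).collect()
}

#[test]
fn extract_by_precomputed_indices() {
    // the span declares three fields; we only care about "x".
    let span_fields = ["conn_id", "x", "endpoint_id"];
    let indices = extract_indices(&span_fields, &["x"]);
    assert_eq!(indices, vec![1]);

    // values recorded for one span instance, in field order.
    let recorded = ["17", "42", "ep-example"];
    assert_eq!(extract_values(&recorded, &indices), vec![Some("42")]);
}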