diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 65f8ddaab4..024e66d112 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -36,6 +36,8 @@ use tracing::{debug, trace, warn}; use utils::bin_ser::DeserializeError; use utils::{bin_ser::BeSer, lsn::Lsn}; +const MAX_AUX_FILE_DELTAS: usize = 1024; + #[derive(Debug)] pub enum LsnForTimestamp { /// Found commits both before and after the given timestamp @@ -157,7 +159,6 @@ impl Timeline { pending_updates: HashMap::new(), pending_deletions: Vec::new(), pending_nblocks: 0, - pending_aux_files: None, pending_directory_entries: Vec::new(), lsn, } @@ -873,11 +874,6 @@ pub struct DatadirModification<'a> { pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, - // If we already wrote any aux file changes in this modification, stash the latest dir. If set, - // [`Self::put_file`] may assume that it is safe to emit a delta rather than checking - // if AUX_FILES_KEY is already set. - pending_aux_files: Option, - /// For special "directory" keys that store key-value maps, track the size of the map /// if it was updated in this modification. 
pending_directory_entries: Vec<(DirectoryKind, usize)>, @@ -1401,19 +1397,28 @@ impl<'a> DatadirModification<'a> { Some(Bytes::copy_from_slice(content)) }; - let dir = if let Some(mut dir) = self.pending_aux_files.take() { + let n_files; + let mut aux_files = self.tline.aux_files.lock().await; + if let Some(mut dir) = aux_files.dir.take() { // We already updated aux files in `self`: emit a delta and update our latest value - - self.put( - AUX_FILES_KEY, - Value::WalRecord(NeonWalRecord::AuxFile { - file_path: file_path.clone(), - content: content.clone(), - }), - ); - - dir.upsert(file_path, content); - dir + dir.upsert(file_path.clone(), content.clone()); + n_files = dir.files.len(); + if aux_files.n_deltas == MAX_AUX_FILE_DELTAS { + self.put( + AUX_FILES_KEY, + Value::Image(Bytes::from( + AuxFilesDirectory::ser(&dir).context("serialize")?, + )), + ); + aux_files.n_deltas = 0; + } else { + self.put( + AUX_FILES_KEY, + Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }), + ); + aux_files.n_deltas += 1; + } + aux_files.dir = Some(dir); } else { // Check if the AUX_FILES_KEY is initialized match self.get(AUX_FILES_KEY, ctx).await { @@ -1428,7 +1433,8 @@ impl<'a> DatadirModification<'a> { }), ); dir.upsert(file_path, content); - dir + n_files = dir.files.len(); + aux_files.dir = Some(dir); } Err( e @ (PageReconstructError::AncestorStopping(_) @@ -1455,14 +1461,14 @@ impl<'a> DatadirModification<'a> { AuxFilesDirectory::ser(&dir).context("serialize")?, )), ); - dir + n_files = 1; + aux_files.dir = Some(dir); } } - }; + } self.pending_directory_entries - .push((DirectoryKind::AuxFiles, dir.files.len())); - self.pending_aux_files = Some(dir); + .push((DirectoryKind::AuxFiles, n_files)); Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 59a7dcd4bd..b94ad5760a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -54,7 +54,7 @@ use std::{ ops::ControlFlow, }; -use 
crate::pgdatadir_mapping::DirectoryKind; +use crate::pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind}; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, @@ -174,6 +174,11 @@ pub struct TimelineResources { >, } +pub(crate) struct AuxFilesState { + pub(crate) dir: Option, + pub(crate) n_deltas: usize, +} + pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -357,6 +362,9 @@ pub struct Timeline { timeline_get_throttle: Arc< crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>, >, + + /// Keep aux directory cache to avoid its reconstruction on each update + pub(crate) aux_files: tokio::sync::Mutex, } pub struct WalReceiverInfo { @@ -1693,6 +1701,11 @@ impl Timeline { gc_lock: tokio::sync::Mutex::default(), timeline_get_throttle: resources.timeline_get_throttle, + + aux_files: tokio::sync::Mutex::new(AuxFilesState { + dir: None, + n_deltas: 0, + }), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; diff --git a/test_runner/regress/test_layer_bloating.py b/test_runner/regress/test_layer_bloating.py index bf5834b665..2fdee89389 100644 --- a/test_runner/regress/test_layer_bloating.py +++ b/test_runner/regress/test_layer_bloating.py @@ -6,6 +6,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, logical_replication_sync, + wait_for_last_flush_lsn, ) from fixtures.pg_version import PgVersion @@ -52,6 +53,7 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg): cur.execute("select create_snapshots(10000)") # Wait logical replication to sync logical_replication_sync(vanilla_pg, endpoint) + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline) time.sleep(10) # Check layer file sizes