From 160e52ec7e70213cc0e87843886c1e2204bdf60d Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Tue, 19 Jul 2022 14:56:25 -0400 Subject: [PATCH] Optimize branch creation (#2101) Resolves #2054 **Context**: branch creation needs to wait for GC to acquire `gc_cs` lock, which prevents creating new timelines during GC. However, because individual timeline GC iteration also requires `compaction_cs` lock, branch creation may also need to wait for compactions of multiple timelines. This results in large latency when creating a new branch, which we advertised as *"instantly"*. This PR optimizes the latency of branch creation by separating GC into two phases: 1. Collect GC data (branching points, cutoff LSNs, etc) 2. Perform GC for each timeline The GC bottleneck comes from step 2, which must wait for compaction of multiple timelines. This PR modifies the branch creation and GC functions to allow GC to hold the GC lock only in step 1. As a result, branch creation doesn't need to wait for compaction to finish but only needs to wait for GC data collection step, which is fast. --- .github/workflows/build_and_test.yml | 6 +- .github/workflows/codestyle.yml | 2 +- libs/postgres_ffi/build.rs | 10 +- pageserver/src/layered_repository.rs | 291 +++++++++++------- .../batch_others/test_branch_and_gc.py | 66 ++++ .../performance/test_branch_creation.py | 110 +++++++ 6 files changed, 359 insertions(+), 126 deletions(-) create mode 100644 test_runner/performance/test_branch_creation.py diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 3fecb2bf67..5874aa9b5c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -81,8 +81,8 @@ jobs: target/ # Fall back to older versions of the key, if no cache for current Cargo.lock was found key: | - v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} - v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}- + v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}- - name: Cache postgres build id: cache_pg @@ -268,7 +268,7 @@ jobs: !~/.cargo/registry/src ~/.cargo/git/ target/ - key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + key: v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} - name: Get Neon artifact for restoration uses: actions/download-artifact@v3 diff --git a/.github/workflows/codestyle.yml b/.github/workflows/codestyle.yml index 89bfffd4b9..8bcaa8f947 100644 --- a/.github/workflows/codestyle.yml +++ b/.github/workflows/codestyle.yml @@ -101,7 +101,7 @@ jobs: !~/.cargo/registry/src ~/.cargo/git target - key: ${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }} + key: v1-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }} - name: Run cargo clippy run: ./run_clippy.sh diff --git a/libs/postgres_ffi/build.rs b/libs/postgres_ffi/build.rs index c6df4fc0b0..7db2c20e34 100644 --- a/libs/postgres_ffi/build.rs +++ b/libs/postgres_ffi/build.rs @@ -49,12 +49,12 @@ fn main() { // Finding the location of C headers for the Postgres server: // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `/tmp_install` // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `/tmp_install/include/postgresql/server` - let mut pg_install_dir: PathBuf; - if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { - pg_install_dir = postgres_install_dir.into(); + let mut pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") + { + postgres_install_dir.into() } else { - pg_install_dir = PathBuf::from("tmp_install") - } + PathBuf::from("tmp_install") + }; if pg_install_dir.is_relative() { let cwd = env::current_dir().unwrap(); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 6459e802f4..93acce912c 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -281,12 +281,22 @@ impl Repository for LayeredRepository { // concurrently removes data that is needed by the new timeline. let _gc_cs = self.gc_cs.lock().unwrap(); + // In order for the branch creation task to not wait for GC/compaction, + // we need to make sure that the starting LSN of the child branch is not out of scope midway by + // + // 1. holding the GC lock to prevent overwritting timeline's GC data + // 2. checking both the latest GC cutoff LSN and latest GC info of the source timeline + // + // Step 2 is to avoid initializing the new branch using data removed by past GC iterations + // or in-queue GC iterations. + let mut timelines = self.timelines.lock().unwrap(); let src_timeline = self .get_timeline_load_internal(src, &mut timelines) // message about timeline being remote is one .context up in the stack .context("failed to load timeline for branching")? .ok_or_else(|| anyhow::anyhow!("unknown timeline id: {}", &src))?; + let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn(); // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN @@ -296,9 +306,23 @@ impl Repository for LayeredRepository { lsn }); + // Check if the starting LSN is out of scope because it is less than + // 1. the latest GC cutoff LSN or + // 2. the planned GC cutoff LSN, which is from an in-queue GC iteration. src_timeline .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn) - .context("invalid branch start lsn")?; + .context(format!( + "invalid branch start lsn: less than latest GC cutoff {latest_gc_cutoff_lsn}" + ))?; + { + let gc_info = src_timeline.gc_info.read().unwrap(); + let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff); + if start_lsn < cutoff { + bail!(format!( + "invalid branch start lsn: less than planned GC cutoff {cutoff}" + )); + } + } // Determine prev-LSN for the new timeline. We can only determine it if // the timeline was branched at the current end of the source timeline. @@ -440,13 +464,7 @@ impl Repository for LayeredRepository { Entry::Vacant(_) => bail!("timeline not found"), }; - // try to acquire gc and compaction locks to prevent errors from missing files - let _gc_guard = self - .gc_cs - .try_lock() - .map_err(|e| anyhow::anyhow!("cannot acquire gc lock {e}"))?; - - let compaction_guard = timeline_entry.get().compaction_guard()?; + let layer_removal_guard = timeline_entry.get().layer_removal_guard()?; let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -457,7 +475,7 @@ impl Repository for LayeredRepository { })?; info!("detach removed files"); - drop(compaction_guard); + drop(layer_removal_guard); timeline_entry.remove(); Ok(()) @@ -524,10 +542,10 @@ impl LayeredTimelineEntry { } } - fn compaction_guard(&self) -> Result>, anyhow::Error> { + fn layer_removal_guard(&self) -> Result>, anyhow::Error> { match self { LayeredTimelineEntry::Loaded(timeline) => timeline - .compaction_cs + .layer_removal_cs .try_lock() .map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}")) .map(Some), @@ -883,50 +901,50 @@ impl LayeredRepository { let now = Instant::now(); // grab mutex to prevent new timelines from being created here. - let _gc_cs = self.gc_cs.lock().unwrap(); + let gc_cs = self.gc_cs.lock().unwrap(); + + let mut timelines = self.timelines.lock().unwrap(); // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); - let mut timeline_ids = Vec::new(); - let mut timelines = self.timelines.lock().unwrap(); + let timeline_ids = { + if let Some(target_timeline_id) = target_timeline_id.as_ref() { + if timelines.get(target_timeline_id).is_none() { + bail!("gc target timeline does not exist") + } + }; - if let Some(target_timeline_id) = target_timeline_id.as_ref() { - if timelines.get(target_timeline_id).is_none() { - bail!("gc target timeline does not exist") - } + timelines + .iter() + .map(|(timeline_id, timeline_entry)| { + // This is unresolved question for now, how to do gc in presence of remote timelines + // especially when this is combined with branching. + // Somewhat related: https://github.com/zenithdb/zenith/issues/999 + if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { + // If target_timeline is specified, we only need to know branchpoints of its children + if let Some(timelineid) = target_timeline_id { + if ancestor_timeline_id == &timelineid { + all_branchpoints + .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); + } + } + // Collect branchpoints for all timelines + else { + all_branchpoints + .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); + } + } + + *timeline_id + }) + .collect::>() }; - for (timeline_id, timeline_entry) in timelines.iter() { - timeline_ids.push(*timeline_id); - - // This is unresolved question for now, how to do gc in presence of remote timelines - // especially when this is combined with branching. - // Somewhat related: https://github.com/neondatabase/neon/issues/999 - if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { - // If target_timeline is specified, we only need to know branchpoints of its children - if let Some(timelineid) = target_timeline_id { - if ancestor_timeline_id == &timelineid { - all_branchpoints - .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); - } - } - // Collect branchpoints for all timelines - else { - all_branchpoints.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); - } - } - } - // Ok, we now know all the branch points. - // Perform GC for each timeline. - for timeline_id in timeline_ids.into_iter() { - if thread_mgr::is_shutdown_requested() { - // We were requested to shut down. Stop and return with the progress we - // made. - break; - } - + // Update the GC information for each timeline. + let mut gc_timelines = Vec::with_capacity(timeline_ids.len()); + for timeline_id in timeline_ids { // Timeline is known to be local and loaded. let timeline = self .get_timeline_load_internal(timeline_id, &mut *timelines)? @@ -940,7 +958,6 @@ impl LayeredRepository { } if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) { - drop(timelines); let branchpoints: Vec = all_branchpoints .range(( Included((timeline_id, Lsn(0))), @@ -948,21 +965,45 @@ impl LayeredRepository { )) .map(|&x| x.1) .collect(); + timeline.update_gc_info(branchpoints, cutoff, pitr)?; - // If requested, force flush all in-memory layers to disk first, - // so that they too can be garbage collected. That's - // used in tests, so we want as deterministic results as possible. - if checkpoint_before_gc { - timeline.checkpoint(CheckpointConfig::Forced)?; - info!("timeline {} checkpoint_before_gc done", timeline_id); - } - timeline.update_gc_info(branchpoints, cutoff, pitr); - let result = timeline.gc()?; - - totals += result; - timelines = self.timelines.lock().unwrap(); + gc_timelines.push(timeline); } } + drop(timelines); + drop(gc_cs); + + // Perform GC for each timeline. + // + // Note that we don't hold the GC lock here because we don't want + // to delay the branch creation task, which requires the GC lock. + // A timeline GC iteration can be slow because it may need to wait for + // compaction (both require `layer_removal_cs` lock), + // but the GC iteration can run concurrently with branch creation. + // + // See comments in [`LayeredRepository::branch_timeline`] for more information + // about why branch creation task can run concurrently with timeline's GC iteration. + for timeline in gc_timelines { + if thread_mgr::is_shutdown_requested() { + // We were requested to shut down. Stop and return with the progress we + // made. + break; + } + + // If requested, force flush all in-memory layers to disk first, + // so that they too can be garbage collected. That's + // used in tests, so we want as deterministic results as possible. + if checkpoint_before_gc { + timeline.checkpoint(CheckpointConfig::Forced)?; + info!( + "timeline {} checkpoint_before_gc done", + timeline.timeline_id + ); + } + + let result = timeline.gc()?; + totals += result; + } totals.elapsed = now.elapsed(); Ok(totals) @@ -1038,11 +1079,11 @@ pub struct LayeredTimeline { /// Used to ensure that there is only one thread layer_flush_lock: Mutex<()>, - // Prevent concurrent compactions. - // Compactions are normally performed by one thread. But compaction can also be manually - // requested by admin (that's used in tests). These forced compactions run in a different - // thread and could be triggered at the same time as a normal, timed compaction. - compaction_cs: Mutex<()>, + /// Layer removal lock. + /// A lock to ensure that no layer of the timeline is removed concurrently by other threads. + /// This lock is acquired in [`LayeredTimeline::gc`], [`LayeredTimeline::compact`], + /// and [`LayeredRepository::delete_timeline`]. + layer_removal_cs: Mutex<()>, // Needed to ensure that we can't create a branch at a point that was already garbage collected latest_gc_cutoff_lsn: RwLock, @@ -1079,12 +1120,14 @@ struct GcInfo { /// last-record LSN /// /// FIXME: is this inclusive or exclusive? - cutoff: Lsn, + horizon_cutoff: Lsn, - /// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()' - /// minus 'pitr_interval' + /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this + /// point. /// - pitr: Duration, + /// This is calculated by finding a number such that a record is needed for PITR + /// if only if its LSN is larger than 'pitr_cutoff'. + pitr_cutoff: Lsn, } /// Public interface functions @@ -1324,12 +1367,12 @@ impl LayeredTimeline { write_lock: Mutex::new(()), layer_flush_lock: Mutex::new(()), - compaction_cs: Mutex::new(()), + layer_removal_cs: Mutex::new(()), gc_info: RwLock::new(GcInfo { retain_lsns: Vec::new(), - cutoff: Lsn(0), - pitr: Duration::ZERO, + horizon_cutoff: Lsn(0), + pitr_cutoff: Lsn(0), }), latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), @@ -1950,7 +1993,7 @@ impl LayeredTimeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let _compaction_cs = self.compaction_cs.lock().unwrap(); + let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); let target_file_size = self.get_checkpoint_distance(); @@ -2267,46 +2310,34 @@ impl LayeredTimeline { /// TODO: that's wishful thinking, compaction doesn't actually do that /// currently. /// - /// The caller specifies how much history is needed with the two arguments: + /// The caller specifies how much history is needed with the 3 arguments: /// /// retain_lsns: keep a version of each page at these LSNs - /// cutoff: also keep everything newer than this LSN + /// cutoff_horizon: also keep everything newer than this LSN + /// pitr: the time duration required to keep data for PITR /// /// The 'retain_lsns' list is currently used to prevent removing files that /// are needed by child timelines. In the future, the user might be able to /// name additional points in time to retain. The caller is responsible for /// collecting that information. /// - /// The 'cutoff' point is used to retain recent versions that might still be + /// The 'cutoff_horizon' point is used to retain recent versions that might still be /// needed by read-only nodes. (As of this writing, the caller just passes /// the latest LSN subtracted by a constant, and doesn't do anything smart /// to figure out what read-only nodes might actually need.) /// - fn update_gc_info(&self, retain_lsns: Vec, cutoff: Lsn, pitr: Duration) { + /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine + /// whether a record is needed for PITR. + fn update_gc_info( + &self, + retain_lsns: Vec, + cutoff_horizon: Lsn, + pitr: Duration, + ) -> Result<()> { let mut gc_info = self.gc_info.write().unwrap(); + + gc_info.horizon_cutoff = cutoff_horizon; gc_info.retain_lsns = retain_lsns; - gc_info.cutoff = cutoff; - gc_info.pitr = pitr; - } - - /// - /// Garbage collect layer files on a timeline that are no longer needed. - /// - /// Currently, we don't make any attempt at removing unneeded page versions - /// within a layer file. We can only remove the whole file if it's fully - /// obsolete. - /// - fn gc(&self) -> Result { - let now = SystemTime::now(); - let mut result: GcResult = Default::default(); - let disk_consistent_lsn = self.get_disk_consistent_lsn(); - - let _compaction_cs = self.compaction_cs.lock().unwrap(); - - let gc_info = self.gc_info.read().unwrap(); - let retain_lsns = &gc_info.retain_lsns; - let cutoff = min(gc_info.cutoff, disk_consistent_lsn); - let pitr = gc_info.pitr; // Calculate pitr cutoff point. // If we cannot determine a cutoff LSN, be conservative and don't GC anything. @@ -2315,6 +2346,7 @@ impl LayeredTimeline { if let Ok(timeline) = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id) { + let now = SystemTime::now(); // First, calculate pitr_cutoff_timestamp and then convert it to LSN. // If we don't have enough data to convert to LSN, // play safe and don't remove any layers. @@ -2325,7 +2357,7 @@ impl LayeredTimeline { LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn, LsnForTimestamp::Future(lsn) => { debug!("future({})", lsn); - pitr_cutoff_lsn = cutoff; + pitr_cutoff_lsn = gc_info.horizon_cutoff; } LsnForTimestamp::Past(lsn) => { debug!("past({})", lsn); @@ -2339,22 +2371,47 @@ impl LayeredTimeline { } else if cfg!(test) { // We don't have local timeline in mocked cargo tests. // So, just ignore pitr_interval setting in this case. - pitr_cutoff_lsn = cutoff; + pitr_cutoff_lsn = gc_info.horizon_cutoff; } + gc_info.pitr_cutoff = pitr_cutoff_lsn; - let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn); + Ok(()) + } + + /// + /// Garbage collect layer files on a timeline that are no longer needed. + /// + /// Currently, we don't make any attempt at removing unneeded page versions + /// within a layer file. We can only remove the whole file if it's fully + /// obsolete. + /// + fn gc(&self) -> Result { + let mut result: GcResult = Default::default(); + let now = SystemTime::now(); + + fail_point!("before-timeline-gc"); + + let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); + + let gc_info = self.gc_info.read().unwrap(); + + let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn()); + let pitr_cutoff = gc_info.pitr_cutoff; + let retain_lsns = &gc_info.retain_lsns; + + let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); // Nothing to GC. Return early. - if *self.get_latest_gc_cutoff_lsn() >= new_gc_cutoff { + let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn(); + if latest_gc_cutoff >= new_gc_cutoff { info!( - "Nothing to GC for timeline {}. cutoff_lsn {}", - self.timeline_id, new_gc_cutoff + "Nothing to GC for timeline {}: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}", + self.timeline_id ); - result.elapsed = now.elapsed()?; return Ok(result); } - let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered(); + let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %new_gc_cutoff).entered(); // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn. // See branch_timeline() for details. @@ -2388,23 +2445,23 @@ impl LayeredTimeline { result.layers_total += 1; - // 1. Is it newer than cutoff point? - if l.get_lsn_range().end > cutoff { + // 1. Is it newer than GC horizon cutoff point? + if l.get_lsn_range().end > horizon_cutoff { debug!( - "keeping {} because it's newer than cutoff {}", + "keeping {} because it's newer than horizon_cutoff {}", l.filename().display(), - cutoff + horizon_cutoff ); result.layers_needed_by_cutoff += 1; continue 'outer; } // 2. It is newer than PiTR cutoff point? - if l.get_lsn_range().end > pitr_cutoff_lsn { + if l.get_lsn_range().end > pitr_cutoff { debug!( - "keeping {} because it's newer than pitr_cutoff_lsn {}", + "keeping {} because it's newer than pitr_cutoff {}", l.filename().display(), - pitr_cutoff_lsn + pitr_cutoff ); result.layers_needed_by_pitr += 1; continue 'outer; @@ -2823,7 +2880,7 @@ pub mod tests { let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2893,7 +2950,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2970,7 +3027,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; diff --git a/test_runner/batch_others/test_branch_and_gc.py b/test_runner/batch_others/test_branch_and_gc.py index a6210b9176..7157386ce2 100644 --- a/test_runner/batch_others/test_branch_and_gc.py +++ b/test_runner/batch_others/test_branch_and_gc.py @@ -1,3 +1,5 @@ +import threading +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv from fixtures.utils import lsn_from_hex @@ -99,3 +101,67 @@ def test_branch_and_gc(neon_simple_env: NeonEnv): branch_cur.execute('SELECT count(*) FROM foo') assert branch_cur.fetchone() == (200000, ) + + +# This test simulates a race condition happening when branch creation and GC are performed concurrently. +# +# Suppose we want to create a new timeline 't' from a source timeline 's' starting +# from a lsn 'lsn'. Upon creating 't', if we don't hold the GC lock and compare 'lsn' with +# the latest GC information carefully, it's possible for GC to accidentally remove data +# needed by the new timeline. +# +# In this test, GC is requested before the branch creation but is delayed to happen after branch creation. +# As a result, when doing GC for the source timeline, we don't have any information about +# the upcoming new branches, so it's possible to remove data that may be needed by the new branches. +# It's the branch creation task's job to make sure the starting 'lsn' is not out of scope +# and prevent creating branches with invalid starting LSNs. +# +# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447. +def test_branch_creation_before_gc(neon_simple_env: NeonEnv): + env = neon_simple_env + # Disable background GC but set the `pitr_interval` to be small, so GC can delete something + tenant, _ = env.neon_cli.create_tenant( + conf={ + # disable background GC + 'gc_period': '10 m', + 'gc_horizon': f'{10 * 1024 ** 3}', + + # small checkpoint distance to create more delta layer files + 'checkpoint_distance': f'{1024 ** 2}', + + # set the target size to be large to allow the image layer to cover the whole key space + 'compaction_target_size': f'{1024 ** 3}', + + # tweak the default settings to allow quickly create image layers and L1 layers + 'compaction_period': '1 s', + 'compaction_threshold': '2', + 'image_creation_threshold': '1', + + # set PITR interval to be small, so we can do GC + 'pitr_interval': '1 s' + }) + + b0 = env.neon_cli.create_branch('b0', tenant_id=tenant) + pg0 = env.postgres.create_start('b0', tenant_id=tenant) + res = pg0.safe_psql_many(queries=[ + "CREATE TABLE t(key serial primary key)", + "INSERT INTO t SELECT FROM generate_series(1, 100000)", + "SELECT pg_current_wal_insert_lsn()", + "INSERT INTO t SELECT FROM generate_series(1, 100000)", + ]) + lsn = res[2][0][0] + + # Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the + # branch creation task but the individual timeline GC iteration happens *after* + # the branch creation task. + env.pageserver.safe_psql(f"failpoints before-timeline-gc=sleep(2000)") + + def do_gc(): + env.pageserver.safe_psql(f"do_gc {tenant.hex} {b0.hex} 0") + + thread = threading.Thread(target=do_gc, daemon=True) + thread.start() + + # The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC. + with pytest.raises(Exception, match="invalid branch start lsn"): + env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn) diff --git a/test_runner/performance/test_branch_creation.py b/test_runner/performance/test_branch_creation.py new file mode 100644 index 0000000000..1d39b0830d --- /dev/null +++ b/test_runner/performance/test_branch_creation.py @@ -0,0 +1,110 @@ +import random +import time +import statistics +import threading +import timeit +import pytest +from typing import List +from fixtures.benchmark_fixture import MetricReport +from fixtures.compare_fixtures import NeonCompare +from fixtures.log_helper import log + + +def _record_branch_creation_durations(neon_compare: NeonCompare, durs: List[float]): + neon_compare.zenbenchmark.record("branch_creation_duration_max", + max(durs), + 's', + MetricReport.LOWER_IS_BETTER) + neon_compare.zenbenchmark.record("branch_creation_duration_avg", + statistics.mean(durs), + 's', + MetricReport.LOWER_IS_BETTER) + neon_compare.zenbenchmark.record("branch_creation_duration_stdev", + statistics.stdev(durs), + 's', + MetricReport.LOWER_IS_BETTER) + + +@pytest.mark.parametrize("n_branches", [20]) +# Test measures the latency of branch creation during a heavy [1] workload. +# +# [1]: to simulate a heavy workload, the test tweaks the GC and compaction settings +# to increase the task's frequency. The test runs `pgbench` in each new branch. +# Each branch is created from a randomly picked source branch. +def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int): + env = neon_compare.env + pg_bin = neon_compare.pg_bin + + # Use aggressive GC and checkpoint settings, so GC and compaction happen more often during the test + tenant, _ = env.neon_cli.create_tenant( + conf={ + 'gc_period': '5 s', + 'gc_horizon': f'{4 * 1024 ** 2}', + 'checkpoint_distance': f'{2 * 1024 ** 2}', + 'compaction_target_size': f'{1024 ** 2}', + 'compaction_threshold': '2', + # set PITR interval to be small, so we can do GC + 'pitr_interval': '5 s' + }) + + def run_pgbench(branch: str): + log.info(f"Start a pgbench workload on branch {branch}") + + pg = env.postgres.create_start(branch, tenant_id=tenant) + connstr = pg.connstr() + + pg_bin.run_capture(['pgbench', '-i', connstr]) + pg_bin.run_capture(['pgbench', '-c10', '-T10', connstr]) + + pg.stop() + + env.neon_cli.create_branch('b0', tenant_id=tenant) + + threads: List[threading.Thread] = [] + threads.append(threading.Thread(target=run_pgbench, args=('b0', ), daemon=True)) + threads[-1].start() + + branch_creation_durations = [] + for i in range(n_branches): + time.sleep(1.0) + + # random a source branch + p = random.randint(0, i) + + timer = timeit.default_timer() + env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(p), tenant_id=tenant) + dur = timeit.default_timer() - timer + + log.info(f"Creating branch b{i+1} took {dur}s") + branch_creation_durations.append(dur) + + threads.append(threading.Thread(target=run_pgbench, args=(f'b{i+1}', ), daemon=True)) + threads[-1].start() + + for thread in threads: + thread.join() + + _record_branch_creation_durations(neon_compare, branch_creation_durations) + + +@pytest.mark.parametrize("n_branches", [1024]) +# Test measures the latency of branch creation when creating a lot of branches. +def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int): + env = neon_compare.env + + env.neon_cli.create_branch('b0') + + pg = env.postgres.create_start('b0') + neon_compare.pg_bin.run_capture(['pgbench', '-i', '-s10', pg.connstr()]) + + branch_creation_durations = [] + + for i in range(n_branches): + # random a source branch + p = random.randint(0, i) + timer = timeit.default_timer() + env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(p)) + dur = timeit.default_timer() - timer + branch_creation_durations.append(dur) + + _record_branch_creation_durations(neon_compare, branch_creation_durations)