diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 3fecb2bf67..5874aa9b5c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -81,8 +81,8 @@ jobs: target/ # Fall back to older versions of the key, if no cache for current Cargo.lock was found key: | - v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} - v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}- + v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}- - name: Cache postgres build id: cache_pg @@ -268,7 +268,7 @@ jobs: !~/.cargo/registry/src ~/.cargo/git/ target/ - key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} + key: v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} - name: Get Neon artifact for restoration uses: actions/download-artifact@v3 diff --git a/.github/workflows/codestyle.yml b/.github/workflows/codestyle.yml index 89bfffd4b9..8bcaa8f947 100644 --- a/.github/workflows/codestyle.yml +++ b/.github/workflows/codestyle.yml @@ -101,7 +101,7 @@ jobs: !~/.cargo/registry/src ~/.cargo/git target - key: ${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }} + key: v1-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }} - name: Run cargo clippy run: ./run_clippy.sh diff --git a/libs/postgres_ffi/build.rs b/libs/postgres_ffi/build.rs index c6df4fc0b0..7db2c20e34 100644 --- a/libs/postgres_ffi/build.rs +++ b/libs/postgres_ffi/build.rs @@ -49,12 +49,12 @@ fn main() { // Finding the location of C headers for the Postgres server: // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into 
`/tmp_install` // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `/tmp_install/include/postgresql/server` - let mut pg_install_dir: PathBuf; - if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { - pg_install_dir = postgres_install_dir.into(); + let mut pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") + { + postgres_install_dir.into() } else { - pg_install_dir = PathBuf::from("tmp_install") - } + PathBuf::from("tmp_install") + }; if pg_install_dir.is_relative() { let cwd = env::current_dir().unwrap(); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 6459e802f4..93acce912c 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -281,12 +281,22 @@ impl Repository for LayeredRepository { // concurrently removes data that is needed by the new timeline. let _gc_cs = self.gc_cs.lock().unwrap(); + // In order for the branch creation task to not wait for GC/compaction, + // we need to make sure that the starting LSN of the child branch is not out of scope midway by + // + // 1. holding the GC lock to prevent overwriting timeline's GC data + // 2. checking both the latest GC cutoff LSN and latest GC info of the source timeline + // + // Step 2 is to avoid initializing the new branch using data removed by past GC iterations + // or in-queue GC iterations. + let mut timelines = self.timelines.lock().unwrap(); let src_timeline = self .get_timeline_load_internal(src, &mut timelines) // message about timeline being remote is one .context up in the stack .context("failed to load timeline for branching")? 
.ok_or_else(|| anyhow::anyhow!("unknown timeline id: {}", &src))?; + let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn(); // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN @@ -296,9 +306,23 @@ impl Repository for LayeredRepository { lsn }); + // Check if the starting LSN is out of scope because it is less than + // 1. the latest GC cutoff LSN or + // 2. the planned GC cutoff LSN, which is from an in-queue GC iteration. src_timeline .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn) - .context("invalid branch start lsn")?; + .context(format!( + "invalid branch start lsn: less than latest GC cutoff {latest_gc_cutoff_lsn}" + ))?; + { + let gc_info = src_timeline.gc_info.read().unwrap(); + let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff); + if start_lsn < cutoff { + bail!(format!( + "invalid branch start lsn: less than planned GC cutoff {cutoff}" + )); + } + } // Determine prev-LSN for the new timeline. We can only determine it if // the timeline was branched at the current end of the source timeline. 
@@ -440,13 +464,7 @@ impl Repository for LayeredRepository { Entry::Vacant(_) => bail!("timeline not found"), }; - // try to acquire gc and compaction locks to prevent errors from missing files - let _gc_guard = self - .gc_cs - .try_lock() - .map_err(|e| anyhow::anyhow!("cannot acquire gc lock {e}"))?; - - let compaction_guard = timeline_entry.get().compaction_guard()?; + let layer_removal_guard = timeline_entry.get().layer_removal_guard()?; let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -457,7 +475,7 @@ impl Repository for LayeredRepository { })?; info!("detach removed files"); - drop(compaction_guard); + drop(layer_removal_guard); timeline_entry.remove(); Ok(()) @@ -524,10 +542,10 @@ impl LayeredTimelineEntry { } } - fn compaction_guard(&self) -> Result>, anyhow::Error> { + fn layer_removal_guard(&self) -> Result>, anyhow::Error> { match self { LayeredTimelineEntry::Loaded(timeline) => timeline - .compaction_cs + .layer_removal_cs .try_lock() .map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}")) .map(Some), @@ -883,50 +901,50 @@ impl LayeredRepository { let now = Instant::now(); // grab mutex to prevent new timelines from being created here. - let _gc_cs = self.gc_cs.lock().unwrap(); + let gc_cs = self.gc_cs.lock().unwrap(); + + let mut timelines = self.timelines.lock().unwrap(); // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. 
let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); - let mut timeline_ids = Vec::new(); - let mut timelines = self.timelines.lock().unwrap(); + let timeline_ids = { + if let Some(target_timeline_id) = target_timeline_id.as_ref() { + if timelines.get(target_timeline_id).is_none() { + bail!("gc target timeline does not exist") + } + }; - if let Some(target_timeline_id) = target_timeline_id.as_ref() { - if timelines.get(target_timeline_id).is_none() { - bail!("gc target timeline does not exist") - } + timelines + .iter() + .map(|(timeline_id, timeline_entry)| { + // This is unresolved question for now, how to do gc in presence of remote timelines + // especially when this is combined with branching. + // Somewhat related: https://github.com/zenithdb/zenith/issues/999 + if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { + // If target_timeline is specified, we only need to know branchpoints of its children + if let Some(timelineid) = target_timeline_id { + if ancestor_timeline_id == &timelineid { + all_branchpoints + .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); + } + } + // Collect branchpoints for all timelines + else { + all_branchpoints + .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); + } + } + + *timeline_id + }) + .collect::>() }; - for (timeline_id, timeline_entry) in timelines.iter() { - timeline_ids.push(*timeline_id); - - // This is unresolved question for now, how to do gc in presence of remote timelines - // especially when this is combined with branching. 
- // Somewhat related: https://github.com/neondatabase/neon/issues/999 - if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { - // If target_timeline is specified, we only need to know branchpoints of its children - if let Some(timelineid) = target_timeline_id { - if ancestor_timeline_id == &timelineid { - all_branchpoints - .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); - } - } - // Collect branchpoints for all timelines - else { - all_branchpoints.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); - } - } - } - // Ok, we now know all the branch points. - // Perform GC for each timeline. - for timeline_id in timeline_ids.into_iter() { - if thread_mgr::is_shutdown_requested() { - // We were requested to shut down. Stop and return with the progress we - // made. - break; - } - + // Update the GC information for each timeline. + let mut gc_timelines = Vec::with_capacity(timeline_ids.len()); + for timeline_id in timeline_ids { // Timeline is known to be local and loaded. let timeline = self .get_timeline_load_internal(timeline_id, &mut *timelines)? @@ -940,7 +958,6 @@ impl LayeredRepository { } if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) { - drop(timelines); let branchpoints: Vec = all_branchpoints .range(( Included((timeline_id, Lsn(0))), @@ -948,21 +965,45 @@ impl LayeredRepository { )) .map(|&x| x.1) .collect(); + timeline.update_gc_info(branchpoints, cutoff, pitr)?; - // If requested, force flush all in-memory layers to disk first, - // so that they too can be garbage collected. That's - // used in tests, so we want as deterministic results as possible. 
- if checkpoint_before_gc { - timeline.checkpoint(CheckpointConfig::Forced)?; - info!("timeline {} checkpoint_before_gc done", timeline_id); - } - timeline.update_gc_info(branchpoints, cutoff, pitr); - let result = timeline.gc()?; - - totals += result; - timelines = self.timelines.lock().unwrap(); + gc_timelines.push(timeline); } } + drop(timelines); + drop(gc_cs); + + // Perform GC for each timeline. + // + // Note that we don't hold the GC lock here because we don't want + // to delay the branch creation task, which requires the GC lock. + // A timeline GC iteration can be slow because it may need to wait for + // compaction (both require `layer_removal_cs` lock), + // but the GC iteration can run concurrently with branch creation. + // + // See comments in [`LayeredRepository::branch_timeline`] for more information + // about why branch creation task can run concurrently with timeline's GC iteration. + for timeline in gc_timelines { + if thread_mgr::is_shutdown_requested() { + // We were requested to shut down. Stop and return with the progress we + // made. + break; + } + + // If requested, force flush all in-memory layers to disk first, + // so that they too can be garbage collected. That's + // used in tests, so we want as deterministic results as possible. + if checkpoint_before_gc { + timeline.checkpoint(CheckpointConfig::Forced)?; + info!( + "timeline {} checkpoint_before_gc done", + timeline.timeline_id + ); + } + + let result = timeline.gc()?; + totals += result; + } totals.elapsed = now.elapsed(); Ok(totals) @@ -1038,11 +1079,11 @@ pub struct LayeredTimeline { /// Used to ensure that there is only one thread layer_flush_lock: Mutex<()>, - // Prevent concurrent compactions. - // Compactions are normally performed by one thread. But compaction can also be manually - // requested by admin (that's used in tests). These forced compactions run in a different - // thread and could be triggered at the same time as a normal, timed compaction. 
- compaction_cs: Mutex<()>, + /// Layer removal lock. + /// A lock to ensure that no layer of the timeline is removed concurrently by other threads. + /// This lock is acquired in [`LayeredTimeline::gc`], [`LayeredTimeline::compact`], + /// and [`LayeredRepository::delete_timeline`]. + layer_removal_cs: Mutex<()>, // Needed to ensure that we can't create a branch at a point that was already garbage collected latest_gc_cutoff_lsn: RwLock, @@ -1079,12 +1120,14 @@ struct GcInfo { /// last-record LSN /// /// FIXME: is this inclusive or exclusive? - cutoff: Lsn, + horizon_cutoff: Lsn, - /// In addition to 'retain_lsns', keep everything newer than 'SystemTime::now()' - /// minus 'pitr_interval' + /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this + /// point. /// - pitr: Duration, + /// This is calculated by finding a number such that a record is needed for PITR + /// if and only if its LSN is larger than 'pitr_cutoff'. + pitr_cutoff: Lsn, } /// Public interface functions @@ -1324,12 +1367,12 @@ impl LayeredTimeline { write_lock: Mutex::new(()), layer_flush_lock: Mutex::new(()), - compaction_cs: Mutex::new(()), + layer_removal_cs: Mutex::new(()), gc_info: RwLock::new(GcInfo { retain_lsns: Vec::new(), - cutoff: Lsn(0), - pitr: Duration::ZERO, + horizon_cutoff: Lsn(0), + pitr_cutoff: Lsn(0), }), latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), @@ -1950,7 +1993,7 @@ impl LayeredTimeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let _compaction_cs = self.compaction_cs.lock().unwrap(); + let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); let target_file_size = self.get_checkpoint_distance(); @@ -2267,46 +2310,34 @@ impl LayeredTimeline { /// TODO: that's wishful thinking, compaction doesn't actually do that /// currently. 
/// - /// The caller specifies how much history is needed with the two arguments: + /// The caller specifies how much history is needed with the 3 arguments: /// /// retain_lsns: keep a version of each page at these LSNs - /// cutoff: also keep everything newer than this LSN + /// cutoff_horizon: also keep everything newer than this LSN + /// pitr: the time duration required to keep data for PITR /// /// The 'retain_lsns' list is currently used to prevent removing files that /// are needed by child timelines. In the future, the user might be able to /// name additional points in time to retain. The caller is responsible for /// collecting that information. /// - /// The 'cutoff' point is used to retain recent versions that might still be + /// The 'cutoff_horizon' point is used to retain recent versions that might still be /// needed by read-only nodes. (As of this writing, the caller just passes /// the latest LSN subtracted by a constant, and doesn't do anything smart /// to figure out what read-only nodes might actually need.) /// - fn update_gc_info(&self, retain_lsns: Vec, cutoff: Lsn, pitr: Duration) { + /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine + /// whether a record is needed for PITR. + fn update_gc_info( + &self, + retain_lsns: Vec, + cutoff_horizon: Lsn, + pitr: Duration, + ) -> Result<()> { let mut gc_info = self.gc_info.write().unwrap(); + + gc_info.horizon_cutoff = cutoff_horizon; gc_info.retain_lsns = retain_lsns; - gc_info.cutoff = cutoff; - gc_info.pitr = pitr; - } - - /// - /// Garbage collect layer files on a timeline that are no longer needed. - /// - /// Currently, we don't make any attempt at removing unneeded page versions - /// within a layer file. We can only remove the whole file if it's fully - /// obsolete. 
- /// - fn gc(&self) -> Result { - let now = SystemTime::now(); - let mut result: GcResult = Default::default(); - let disk_consistent_lsn = self.get_disk_consistent_lsn(); - - let _compaction_cs = self.compaction_cs.lock().unwrap(); - - let gc_info = self.gc_info.read().unwrap(); - let retain_lsns = &gc_info.retain_lsns; - let cutoff = min(gc_info.cutoff, disk_consistent_lsn); - let pitr = gc_info.pitr; // Calculate pitr cutoff point. // If we cannot determine a cutoff LSN, be conservative and don't GC anything. @@ -2315,6 +2346,7 @@ impl LayeredTimeline { if let Ok(timeline) = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id) { + let now = SystemTime::now(); // First, calculate pitr_cutoff_timestamp and then convert it to LSN. // If we don't have enough data to convert to LSN, // play safe and don't remove any layers. @@ -2325,7 +2357,7 @@ impl LayeredTimeline { LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn, LsnForTimestamp::Future(lsn) => { debug!("future({})", lsn); - pitr_cutoff_lsn = cutoff; + pitr_cutoff_lsn = gc_info.horizon_cutoff; } LsnForTimestamp::Past(lsn) => { debug!("past({})", lsn); @@ -2339,22 +2371,47 @@ impl LayeredTimeline { } else if cfg!(test) { // We don't have local timeline in mocked cargo tests. // So, just ignore pitr_interval setting in this case. - pitr_cutoff_lsn = cutoff; + pitr_cutoff_lsn = gc_info.horizon_cutoff; } + gc_info.pitr_cutoff = pitr_cutoff_lsn; - let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn); + Ok(()) + } + + /// + /// Garbage collect layer files on a timeline that are no longer needed. + /// + /// Currently, we don't make any attempt at removing unneeded page versions + /// within a layer file. We can only remove the whole file if it's fully + /// obsolete. 
+ /// + fn gc(&self) -> Result { + let mut result: GcResult = Default::default(); + let now = SystemTime::now(); + + fail_point!("before-timeline-gc"); + + let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); + + let gc_info = self.gc_info.read().unwrap(); + + let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn()); + let pitr_cutoff = gc_info.pitr_cutoff; + let retain_lsns = &gc_info.retain_lsns; + + let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); // Nothing to GC. Return early. - if *self.get_latest_gc_cutoff_lsn() >= new_gc_cutoff { + let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn(); + if latest_gc_cutoff >= new_gc_cutoff { info!( - "Nothing to GC for timeline {}. cutoff_lsn {}", - self.timeline_id, new_gc_cutoff + "Nothing to GC for timeline {}: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}", + self.timeline_id ); - result.elapsed = now.elapsed()?; return Ok(result); } - let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered(); + let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %new_gc_cutoff).entered(); // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn. // See branch_timeline() for details. @@ -2388,23 +2445,23 @@ impl LayeredTimeline { result.layers_total += 1; - // 1. Is it newer than cutoff point? - if l.get_lsn_range().end > cutoff { + // 1. Is it newer than GC horizon cutoff point? + if l.get_lsn_range().end > horizon_cutoff { debug!( - "keeping {} because it's newer than cutoff {}", + "keeping {} because it's newer than horizon_cutoff {}", l.filename().display(), - cutoff + horizon_cutoff ); result.layers_needed_by_cutoff += 1; continue 'outer; } // 2. It is newer than PiTR cutoff point? 
- if l.get_lsn_range().end > pitr_cutoff_lsn { + if l.get_lsn_range().end > pitr_cutoff { debug!( - "keeping {} because it's newer than pitr_cutoff_lsn {}", + "keeping {} because it's newer than pitr_cutoff {}", l.filename().display(), - pitr_cutoff_lsn + pitr_cutoff ); result.layers_needed_by_pitr += 1; continue 'outer; @@ -2823,7 +2880,7 @@ pub mod tests { let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2893,7 +2950,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; @@ -2970,7 +3027,7 @@ pub mod tests { // Perform a cycle of checkpoint, compaction, and GC println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); - tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO); + tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced)?; tline.compact()?; tline.gc()?; diff --git a/test_runner/batch_others/test_branch_and_gc.py b/test_runner/batch_others/test_branch_and_gc.py index a6210b9176..7157386ce2 100644 --- a/test_runner/batch_others/test_branch_and_gc.py +++ b/test_runner/batch_others/test_branch_and_gc.py @@ -1,3 +1,5 @@ +import threading +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv from fixtures.utils import lsn_from_hex @@ -99,3 +101,67 @@ def test_branch_and_gc(neon_simple_env: NeonEnv): branch_cur.execute('SELECT count(*) FROM foo') assert branch_cur.fetchone() == (200000, ) + + +# This test simulates a race condition happening when branch creation and GC are 
performed concurrently. +# +# Suppose we want to create a new timeline 't' from a source timeline 's' starting +# from a lsn 'lsn'. Upon creating 't', if we don't hold the GC lock and compare 'lsn' with +# the latest GC information carefully, it's possible for GC to accidentally remove data +# needed by the new timeline. +# +# In this test, GC is requested before the branch creation but is delayed to happen after branch creation. +# As a result, when doing GC for the source timeline, we don't have any information about +# the upcoming new branches, so it's possible to remove data that may be needed by the new branches. +# It's the branch creation task's job to make sure the starting 'lsn' is not out of scope +# and prevent creating branches with invalid starting LSNs. +# +# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447. +def test_branch_creation_before_gc(neon_simple_env: NeonEnv): + env = neon_simple_env + # Disable background GC but set the `pitr_interval` to be small, so GC can delete something + tenant, _ = env.neon_cli.create_tenant( + conf={ + # disable background GC + 'gc_period': '10 m', + 'gc_horizon': f'{10 * 1024 ** 3}', + + # small checkpoint distance to create more delta layer files + 'checkpoint_distance': f'{1024 ** 2}', + + # set the target size to be large to allow the image layer to cover the whole key space + 'compaction_target_size': f'{1024 ** 3}', + + # tweak the default settings to allow quickly create image layers and L1 layers + 'compaction_period': '1 s', + 'compaction_threshold': '2', + 'image_creation_threshold': '1', + + # set PITR interval to be small, so we can do GC + 'pitr_interval': '1 s' + }) + + b0 = env.neon_cli.create_branch('b0', tenant_id=tenant) + pg0 = env.postgres.create_start('b0', tenant_id=tenant) + res = pg0.safe_psql_many(queries=[ + "CREATE TABLE t(key serial primary key)", + "INSERT INTO t SELECT FROM generate_series(1, 100000)", + "SELECT 
pg_current_wal_insert_lsn()", + "INSERT INTO t SELECT FROM generate_series(1, 100000)", + ]) + lsn = res[2][0][0] + + # Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the + # branch creation task but the individual timeline GC iteration happens *after* + # the branch creation task. + env.pageserver.safe_psql(f"failpoints before-timeline-gc=sleep(2000)") + + def do_gc(): + env.pageserver.safe_psql(f"do_gc {tenant.hex} {b0.hex} 0") + + thread = threading.Thread(target=do_gc, daemon=True) + thread.start() + + # The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC. + with pytest.raises(Exception, match="invalid branch start lsn"): + env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn) diff --git a/test_runner/performance/test_branch_creation.py b/test_runner/performance/test_branch_creation.py new file mode 100644 index 0000000000..1d39b0830d --- /dev/null +++ b/test_runner/performance/test_branch_creation.py @@ -0,0 +1,110 @@ +import random +import time +import statistics +import threading +import timeit +import pytest +from typing import List +from fixtures.benchmark_fixture import MetricReport +from fixtures.compare_fixtures import NeonCompare +from fixtures.log_helper import log + + +def _record_branch_creation_durations(neon_compare: NeonCompare, durs: List[float]): + neon_compare.zenbenchmark.record("branch_creation_duration_max", + max(durs), + 's', + MetricReport.LOWER_IS_BETTER) + neon_compare.zenbenchmark.record("branch_creation_duration_avg", + statistics.mean(durs), + 's', + MetricReport.LOWER_IS_BETTER) + neon_compare.zenbenchmark.record("branch_creation_duration_stdev", + statistics.stdev(durs), + 's', + MetricReport.LOWER_IS_BETTER) + + +@pytest.mark.parametrize("n_branches", [20]) +# Test measures the latency of branch creation during a heavy [1] workload. 
+# +# [1]: to simulate a heavy workload, the test tweaks the GC and compaction settings +# to increase the task's frequency. The test runs `pgbench` in each new branch. +# Each branch is created from a randomly picked source branch. +def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int): + env = neon_compare.env + pg_bin = neon_compare.pg_bin + + # Use aggressive GC and checkpoint settings, so GC and compaction happen more often during the test + tenant, _ = env.neon_cli.create_tenant( + conf={ + 'gc_period': '5 s', + 'gc_horizon': f'{4 * 1024 ** 2}', + 'checkpoint_distance': f'{2 * 1024 ** 2}', + 'compaction_target_size': f'{1024 ** 2}', + 'compaction_threshold': '2', + # set PITR interval to be small, so we can do GC + 'pitr_interval': '5 s' + }) + + def run_pgbench(branch: str): + log.info(f"Start a pgbench workload on branch {branch}") + + pg = env.postgres.create_start(branch, tenant_id=tenant) + connstr = pg.connstr() + + pg_bin.run_capture(['pgbench', '-i', connstr]) + pg_bin.run_capture(['pgbench', '-c10', '-T10', connstr]) + + pg.stop() + + env.neon_cli.create_branch('b0', tenant_id=tenant) + + threads: List[threading.Thread] = [] + threads.append(threading.Thread(target=run_pgbench, args=('b0', ), daemon=True)) + threads[-1].start() + + branch_creation_durations = [] + for i in range(n_branches): + time.sleep(1.0) + + # random a source branch + p = random.randint(0, i) + + timer = timeit.default_timer() + env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(p), tenant_id=tenant) + dur = timeit.default_timer() - timer + + log.info(f"Creating branch b{i+1} took {dur}s") + branch_creation_durations.append(dur) + + threads.append(threading.Thread(target=run_pgbench, args=(f'b{i+1}', ), daemon=True)) + threads[-1].start() + + for thread in threads: + thread.join() + + _record_branch_creation_durations(neon_compare, branch_creation_durations) + + +@pytest.mark.parametrize("n_branches", [1024]) +# Test measures the latency of 
branch creation when creating a lot of branches. +def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int): + env = neon_compare.env + + env.neon_cli.create_branch('b0') + + pg = env.postgres.create_start('b0') + neon_compare.pg_bin.run_capture(['pgbench', '-i', '-s10', pg.connstr()]) + + branch_creation_durations = [] + + for i in range(n_branches): + # pick a random source branch + p = random.randint(0, i) + timer = timeit.default_timer() + env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(p)) + dur = timeit.default_timer() - timer + branch_creation_durations.append(dur) + + _record_branch_creation_durations(neon_compare, branch_creation_durations)