diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 56d97bf8a9..74f3fce6e5 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -10,6 +10,8 @@ default = [] # which adds some runtime cost to run tests on outage conditions testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"] +fuzz-read-path = ["testing"] + [dependencies] anyhow.workspace = true arc-swap.workspace = true diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f14c7608fd..0ba70f45b2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5933,12 +5933,20 @@ mod tests { use models::CompactLsnRange; use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX}; use pageserver_api::keyspace::KeySpace; + #[cfg(feature = "testing")] + use pageserver_api::keyspace::KeySpaceRandomAccum; use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings}; #[cfg(feature = "testing")] use pageserver_api::record::NeonWalRecord; use pageserver_api::value::Value; use pageserver_compaction::helpers::overlaps_with; + #[cfg(feature = "testing")] + use rand::SeedableRng; + #[cfg(feature = "testing")] + use rand::rngs::StdRng; use rand::{Rng, thread_rng}; + #[cfg(feature = "testing")] + use std::ops::Range; use storage_layer::{IoConcurrency, PersistentLayerKey}; use tests::storage_layer::ValuesReconstructState; use tests::timeline::{GetVectoredError, ShutdownMode}; @@ -5960,6 +5968,318 @@ mod tests { static TEST_KEY: Lazy = Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001"))); + #[cfg(feature = "testing")] + struct TestTimelineSpecification { + start_lsn: Lsn, + last_record_lsn: Lsn, + + in_memory_layers_shape: Vec<(Range, Range)>, + delta_layers_shape: Vec<(Range, Range)>, + image_layers_shape: Vec<(Range, Lsn)>, + + gap_chance: u8, + will_init_chance: u8, + } + + #[cfg(feature = "testing")] + struct Storage { + storage: HashMap<(Key, Lsn), Value>, + start_lsn: Lsn, + } + + #[cfg(feature = "testing")] + impl Storage { + fn get(&self, key: Key, lsn: Lsn) -> Bytes { + use bytes::BufMut; + + let mut crnt_lsn = lsn; + let mut got_base = false; + + let mut acc = Vec::new(); + + while crnt_lsn >= self.start_lsn { + if let Some(value) = self.storage.get(&(key, crnt_lsn)) { + acc.push(value.clone()); + + match value { + Value::WalRecord(NeonWalRecord::Test { will_init, .. }) => { + if *will_init { + got_base = true; + break; + } + } + Value::Image(_) => { + got_base = true; + break; + } + _ => unreachable!(), + } + } + + crnt_lsn = crnt_lsn.checked_sub(1u64).unwrap(); + } + + assert!( + got_base, + "Input data was incorrect. No base image for {key}@{lsn}" + ); + + tracing::debug!("Wal redo depth for {key}@{lsn} is {}", acc.len()); + + let mut blob = BytesMut::new(); + for value in acc.into_iter().rev() { + match value { + Value::WalRecord(NeonWalRecord::Test { append, .. }) => { + blob.extend_from_slice(append.as_bytes()); + } + Value::Image(img) => { + blob.put(img); + } + _ => unreachable!(), + } + } + + blob.into() + } + } + + #[cfg(feature = "testing")] + #[allow(clippy::too_many_arguments)] + async fn randomize_timeline( + tenant: &Arc, + new_timeline_id: TimelineId, + pg_version: u32, + spec: TestTimelineSpecification, + random: &mut rand::rngs::StdRng, + ctx: &RequestContext, + ) -> anyhow::Result<(Arc, Storage, Vec)> { + let mut storage: HashMap<(Key, Lsn), Value> = HashMap::default(); + let mut interesting_lsns = vec![spec.last_record_lsn]; + + for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() { + let mut lsn = lsn_range.start; + while lsn < lsn_range.end { + let mut key = key_range.start; + while key < key_range.end { + let gap = random.gen_range(1..=100) <= spec.gap_chance; + let will_init = random.gen_range(1..=100) <= spec.will_init_chance; + + if gap { + continue; + } + + let record = if will_init { + Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]"))) + } else { + Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]"))) + }; + + storage.insert((key, lsn), record); + + key = key.next(); + } + lsn = Lsn(lsn.0 + 1); + } + + // Stash some interesting LSN for future use + for offset in [0, 5, 100].iter() { + if *offset == 0 { + interesting_lsns.push(lsn_range.start); + } else { + let below = lsn_range.start.checked_sub(*offset); + match below { + Some(v) if v >= spec.start_lsn => { + interesting_lsns.push(v); + } + _ => {} + } + + let above = Lsn(lsn_range.start.0 + offset); + interesting_lsns.push(above); + } + } + } + + for (key_range, lsn_range) in spec.delta_layers_shape.iter() { + let mut lsn = lsn_range.start; + while lsn < lsn_range.end { + let mut key = key_range.start; + while key < key_range.end { + let gap = random.gen_range(1..=100) <= spec.gap_chance; + let will_init = random.gen_range(1..=100) <= spec.will_init_chance; + + if gap { + continue; + } + + let record = if will_init { + Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]"))) + } else { + Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]"))) + }; + + storage.insert((key, lsn), record); + + key = key.next(); + } + lsn = Lsn(lsn.0 + 1); + } + + // Stash some interesting LSN for future use + for offset in [0, 5, 100].iter() { + if *offset == 0 { + interesting_lsns.push(lsn_range.start); + } else { + let below = lsn_range.start.checked_sub(*offset); + match below { + Some(v) if v >= spec.start_lsn => { + interesting_lsns.push(v); + } + _ => {} + } + + let above = Lsn(lsn_range.start.0 + offset); + interesting_lsns.push(above); + } + } + } + + for (key_range, lsn) in spec.image_layers_shape.iter() { + let mut key = key_range.start; + while key < key_range.end { + let blob = Bytes::from(format!("[image {key}@{lsn}]")); + let record = Value::Image(blob.clone()); + storage.insert((key, *lsn), record); + + key = key.next(); + } + + // Stash some interesting LSN for future use + for offset in [0, 5, 100].iter() { + if *offset == 0 { + interesting_lsns.push(*lsn); + } else { + let below = lsn.checked_sub(*offset); + match below { + Some(v) if v >= spec.start_lsn => { + interesting_lsns.push(v); + } + _ => {} + } + + let above = Lsn(lsn.0 + offset); + interesting_lsns.push(above); + } + } + } + + let in_memory_test_layers = { + let mut acc = Vec::new(); + + for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() { + let mut data = Vec::new(); + + let mut lsn = lsn_range.start; + while lsn < lsn_range.end { + let mut key = key_range.start; + while key < key_range.end { + if let Some(record) = storage.get(&(key, lsn)) { + data.push((key, lsn, record.clone())); + } + + key = key.next(); + } + lsn = Lsn(lsn.0 + 1); + } + + acc.push(InMemoryLayerTestDesc { + data, + lsn_range: lsn_range.clone(), + is_open: false, + }) + } + + acc + }; + + let delta_test_layers = { + let mut acc = Vec::new(); + + for (key_range, lsn_range) in spec.delta_layers_shape.iter() { + let mut data = Vec::new(); + + let mut lsn = lsn_range.start; + while lsn < lsn_range.end { + let mut key = key_range.start; + while key < key_range.end { + if let Some(record) = storage.get(&(key, lsn)) { + data.push((key, lsn, record.clone())); + } + + key = key.next(); + } + lsn = Lsn(lsn.0 + 1); + } + + acc.push(DeltaLayerTestDesc { + data, + lsn_range: lsn_range.clone(), + key_range: key_range.clone(), + }) + } + + acc + }; + + let image_test_layers = { + let mut acc = Vec::new(); + + for (key_range, lsn) in spec.image_layers_shape.iter() { + let mut data = Vec::new(); + + let mut key = key_range.start; + while key < key_range.end { + if let Some(record) = storage.get(&(key, *lsn)) { + let blob = match record { + Value::Image(blob) => blob.clone(), + _ => unreachable!(), + }; + + data.push((key, blob)); + } + + key = key.next(); + } + + acc.push((*lsn, data)); + } + + acc + }; + + let tline = tenant + .create_test_timeline_with_layers( + new_timeline_id, + spec.start_lsn, + pg_version, + ctx, + in_memory_test_layers, + delta_test_layers, + image_test_layers, + spec.last_record_lsn, + ) + .await?; + + Ok(( + tline, + Storage { + storage, + start_lsn: spec.start_lsn, + }, + interesting_lsns, + )) + } + #[tokio::test] async fn test_basic() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await; @@ -10543,6 +10863,214 @@ mod tests { Ok(()) } + // A randomized read path test. Generates a layer map according to a deterministic + // specification. Fills the (key, LSN) space in random manner and then performs + // random scattered queries validating the results against in-memory storage. + // + // See this internal Notion page for a diagram of the layer map: + // https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4 + // + // A fuzzing mode is also supported. In this mode, the test will use a random + // seed instead of a hardcoded one. Use it in conjunction with `cargo stress` + // to run multiple instances in parallel: + // + // $ RUST_BACKTRACE=1 RUST_LOG=INFO \ + // cargo stress --package=pageserver --features=testing,fuzz-read-path --release -- test_read_path + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_read_path() -> anyhow::Result<()> { + use rand::seq::SliceRandom; + + let seed = if cfg!(feature = "fuzz-read-path") { + let seed: u64 = thread_rng().r#gen(); + seed + } else { + // Use a hard-coded seed when not in fuzzing mode. + // Note that with the current approach results are not reproducible + // accross platforms and Rust releases. + const SEED: u64 = 0; + SEED + }; + + let mut random = StdRng::seed_from_u64(seed); + + let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") { + const QUERIES: u64 = 5000; + let will_init_chance: u8 = random.gen_range(0..=10); + let gap_chance: u8 = random.gen_range(0..=50); + + (QUERIES, will_init_chance, gap_chance) + } else { + const QUERIES: u64 = 1000; + const WILL_INIT_CHANCE: u8 = 1; + const GAP_CHANCE: u8 = 5; + + (QUERIES, WILL_INIT_CHANCE, GAP_CHANCE) + }; + + let harness = TenantHarness::create("test_read_path").await?; + let (tenant, ctx) = harness.load().await; + + tracing::info!("Using random seed: {seed}"); + tracing::info!(%will_init_chance, %gap_chance, "Fill params"); + + // Define the layer map shape. Note that this part is not randomized. + + const KEY_DIMENSION_SIZE: u32 = 99; + let start_key = Key::from_hex("110000000033333333444444445500000000").unwrap(); + let end_key = start_key.add(KEY_DIMENSION_SIZE); + let total_key_range = start_key..end_key; + let total_key_range_size = end_key.to_i128() - start_key.to_i128(); + let total_start_lsn = Lsn(104); + let last_record_lsn = Lsn(504); + + assert!(total_key_range_size % 3 == 0); + + let in_memory_layers_shape = vec![ + (total_key_range.clone(), Lsn(304)..Lsn(400)), + (total_key_range.clone(), Lsn(400)..last_record_lsn), + ]; + + let delta_layers_shape = vec![ + ( + start_key..(start_key.add((total_key_range_size / 3) as u32)), + Lsn(200)..Lsn(304), + ), + ( + (start_key.add((total_key_range_size / 3) as u32)) + ..(start_key.add((total_key_range_size * 2 / 3) as u32)), + Lsn(200)..Lsn(304), + ), + ( + (start_key.add((total_key_range_size * 2 / 3) as u32)) + ..(start_key.add(total_key_range_size as u32)), + Lsn(200)..Lsn(304), + ), + ]; + + let image_layers_shape = vec![ + ( + start_key.add((total_key_range_size * 2 / 3 - 10) as u32) + ..start_key.add((total_key_range_size * 2 / 3 + 10) as u32), + Lsn(456), + ), + ( + start_key.add((total_key_range_size / 3 - 10) as u32) + ..start_key.add((total_key_range_size / 3 + 10) as u32), + Lsn(256), + ), + (total_key_range.clone(), total_start_lsn), + ]; + + let specification = TestTimelineSpecification { + start_lsn: total_start_lsn, + last_record_lsn, + in_memory_layers_shape, + delta_layers_shape, + image_layers_shape, + gap_chance, + will_init_chance, + }; + + // Create and randomly fill in the layers according to the specification + let (tline, storage, interesting_lsns) = randomize_timeline( + &tenant, + TIMELINE_ID, + DEFAULT_PG_VERSION, + specification, + &mut random, + &ctx, + ) + .await?; + + // Now generate queries based on the interesting lsns that we've collected. + // + // While there's still room in the query, pick and interesting LSN and a random + // key. Then roll the dice to see if the next key should also be included in + // the query. When the roll fails, break the "batch" and pick another point in the + // (key, LSN) space. + + const PICK_NEXT_CHANCE: u8 = 50; + for _ in 0..queries { + let query = { + let mut keyspaces_at_lsn: HashMap = HashMap::default(); + let mut used_keys: HashSet = HashSet::default(); + + while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize { + let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty"); + let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE)); + + while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize { + if used_keys.contains(&selected_key) + || selected_key >= start_key.add(KEY_DIMENSION_SIZE) + { + break; + } + + keyspaces_at_lsn + .entry(*selected_lsn) + .or_default() + .add_key(selected_key); + used_keys.insert(selected_key); + + let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE; + if pick_next { + selected_key = selected_key.next(); + } else { + break; + } + } + } + + VersionedKeySpaceQuery::scattered( + keyspaces_at_lsn + .into_iter() + .map(|(lsn, acc)| (lsn, acc.to_keyspace())) + .collect(), + ) + }; + + // Run the query and validate the results + + let results = tline + .get_vectored(query.clone(), IoConcurrency::Sequential, &ctx) + .await; + + let blobs = match results { + Ok(ok) => ok, + Err(err) => { + panic!("seed={seed} Error returned for query {query}: {err}"); + } + }; + + for (key, key_res) in blobs.into_iter() { + match key_res { + Ok(blob) => { + let requested_at_lsn = query.map_key_to_lsn(&key); + let expected = storage.get(key, requested_at_lsn); + + if blob != expected { + tracing::error!( + "seed={seed} Mismatch for {key}@{requested_at_lsn} from query: {query}" + ); + } + + assert_eq!(blob, expected); + } + Err(err) => { + let requested_at_lsn = query.map_key_to_lsn(&key); + + panic!( + "seed={seed} Error returned for {key}@{requested_at_lsn} from query {query}: {err}" + ); + } + } + } + } + + Ok(()) + } + fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering { ( k1.is_delta, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 204bdb5eee..c27a4b62da 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4026,7 +4026,7 @@ impl VersionedKeySpaceQuery { /// Returns LSN for a specific key. /// /// Invariant: requested key must be part of [`Self::total_keyspace`] - fn map_key_to_lsn(&self, key: &Key) -> Lsn { + pub(super) fn map_key_to_lsn(&self, key: &Key) -> Lsn { match self { Self::Uniform { lsn, .. } => *lsn, Self::Scattered { keyspaces_at_lsn } => {