pageserver: add a randomized read path test (#11519)
## Problem

Every time we make changes to the read path to fix a bug or add a feature, we end up adding another incomprehensible test.

## Summary of changes

Add some generic infrastructure for generating a layer map from a test specification, and use it for a read path test. The test is randomized, but uses a fixed seed by default. A fuzzing mode is available for confidence building.

See the [Notion page](https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4) for a diagram of the layer map used.

Just for fun, I tried removing [this commit](9990199cb4) from https://github.com/neondatabase/neon/pull/11494, and the new test caught the bug in the normal mode (no fuzzing required).
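To try it locally: the deterministic mode runs under an ordinary test invocation with the `testing` feature (for example `cargo test -p pageserver --features testing test_read_path`; the exact flags here are illustrative), while the fuzzing mode uses the `cargo stress` invocation quoted in the test's doc comment below.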
@@ -10,6 +10,8 @@ default = []
```diff
 # which adds some runtime cost to run tests on outage conditions
 testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
 
+fuzz-read-path = ["testing"]
+
 [dependencies]
 anyhow.workspace = true
 arc-swap.workspace = true
```
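Note that `fuzz-read-path` enables `testing` transitively, so a fuzzing build also pulls in the failpoints and the other testing-only features listed above.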
@@ -5933,12 +5933,20 @@ mod tests {
```rust
    use models::CompactLsnRange;
    use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
    use pageserver_api::keyspace::KeySpace;
    #[cfg(feature = "testing")]
    use pageserver_api::keyspace::KeySpaceRandomAccum;
    use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
    #[cfg(feature = "testing")]
    use pageserver_api::record::NeonWalRecord;
    use pageserver_api::value::Value;
    use pageserver_compaction::helpers::overlaps_with;
    #[cfg(feature = "testing")]
    use rand::SeedableRng;
    #[cfg(feature = "testing")]
    use rand::rngs::StdRng;
    use rand::{Rng, thread_rng};
    #[cfg(feature = "testing")]
    use std::ops::Range;
    use storage_layer::{IoConcurrency, PersistentLayerKey};
    use tests::storage_layer::ValuesReconstructState;
    use tests::timeline::{GetVectoredError, ShutdownMode};
```
@@ -5960,6 +5968,318 @@ mod tests {
```rust
    static TEST_KEY: Lazy<Key> =
        Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));

    #[cfg(feature = "testing")]
    struct TestTimelineSpecification {
        start_lsn: Lsn,
        last_record_lsn: Lsn,

        in_memory_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
        delta_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
        image_layers_shape: Vec<(Range<Key>, Lsn)>,

        gap_chance: u8,
        will_init_chance: u8,
    }

    #[cfg(feature = "testing")]
    struct Storage {
        storage: HashMap<(Key, Lsn), Value>,
        start_lsn: Lsn,
    }

    #[cfg(feature = "testing")]
    impl Storage {
        fn get(&self, key: Key, lsn: Lsn) -> Bytes {
            use bytes::BufMut;

            let mut crnt_lsn = lsn;
            let mut got_base = false;

            let mut acc = Vec::new();

            while crnt_lsn >= self.start_lsn {
                if let Some(value) = self.storage.get(&(key, crnt_lsn)) {
                    acc.push(value.clone());

                    match value {
                        Value::WalRecord(NeonWalRecord::Test { will_init, .. }) => {
                            if *will_init {
                                got_base = true;
                                break;
                            }
                        }
                        Value::Image(_) => {
                            got_base = true;
                            break;
                        }
                        _ => unreachable!(),
                    }
                }

                crnt_lsn = crnt_lsn.checked_sub(1u64).unwrap();
            }

            assert!(
                got_base,
                "Input data was incorrect. No base image for {key}@{lsn}"
            );

            tracing::debug!("Wal redo depth for {key}@{lsn} is {}", acc.len());

            let mut blob = BytesMut::new();
            for value in acc.into_iter().rev() {
                match value {
                    Value::WalRecord(NeonWalRecord::Test { append, .. }) => {
                        blob.extend_from_slice(append.as_bytes());
                    }
                    Value::Image(img) => {
                        blob.put(img);
                    }
                    _ => unreachable!(),
                }
            }

            blob.into()
        }
    }
```
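The `Storage` struct is the test's model of ground truth: `get` mimics WAL redo by walking backwards from the requested LSN to the nearest base (an image or a `will_init` record) and then replaying the collected records oldest-first. Here is a minimal standalone sketch of that fold, with plain strings standing in for `Value`/`NeonWalRecord` (illustrative names, not part of the patch):

```rust
// Minimal sketch of the walk-back-then-replay fold used by Storage::get.
// `Rec` is an illustrative stand-in for Value/NeonWalRecord, not part of the patch.
enum Rec {
    Image(String),                             // full base value
    Delta { append: String, will_init: bool }, // incremental record
}

fn reconstruct(history: &[Rec]) -> String {
    // Walk backwards until a base is found, collecting records on the way.
    let mut acc = Vec::new();
    for rec in history.iter().rev() {
        acc.push(rec);
        match rec {
            Rec::Image(_) => break,
            Rec::Delta { will_init: true, .. } => break,
            _ => {}
        }
    }
    // Replay oldest-first: the base starts the blob, appends extend it.
    let mut blob = String::new();
    for rec in acc.into_iter().rev() {
        match rec {
            Rec::Image(img) => blob.push_str(img),
            Rec::Delta { append, .. } => blob.push_str(append),
        }
    }
    blob
}

fn main() {
    let history = [
        Rec::Image("[image k@100]".to_string()),
        Rec::Delta { append: "[will_init k@200]".to_string(), will_init: true },
        Rec::Delta { append: "[delta k@300]".to_string(), will_init: false },
    ];
    // The will_init record at LSN 200 cuts off the walk: the image is ignored.
    assert_eq!(reconstruct(&history), "[will_init k@200][delta k@300]");
}
```

Because the expected value (from `Storage::get`) and the actual value (from the real read path) are built from the same records, any divergence points at the read path itself.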
```rust
    #[cfg(feature = "testing")]
    #[allow(clippy::too_many_arguments)]
    async fn randomize_timeline(
        tenant: &Arc<Tenant>,
        new_timeline_id: TimelineId,
        pg_version: u32,
        spec: TestTimelineSpecification,
        random: &mut rand::rngs::StdRng,
        ctx: &RequestContext,
    ) -> anyhow::Result<(Arc<Timeline>, Storage, Vec<Lsn>)> {
        let mut storage: HashMap<(Key, Lsn), Value> = HashMap::default();
        let mut interesting_lsns = vec![spec.last_record_lsn];

        for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
            let mut lsn = lsn_range.start;
            while lsn < lsn_range.end {
                let mut key = key_range.start;
                while key < key_range.end {
                    let gap = random.gen_range(1..=100) <= spec.gap_chance;
                    let will_init = random.gen_range(1..=100) <= spec.will_init_chance;

                    if gap {
                        // Skip this key to leave an actual gap in the layer.
                        key = key.next();
                        continue;
                    }

                    let record = if will_init {
                        Value::WalRecord(NeonWalRecord::wal_init(format!("[will_init {key}@{lsn}]")))
                    } else {
                        Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
                    };

                    storage.insert((key, lsn), record);

                    key = key.next();
                }
                lsn = Lsn(lsn.0 + 1);
            }

            // Stash some interesting LSNs for future use
            for offset in [0, 5, 100].iter() {
                if *offset == 0 {
                    interesting_lsns.push(lsn_range.start);
                } else {
                    let below = lsn_range.start.checked_sub(*offset);
                    match below {
                        Some(v) if v >= spec.start_lsn => {
                            interesting_lsns.push(v);
                        }
                        _ => {}
                    }

                    let above = Lsn(lsn_range.start.0 + offset);
                    interesting_lsns.push(above);
                }
            }
        }

        for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
            let mut lsn = lsn_range.start;
            while lsn < lsn_range.end {
                let mut key = key_range.start;
                while key < key_range.end {
                    let gap = random.gen_range(1..=100) <= spec.gap_chance;
                    let will_init = random.gen_range(1..=100) <= spec.will_init_chance;

                    if gap {
                        // Skip this key to leave an actual gap in the layer.
                        key = key.next();
                        continue;
                    }

                    let record = if will_init {
                        Value::WalRecord(NeonWalRecord::wal_init(format!("[will_init {key}@{lsn}]")))
                    } else {
                        Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
                    };

                    storage.insert((key, lsn), record);

                    key = key.next();
                }
                lsn = Lsn(lsn.0 + 1);
            }

            // Stash some interesting LSNs for future use
            for offset in [0, 5, 100].iter() {
                if *offset == 0 {
                    interesting_lsns.push(lsn_range.start);
                } else {
                    let below = lsn_range.start.checked_sub(*offset);
                    match below {
                        Some(v) if v >= spec.start_lsn => {
                            interesting_lsns.push(v);
                        }
                        _ => {}
                    }

                    let above = Lsn(lsn_range.start.0 + offset);
                    interesting_lsns.push(above);
                }
            }
        }

        for (key_range, lsn) in spec.image_layers_shape.iter() {
            let mut key = key_range.start;
            while key < key_range.end {
                let blob = Bytes::from(format!("[image {key}@{lsn}]"));
                let record = Value::Image(blob.clone());
                storage.insert((key, *lsn), record);

                key = key.next();
            }

            // Stash some interesting LSNs for future use
            for offset in [0, 5, 100].iter() {
                if *offset == 0 {
                    interesting_lsns.push(*lsn);
                } else {
                    let below = lsn.checked_sub(*offset);
                    match below {
                        Some(v) if v >= spec.start_lsn => {
                            interesting_lsns.push(v);
                        }
                        _ => {}
                    }

                    let above = Lsn(lsn.0 + offset);
                    interesting_lsns.push(above);
                }
            }
        }

        let in_memory_test_layers = {
            let mut acc = Vec::new();

            for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
                let mut data = Vec::new();

                let mut lsn = lsn_range.start;
                while lsn < lsn_range.end {
                    let mut key = key_range.start;
                    while key < key_range.end {
                        if let Some(record) = storage.get(&(key, lsn)) {
                            data.push((key, lsn, record.clone()));
                        }

                        key = key.next();
                    }
                    lsn = Lsn(lsn.0 + 1);
                }

                acc.push(InMemoryLayerTestDesc {
                    data,
                    lsn_range: lsn_range.clone(),
                    is_open: false,
                })
            }

            acc
        };

        let delta_test_layers = {
            let mut acc = Vec::new();

            for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
                let mut data = Vec::new();

                let mut lsn = lsn_range.start;
                while lsn < lsn_range.end {
                    let mut key = key_range.start;
                    while key < key_range.end {
                        if let Some(record) = storage.get(&(key, lsn)) {
                            data.push((key, lsn, record.clone()));
                        }

                        key = key.next();
                    }
                    lsn = Lsn(lsn.0 + 1);
                }

                acc.push(DeltaLayerTestDesc {
                    data,
                    lsn_range: lsn_range.clone(),
                    key_range: key_range.clone(),
                })
            }

            acc
        };

        let image_test_layers = {
            let mut acc = Vec::new();

            for (key_range, lsn) in spec.image_layers_shape.iter() {
                let mut data = Vec::new();

                let mut key = key_range.start;
                while key < key_range.end {
                    if let Some(record) = storage.get(&(key, *lsn)) {
                        let blob = match record {
                            Value::Image(blob) => blob.clone(),
                            _ => unreachable!(),
                        };

                        data.push((key, blob));
                    }

                    key = key.next();
                }

                acc.push((*lsn, data));
            }

            acc
        };

        let tline = tenant
            .create_test_timeline_with_layers(
                new_timeline_id,
                spec.start_lsn,
                pg_version,
                ctx,
                in_memory_test_layers,
                delta_test_layers,
                image_test_layers,
                spec.last_record_lsn,
            )
            .await?;

        Ok((
            tline,
            Storage {
                storage,
                start_lsn: spec.start_lsn,
            },
            interesting_lsns,
        ))
    }
```
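To make the "interesting LSNs" bookkeeping concrete: for a layer whose LSN range starts at Lsn(200) on a timeline with start_lsn = Lsn(104), the offsets [0, 5, 100] yield 200 itself, 195 and 205 (5 below/above), and 300 (100 above); the candidate 100 is discarded because it precedes start_lsn. A standalone sketch of just that selection logic, with `u64` standing in for `Lsn` (illustrative, not part of the patch):

```rust
// Standalone sketch of the "interesting LSN" selection in randomize_timeline.
// `u64` stands in for `Lsn`; this is illustrative, not part of the patch.
fn interesting_lsns(layer_start: u64, timeline_start_lsn: u64) -> Vec<u64> {
    let mut out = Vec::new();
    for offset in [0u64, 5, 100] {
        if offset == 0 {
            // The layer boundary itself is always interesting.
            out.push(layer_start);
        } else {
            // Slightly below the boundary, but never before the timeline start.
            if let Some(below) = layer_start.checked_sub(offset) {
                if below >= timeline_start_lsn {
                    out.push(below);
                }
            }
            // Slightly above the boundary.
            out.push(layer_start + offset);
        }
    }
    out
}

fn main() {
    // 200 (exact), 195/205 (5 below/above), 300 (100 above);
    // 100 is dropped because it falls before start_lsn = 104.
    assert_eq!(interesting_lsns(200, 104), vec![200, 195, 205, 300]);
}
```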
```rust
    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
```
@@ -10543,6 +10863,214 @@ mod tests {
```rust
        Ok(())
    }

    // A randomized read path test. Generates a layer map according to a deterministic
    // specification. Fills the (key, LSN) space in random manner and then performs
    // random scattered queries validating the results against in-memory storage.
    //
    // See this internal Notion page for a diagram of the layer map:
    // https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4
    //
    // A fuzzing mode is also supported. In this mode, the test will use a random
    // seed instead of a hardcoded one. Use it in conjunction with `cargo stress`
    // to run multiple instances in parallel:
    //
    // $ RUST_BACKTRACE=1 RUST_LOG=INFO \
    //   cargo stress --package=pageserver --features=testing,fuzz-read-path --release -- test_read_path
    #[cfg(feature = "testing")]
    #[tokio::test]
    async fn test_read_path() -> anyhow::Result<()> {
        use rand::seq::SliceRandom;

        let seed = if cfg!(feature = "fuzz-read-path") {
            let seed: u64 = thread_rng().r#gen();
            seed
        } else {
            // Use a hard-coded seed when not in fuzzing mode.
            // Note that with the current approach results are not reproducible
            // across platforms and Rust releases.
            const SEED: u64 = 0;
            SEED
        };

        let mut random = StdRng::seed_from_u64(seed);

        let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
            const QUERIES: u64 = 5000;
            let will_init_chance: u8 = random.gen_range(0..=10);
            let gap_chance: u8 = random.gen_range(0..=50);

            (QUERIES, will_init_chance, gap_chance)
        } else {
            const QUERIES: u64 = 1000;
            const WILL_INIT_CHANCE: u8 = 1;
            const GAP_CHANCE: u8 = 5;

            (QUERIES, WILL_INIT_CHANCE, GAP_CHANCE)
        };

        let harness = TenantHarness::create("test_read_path").await?;
        let (tenant, ctx) = harness.load().await;

        tracing::info!("Using random seed: {seed}");
        tracing::info!(%will_init_chance, %gap_chance, "Fill params");

        // Define the layer map shape. Note that this part is not randomized.

        const KEY_DIMENSION_SIZE: u32 = 99;
        let start_key = Key::from_hex("110000000033333333444444445500000000").unwrap();
        let end_key = start_key.add(KEY_DIMENSION_SIZE);
        let total_key_range = start_key..end_key;
        let total_key_range_size = end_key.to_i128() - start_key.to_i128();
        let total_start_lsn = Lsn(104);
        let last_record_lsn = Lsn(504);

        assert!(total_key_range_size % 3 == 0);

        let in_memory_layers_shape = vec![
            (total_key_range.clone(), Lsn(304)..Lsn(400)),
            (total_key_range.clone(), Lsn(400)..last_record_lsn),
        ];

        let delta_layers_shape = vec![
            (
                start_key..(start_key.add((total_key_range_size / 3) as u32)),
                Lsn(200)..Lsn(304),
            ),
            (
                (start_key.add((total_key_range_size / 3) as u32))
                    ..(start_key.add((total_key_range_size * 2 / 3) as u32)),
                Lsn(200)..Lsn(304),
            ),
            (
                (start_key.add((total_key_range_size * 2 / 3) as u32))
                    ..(start_key.add(total_key_range_size as u32)),
                Lsn(200)..Lsn(304),
            ),
        ];

        let image_layers_shape = vec![
            (
                start_key.add((total_key_range_size * 2 / 3 - 10) as u32)
                    ..start_key.add((total_key_range_size * 2 / 3 + 10) as u32),
                Lsn(456),
            ),
            (
                start_key.add((total_key_range_size / 3 - 10) as u32)
                    ..start_key.add((total_key_range_size / 3 + 10) as u32),
                Lsn(256),
            ),
            (total_key_range.clone(), total_start_lsn),
        ];
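        // A rough sketch of the resulting layer map (not part of the patch; see the
        // Notion page linked above for the authoritative diagram). Keys run left to
        // right over the 99-key range, LSNs bottom to top:
        //
        //   504 +------------------------------------------+
        //       |  in-memory layer #2     [image @ 456]    |  image band: keys ~56..76
        //   400 +------------------------------------------+
        //       |  in-memory layer #1                      |
        //   304 +--------------+-------------+-------------+
        //       |  delta #1    |  delta #2   |  delta #3   |
        //       |        [image @ 256]       |             |  image band: keys ~23..43
        //   200 +--------------+-------------+-------------+
        //   104 |  image layer covering the full key range |
        //       +------------------------------------------+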
        let specification = TestTimelineSpecification {
            start_lsn: total_start_lsn,
            last_record_lsn,
            in_memory_layers_shape,
            delta_layers_shape,
            image_layers_shape,
            gap_chance,
            will_init_chance,
        };

        // Create and randomly fill in the layers according to the specification
        let (tline, storage, interesting_lsns) = randomize_timeline(
            &tenant,
            TIMELINE_ID,
            DEFAULT_PG_VERSION,
            specification,
            &mut random,
            &ctx,
        )
        .await?;

        // Now generate queries based on the interesting LSNs that we've collected.
        //
        // While there's still room in the query, pick an interesting LSN and a random
        // key. Then roll the dice to see if the next key should also be included in
        // the query. When the roll fails, break the "batch" and pick another point in the
        // (key, LSN) space.

        const PICK_NEXT_CHANCE: u8 = 50;
        for _ in 0..queries {
            let query = {
                let mut keyspaces_at_lsn: HashMap<Lsn, KeySpaceRandomAccum> = HashMap::default();
                let mut used_keys: HashSet<Key> = HashSet::default();

                while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
                    let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
                    let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));

                    while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
                        if used_keys.contains(&selected_key)
                            || selected_key >= start_key.add(KEY_DIMENSION_SIZE)
                        {
                            break;
                        }

                        keyspaces_at_lsn
                            .entry(*selected_lsn)
                            .or_default()
                            .add_key(selected_key);
                        used_keys.insert(selected_key);

                        let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
                        if pick_next {
                            selected_key = selected_key.next();
                        } else {
                            break;
                        }
                    }
                }

                VersionedKeySpaceQuery::scattered(
                    keyspaces_at_lsn
                        .into_iter()
                        .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
                        .collect(),
                )
            };

            // Run the query and validate the results

            let results = tline
                .get_vectored(query.clone(), IoConcurrency::Sequential, &ctx)
                .await;

            let blobs = match results {
                Ok(ok) => ok,
                Err(err) => {
                    panic!("seed={seed} Error returned for query {query}: {err}");
                }
            };

            for (key, key_res) in blobs.into_iter() {
                match key_res {
                    Ok(blob) => {
                        let requested_at_lsn = query.map_key_to_lsn(&key);
                        let expected = storage.get(key, requested_at_lsn);

                        if blob != expected {
                            tracing::error!(
                                "seed={seed} Mismatch for {key}@{requested_at_lsn} from query: {query}"
                            );
                        }

                        assert_eq!(blob, expected);
                    }
                    Err(err) => {
                        let requested_at_lsn = query.map_key_to_lsn(&key);

                        panic!(
                            "seed={seed} Error returned for {key}@{requested_at_lsn} from query {query}: {err}"
                        );
                    }
                }
            }
        }

        Ok(())
    }

    fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
        (
            k1.is_delta,
```
@@ -4026,7 +4026,7 @@ impl VersionedKeySpaceQuery {
```diff
     /// Returns LSN for a specific key.
     ///
     /// Invariant: requested key must be part of [`Self::total_keyspace`]
-    fn map_key_to_lsn(&self, key: &Key) -> Lsn {
+    pub(super) fn map_key_to_lsn(&self, key: &Key) -> Lsn {
         match self {
             Self::Uniform { lsn, .. } => *lsn,
             Self::Scattered { keyspaces_at_lsn } => {
```
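The visibility change above (from private to `pub(super)`) exists so the new test can call `query.map_key_to_lsn(&key)` to recover the LSN each result key was requested at when validating results.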