Compare commits

...

3 Commits

Author SHA1 Message Date
Vlad Lazar
e02334c15e fixup: doc reference to renamed field 2024-09-16 19:44:09 +01:00
Christian Schwarz
f0430e97a2 prove hypothesis (inefficient fix) 2024-09-16 17:17:49 +01:00
Vlad Lazar
25e31b247b tests: add unit test for vec read with overlapped images 2024-09-16 17:17:26 +01:00
2 changed files with 165 additions and 21 deletions

View File

@@ -8470,4 +8470,127 @@ mod tests {
Ok(())
}
// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
// A B
// +----------------+ -> delta_layer
// | | ^ lsn
// | =========|-> nested_image_layer |
// | C | |
// +----------------+ |
// ======== -> baseline_image_layer +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let mut expected_key_values = HashMap::new();
let baseline_image_layer_lsn = Lsn(0x10);
let mut baseline_img_layer = Vec::new();
for i in 0..5 {
let key = get_key(i);
let value = format!("value {i}@{baseline_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
baseline_img_layer.push((key, Bytes::from(value)));
}
let nested_image_layer_lsn = Lsn(0x50);
let mut nested_img_layer = Vec::new();
for i in 5..10 {
let key = get_key(i);
let value = format!("value {i}@{nested_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
nested_img_layer.push((key, Bytes::from(value)));
}
let mut delta_layer_spec = Vec::default();
let delta_layer_start_lsn = Lsn(0x20);
let mut delta_layer_end_lsn = delta_layer_start_lsn;
for i in 0..10 {
let key = get_key(i);
let key_in_nested = nested_img_layer
.iter()
.any(|(key_with_img, _)| *key_with_img == key);
let lsn = {
if key_in_nested {
Lsn(nested_image_layer_lsn.0 + 0x10)
} else {
delta_layer_start_lsn
}
};
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
assert!(
nested_image_layer_lsn > delta_layer_start_lsn
&& nested_image_layer_lsn < delta_layer_end_lsn
);
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
baseline_image_layer_lsn,
DEFAULT_PG_VERSION,
&ctx,
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
delta_layer_start_lsn..delta_layer_end_lsn,
delta_layer_spec,
)], // delta layers
vec![
(baseline_image_layer_lsn, baseline_img_layer),
(nested_image_layer_lsn, nested_img_layer),
], // image layers
delta_layer_end_lsn,
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
let value = res.expect("No key errors");
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
assert_eq!(value, Bytes::from(expected_value));
}
Ok(())
}
}

View File

@@ -279,7 +279,7 @@ pub(crate) enum LayerId {
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ReadableLayer {
PersistentLayer(Layer),
InMemoryLayer(Arc<InMemoryLayer>),
@@ -292,6 +292,8 @@ struct ReadDesc {
layer_id: LayerId,
/// Lsn range for the read, used for selecting the next read
lsn_range: Range<Lsn>,
/// This read's index in [`LayerKeyspace::reads`];
read_id: LayerKeyspaceReadId,
}
/// Data structure which maintains a fringe of layers for the
@@ -310,9 +312,13 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
next_read_id: LayerKeyspaceReadId,
reads: HashMap<LayerKeyspaceReadId, (Range<Lsn>, KeySpace)>,
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
struct LayerKeyspaceReadId(usize);
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
@@ -327,22 +333,24 @@ impl LayerFringe {
None => return None,
};
let removed = self.layers.remove_entry(&read_desc.layer_id);
let mut entry = match self.layers.entry(read_desc.layer_id) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => unreachable!("fringe internals are always consistent"),
};
match removed {
Some((
_,
LayerKeyspace {
layer,
mut target_keyspace,
},
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
let (lsn_range, keyspace) = entry
.get_mut()
.reads
.remove(&read_desc.read_id)
.expect("fringe internals are always consistent");
let layer = entry.get().layer.clone();
if entry.get().reads.is_empty() {
entry.remove();
}
Some((layer, keyspace, lsn_range))
}
pub(crate) fn update(
@@ -355,18 +363,31 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
let read_id = {
let r = &mut entry.get_mut().next_read_id;
let read_id = *r;
*r = LayerKeyspaceReadId(r.0 + 1);
read_id
};
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let replaced = entry.get_mut().reads.insert(read_id, (lsn_range, keyspace));
assert!(replaced.is_none());
}
Entry::Vacant(entry) => {
let read_id = LayerKeyspaceReadId(0);
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
next_read_id: LayerKeyspaceReadId(1),
reads: [(read_id, (lsn_range, keyspace))].into(),
});
}
}