mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
pageserver: simple fix for vectored read image layer skip (#9026)
## Problem Different keyspaces may require different floor LSNs in vectored delta layer visits. This patch adds support for such cases. ## Summary of changes Different keyspaces wishing to read the same layer might require different stop lsns (or lsn floor). The start LSN of the read (or the lsn ceil) will always be the same. With this observation, we fix skipping of image layers by indexing the fringe by layer id plus lsn floor. This is very simple, but means that we can visit delta layers twice in certain cases. Still, I think it's very unlikely for any extra merging to have taken place in this case, so perhaps it makes sense to go with the simpler patch. Fixes https://github.com/neondatabase/neon/issues/9012 Alternative to https://github.com/neondatabase/neon/pull/9025
This commit is contained in:
@@ -4164,9 +4164,18 @@ pub(crate) mod harness {
|
||||
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
|
||||
if records_neon {
|
||||
// For Neon wal records, we can decode without spawning postgres, so do so.
|
||||
let base_img = base_img.expect("Neon WAL redo requires base image").1;
|
||||
let mut page = BytesMut::new();
|
||||
page.extend_from_slice(&base_img);
|
||||
let mut page = match (base_img, records.first()) {
|
||||
(Some((_lsn, img)), _) => {
|
||||
let mut page = BytesMut::new();
|
||||
page.extend_from_slice(&img);
|
||||
page
|
||||
}
|
||||
(_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
|
||||
_ => {
|
||||
panic!("Neon WAL redo requires base image or will init record");
|
||||
}
|
||||
};
|
||||
|
||||
for (record_lsn, record) in records {
|
||||
apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
|
||||
}
|
||||
@@ -8470,4 +8479,135 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Regression test for https://github.com/neondatabase/neon/issues/9012
|
||||
// Create an image arrangement where we have to read at different LSN ranges
|
||||
// from a delta layer. This is achieved by overlapping an image layer on top of
|
||||
// a delta layer. Like so:
|
||||
//
|
||||
// A B
|
||||
// +----------------+ -> delta_layer
|
||||
// | | ^ lsn
|
||||
// | =========|-> nested_image_layer |
|
||||
// | C | |
|
||||
// +----------------+ |
|
||||
// ======== -> baseline_image_layer +-------> key
|
||||
//
|
||||
//
|
||||
// When querying the key range [A, B) we need to read at different LSN ranges
|
||||
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
|
||||
#[tokio::test]
|
||||
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let will_init_keys = [2, 6];
|
||||
fn get_key(id: u32) -> Key {
|
||||
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
let mut expected_key_values = HashMap::new();
|
||||
|
||||
let baseline_image_layer_lsn = Lsn(0x10);
|
||||
let mut baseline_img_layer = Vec::new();
|
||||
for i in 0..5 {
|
||||
let key = get_key(i);
|
||||
let value = format!("value {i}@{baseline_image_layer_lsn}");
|
||||
|
||||
let removed = expected_key_values.insert(key, value.clone());
|
||||
assert!(removed.is_none());
|
||||
|
||||
baseline_img_layer.push((key, Bytes::from(value)));
|
||||
}
|
||||
|
||||
let nested_image_layer_lsn = Lsn(0x50);
|
||||
let mut nested_img_layer = Vec::new();
|
||||
for i in 5..10 {
|
||||
let key = get_key(i);
|
||||
let value = format!("value {i}@{nested_image_layer_lsn}");
|
||||
|
||||
let removed = expected_key_values.insert(key, value.clone());
|
||||
assert!(removed.is_none());
|
||||
|
||||
nested_img_layer.push((key, Bytes::from(value)));
|
||||
}
|
||||
|
||||
let mut delta_layer_spec = Vec::default();
|
||||
let delta_layer_start_lsn = Lsn(0x20);
|
||||
let mut delta_layer_end_lsn = delta_layer_start_lsn;
|
||||
|
||||
for i in 0..10 {
|
||||
let key = get_key(i);
|
||||
let key_in_nested = nested_img_layer
|
||||
.iter()
|
||||
.any(|(key_with_img, _)| *key_with_img == key);
|
||||
let lsn = {
|
||||
if key_in_nested {
|
||||
Lsn(nested_image_layer_lsn.0 + 0x10)
|
||||
} else {
|
||||
delta_layer_start_lsn
|
||||
}
|
||||
};
|
||||
|
||||
let will_init = will_init_keys.contains(&i);
|
||||
if will_init {
|
||||
delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
|
||||
|
||||
expected_key_values.insert(key, "".to_string());
|
||||
} else {
|
||||
let delta = format!("@{lsn}");
|
||||
delta_layer_spec.push((
|
||||
key,
|
||||
lsn,
|
||||
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
|
||||
));
|
||||
|
||||
expected_key_values
|
||||
.get_mut(&key)
|
||||
.expect("An image exists for each key")
|
||||
.push_str(delta.as_str());
|
||||
}
|
||||
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
|
||||
}
|
||||
|
||||
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
|
||||
|
||||
assert!(
|
||||
nested_image_layer_lsn > delta_layer_start_lsn
|
||||
&& nested_image_layer_lsn < delta_layer_end_lsn
|
||||
);
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
baseline_image_layer_lsn,
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
delta_layer_start_lsn..delta_layer_end_lsn,
|
||||
delta_layer_spec,
|
||||
)], // delta layers
|
||||
vec![
|
||||
(baseline_image_layer_lsn, baseline_img_layer),
|
||||
(nested_image_layer_lsn, nested_img_layer),
|
||||
], // image layers
|
||||
delta_layer_end_lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let keyspace = KeySpace::single(get_key(0)..get_key(10));
|
||||
let results = tline
|
||||
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
|
||||
.await
|
||||
.expect("No vectored errors");
|
||||
for (key, res) in results {
|
||||
let value = res.expect("No key errors");
|
||||
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
|
||||
assert_eq!(value, Bytes::from(expected_value));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,6 +276,16 @@ pub(crate) enum LayerId {
|
||||
InMemoryLayerId(InMemoryLayerFileId),
|
||||
}
|
||||
|
||||
/// Uniquely identify a layer visit by the layer
|
||||
/// and LSN floor (or start LSN) of the reads.
|
||||
/// The layer itself is not enough since we may
|
||||
/// have different LSN lower bounds for delta layer reads.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
struct LayerToVisitId {
|
||||
layer_id: LayerId,
|
||||
lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
/// Layer wrapper for the read path. Note that it is valid
|
||||
/// to use these layers even after external operations have
|
||||
/// been performed on them (compaction, freeze, etc.).
|
||||
@@ -287,9 +297,9 @@ pub(crate) enum ReadableLayer {
|
||||
|
||||
/// A partial description of a read to be done.
|
||||
#[derive(Debug, Clone)]
|
||||
struct ReadDesc {
|
||||
struct LayerVisit {
|
||||
/// An id used to resolve the readable layer within the fringe
|
||||
layer_id: LayerId,
|
||||
layer_to_visit_id: LayerToVisitId,
|
||||
/// Lsn range for the read, used for selecting the next read
|
||||
lsn_range: Range<Lsn>,
|
||||
}
|
||||
@@ -303,12 +313,12 @@ struct ReadDesc {
|
||||
/// a two layer indexing scheme.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct LayerFringe {
|
||||
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
|
||||
layers: HashMap<LayerId, LayerKeyspace>,
|
||||
planned_visits_by_lsn: BinaryHeap<LayerVisit>,
|
||||
visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LayerKeyspace {
|
||||
struct LayerVisitReads {
|
||||
layer: ReadableLayer,
|
||||
target_keyspace: KeySpaceRandomAccum,
|
||||
}
|
||||
@@ -316,23 +326,23 @@ struct LayerKeyspace {
|
||||
impl LayerFringe {
|
||||
pub(crate) fn new() -> Self {
|
||||
LayerFringe {
|
||||
planned_reads_by_lsn: BinaryHeap::new(),
|
||||
layers: HashMap::new(),
|
||||
planned_visits_by_lsn: BinaryHeap::new(),
|
||||
visit_reads: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
|
||||
let read_desc = match self.planned_reads_by_lsn.pop() {
|
||||
let read_desc = match self.planned_visits_by_lsn.pop() {
|
||||
Some(desc) => desc,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let removed = self.layers.remove_entry(&read_desc.layer_id);
|
||||
let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
|
||||
|
||||
match removed {
|
||||
Some((
|
||||
_,
|
||||
LayerKeyspace {
|
||||
LayerVisitReads {
|
||||
layer,
|
||||
mut target_keyspace,
|
||||
},
|
||||
@@ -351,20 +361,24 @@ impl LayerFringe {
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
) {
|
||||
let layer_id = layer.id();
|
||||
let entry = self.layers.entry(layer_id.clone());
|
||||
let layer_to_visit_id = LayerToVisitId {
|
||||
layer_id: layer.id(),
|
||||
lsn_floor: lsn_range.start,
|
||||
};
|
||||
|
||||
let entry = self.visit_reads.entry(layer_to_visit_id.clone());
|
||||
match entry {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().target_keyspace.add_keyspace(keyspace);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
self.planned_reads_by_lsn.push(ReadDesc {
|
||||
self.planned_visits_by_lsn.push(LayerVisit {
|
||||
lsn_range,
|
||||
layer_id: layer_id.clone(),
|
||||
layer_to_visit_id: layer_to_visit_id.clone(),
|
||||
});
|
||||
let mut accum = KeySpaceRandomAccum::new();
|
||||
accum.add_keyspace(keyspace);
|
||||
entry.insert(LayerKeyspace {
|
||||
entry.insert(LayerVisitReads {
|
||||
layer,
|
||||
target_keyspace: accum,
|
||||
});
|
||||
@@ -379,7 +393,7 @@ impl Default for LayerFringe {
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for ReadDesc {
|
||||
impl Ord for LayerVisit {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
|
||||
if ord == std::cmp::Ordering::Equal {
|
||||
@@ -390,19 +404,19 @@ impl Ord for ReadDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for ReadDesc {
|
||||
impl PartialOrd for LayerVisit {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for ReadDesc {
|
||||
impl PartialEq for LayerVisit {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.lsn_range == other.lsn_range
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for ReadDesc {}
|
||||
impl Eq for LayerVisit {}
|
||||
|
||||
impl ReadableLayer {
|
||||
pub(crate) fn id(&self) -> LayerId {
|
||||
|
||||
Reference in New Issue
Block a user