diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 64412fe4af..43941b6e17 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1672,6 +1672,7 @@ pub(crate) mod test { use rand::RngCore; use super::*; + use crate::repository::Value; use crate::tenant::harness::TIMELINE_ID; use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner; use crate::tenant::Tenant; @@ -1681,6 +1682,7 @@ pub(crate) mod test { tenant::{disk_btree::tests::TestDisk, harness::TenantHarness}, DEFAULT_PG_VERSION, }; + use bytes::Bytes; /// Construct an index for a fictional delta layer and and then /// traverse in order to plan vectored reads for a query. Finally, @@ -2249,6 +2251,15 @@ pub(crate) mod test { (k1, l1).cmp(&(k2, l2)) } + pub(crate) fn sort_delta_value( + (k1, l1, v1): &(Key, Lsn, Value), + (k2, l2, v2): &(Key, Lsn, Value), + ) -> std::cmp::Ordering { + let order_1 = if v1.is_image() { 0 } else { 1 }; + let order_2 = if v2.is_image() { 0 } else { 1 }; + (k1, l1, order_1).cmp(&(k2, l2, order_2)) + } + pub(crate) async fn produce_delta_layer( tenant: &Tenant, tline: &Arc, @@ -2257,7 +2268,7 @@ pub(crate) mod test { ) -> anyhow::Result { deltas.sort_by(sort_delta); let (key_start, _, _) = deltas.first().unwrap(); - let (key_max, _, _) = deltas.first().unwrap(); + let (key_max, _, _) = deltas.last().unwrap(); let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap(); let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap(); let lsn_end = Lsn(lsn_max.0 + 1); @@ -2302,9 +2313,6 @@ pub(crate) mod test { #[tokio::test] async fn delta_layer_iterator() { - use crate::repository::Value; - use bytes::Bytes; - let harness = TenantHarness::create("delta_layer_iterator").unwrap(); let (tenant, ctx) = harness.load().await; diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs index 68759f7585..0edfd4bd40 100644 --- a/pageserver/src/tenant/storage_layer/merge_iterator.rs +++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs @@ -96,15 +96,22 @@ impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> { impl<'a> std::cmp::Ord for IteratorWrapper<'a> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { use std::cmp::Ordering; - let a = self.peek_next_key_lsn(); - let b = other.peek_next_key_lsn(); + let a = self.peek_next_key_lsn_value(); + let b = other.peek_next_key_lsn_value(); match (a, b) { - (Some((k1, l1)), Some((k2, l2))) => { - let loaded_1 = if self.is_loaded() { 1 } else { 0 }; - let loaded_2 = if other.is_loaded() { 1 } else { 0 }; + (Some((k1, l1, v1)), Some((k2, l2, v2))) => { + fn map_value_to_num(val: &Option<&Value>) -> usize { + match val { + None => 0, + Some(Value::Image(_)) => 1, + Some(Value::WalRecord(_)) => 2, + } + } + let order_1 = map_value_to_num(&v1); + let order_2 = map_value_to_num(&v2); // When key_lsn are the same, the unloaded iter will always appear before the loaded one. // And note that we do a reverse at the end of the comparison, so it works with the max heap. - (k1, l1, loaded_1).cmp(&(k2, l2, loaded_2)) + (k1, l1, order_1).cmp(&(k2, l2, order_2)) } (Some(_), None) => Ordering::Less, (None, Some(_)) => Ordering::Greater, @@ -137,13 +144,16 @@ impl<'a> IteratorWrapper<'a> { } } - fn peek_next_key_lsn(&self) -> Option<(&Key, Lsn)> { + fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> { match self { - Self::Loaded { iter } => iter.peek().as_ref().map(|(key, lsn, _)| (key, *lsn)), + Self::Loaded { iter } => iter + .peek() + .as_ref() + .map(|(key, lsn, val)| (key, *lsn, Some(val))), Self::NotLoaded { first_key_lower_bound: (key, lsn), .. - } => Some((key, *lsn)), + } => Some((key, *lsn, None)), } } @@ -191,6 +201,13 @@ impl<'a> IteratorWrapper<'a> { } } +/// A merge iterator over delta/image layer iterators. When duplicated records are +/// found, the iterator will not perform any deduplication, and the caller should handle +/// these situation. By saying duplicated records, there are many possibilities: +/// * Two same delta at the same LSN. +/// * Two same image at the same LSN. +/// * Delta/image at the same LSN where the image has already applied the delta. +/// The iterator will always put the image before the delta. pub struct MergeIterator<'a> { heap: BinaryHeap>, } @@ -245,8 +262,9 @@ mod tests { use crate::{ tenant::{ harness::{TenantHarness, TIMELINE_ID}, - storage_layer::delta_layer::test::{produce_delta_layer, sort_delta}, + storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value}, }, + walrecord::NeonWalRecord, DEFAULT_PG_VERSION, }; @@ -407,6 +425,127 @@ mod tests { // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge } - // TODO: image layer merge, delta+image mixed merge - // TODO: is it possible to have duplicated delta at same LSN now? we might need to test that + #[tokio::test] + async fn delta_image_mixed_merge() { + use crate::repository::Value; + use bytes::Bytes; + + let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge").unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + // In this test case, we want to test if the iterator still works correctly with multiple copies + // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab. + // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix. + // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation + // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation + // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should + // correctly process these situations and return everything as-is, and the upper layer of the system + // will handle duplicated LSNs. + let test_deltas1 = vec![ + ( + get_key(0), + Lsn(0x10), + Value::WalRecord(NeonWalRecord::wal_init()), + ), + ( + get_key(0), + Lsn(0x18), + Value::WalRecord(NeonWalRecord::wal_append("a")), + ), + ( + get_key(5), + Lsn(0x10), + Value::WalRecord(NeonWalRecord::wal_init()), + ), + ( + get_key(5), + Lsn(0x18), + Value::WalRecord(NeonWalRecord::wal_append("b")), + ), + ]; + let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx) + .await + .unwrap(); + let mut test_deltas2 = test_deltas1.clone(); + test_deltas2.push(( + get_key(10), + Lsn(0x20), + Value::Image(Bytes::copy_from_slice(b"test")), + )); + let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx) + .await + .unwrap(); + let test_deltas3 = vec![ + ( + get_key(0), + Lsn(0x10), + Value::Image(Bytes::copy_from_slice(b"")), + ), + ( + get_key(5), + Lsn(0x18), + Value::Image(Bytes::copy_from_slice(b"b")), + ), + ( + get_key(15), + Lsn(0x20), + Value::Image(Bytes::copy_from_slice(b"test")), + ), + ]; + let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx) + .await + .unwrap(); + let mut test_deltas4 = test_deltas3.clone(); + test_deltas4.push(( + get_key(20), + Lsn(0x20), + Value::Image(Bytes::copy_from_slice(b"test")), + )); + let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx) + .await + .unwrap(); + let mut expect = Vec::new(); + expect.extend(test_deltas1); + expect.extend(test_deltas2); + expect.extend(test_deltas3); + expect.extend(test_deltas4); + expect.sort_by(sort_delta_value); + + // Test with different layer order for MergeIterator::create to ensure the order + // is stable. + + let mut merge_iter = MergeIterator::create( + &[ + resident_layer_4.get_as_delta(&ctx).await.unwrap(), + resident_layer_1.get_as_delta(&ctx).await.unwrap(), + resident_layer_3.get_as_delta(&ctx).await.unwrap(), + resident_layer_2.get_as_delta(&ctx).await.unwrap(), + ], + &[], + &ctx, + ); + assert_merge_iter_equal(&mut merge_iter, &expect).await; + + let mut merge_iter = MergeIterator::create( + &[ + resident_layer_1.get_as_delta(&ctx).await.unwrap(), + resident_layer_4.get_as_delta(&ctx).await.unwrap(), + resident_layer_3.get_as_delta(&ctx).await.unwrap(), + resident_layer_2.get_as_delta(&ctx).await.unwrap(), + ], + &[], + &ctx, + ); + assert_merge_iter_equal(&mut merge_iter, &expect).await; + } }