From 87afbf6b24313cbfa28809ec7ada2e72911263be Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 31 May 2024 12:00:40 -0400 Subject: [PATCH] test(pageserver): add test interface to create artificial layers (#7899) This pull request adds necessary interfaces to deterministically create scenarios we want to test. Simplify some test cases to use this interface to make it stable + reproducible. Compaction test will be able to use this interface. Also the upcoming delete tombstone tests will use this interface to make test reproducible. ## Summary of changes * `force_create_image_layer` * `force_create_delta_layer` * `force_advance_lsn` * `create_test_timeline_with_states` * `branch_timeline_test_with_states` --------- Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 261 ++++++++---------- pageserver/src/tenant/timeline.rs | 96 +++++++ .../src/tenant/timeline/layer_manager.rs | 7 + 3 files changed, 223 insertions(+), 141 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0a9637884f..cfa683beb8 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1393,6 +1393,36 @@ impl Tenant { Ok(tl) } + /// Helper for unit tests to create a timeline with some pre-loaded states. + #[cfg(test)] + #[allow(clippy::too_many_arguments)] + pub async fn create_test_timeline_with_layers( + &self, + new_timeline_id: TimelineId, + initdb_lsn: Lsn, + pg_version: u32, + ctx: &RequestContext, + delta_layer_desc: Vec>, + image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, + end_lsn: Lsn, + ) -> anyhow::Result> { + let tline = self + .create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) + .await?; + tline.force_advance_lsn(end_lsn); + for deltas in delta_layer_desc { + tline + .force_create_delta_layer(deltas, Some(initdb_lsn), ctx) + .await?; + } + for (lsn, images) in image_layer_desc { + tline + .force_create_image_layer(lsn, images, Some(initdb_lsn), ctx) + .await?; + } + Ok(tline) + } + /// Create a new timeline. /// /// Returns the new timeline ID and reference to its Timeline object. @@ -2992,17 +3022,53 @@ impl Tenant { &self, src_timeline: &Arc, dst_id: TimelineId, - start_lsn: Option, + ancestor_lsn: Option, ctx: &RequestContext, ) -> Result, CreateTimelineError> { let create_guard = self.create_timeline_create_guard(dst_id).unwrap(); let tl = self - .branch_timeline_impl(src_timeline, dst_id, start_lsn, create_guard, ctx) + .branch_timeline_impl(src_timeline, dst_id, ancestor_lsn, create_guard, ctx) .await?; tl.set_state(TimelineState::Active); Ok(tl) } + /// Helper for unit tests to branch a timeline with some pre-loaded states. + #[cfg(test)] + #[allow(clippy::too_many_arguments)] + pub async fn branch_timeline_test_with_layers( + &self, + src_timeline: &Arc, + dst_id: TimelineId, + ancestor_lsn: Option, + ctx: &RequestContext, + delta_layer_desc: Vec>, + image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, + end_lsn: Lsn, + ) -> anyhow::Result> { + let tline = self + .branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx) + .await?; + let ancestor_lsn = if let Some(ancestor_lsn) = ancestor_lsn { + ancestor_lsn + } else { + tline.get_last_record_lsn() + }; + assert!(end_lsn >= ancestor_lsn); + tline.force_advance_lsn(end_lsn); + for deltas in delta_layer_desc { + tline + .force_create_delta_layer(deltas, Some(ancestor_lsn), ctx) + .await?; + } + for (lsn, images) in image_layer_desc { + tline + .force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx) + .await?; + } + Ok(tline) + } + /// Branch an existing timeline. /// /// The caller is responsible for activating the returned timeline. @@ -6206,75 +6272,36 @@ mod tests { async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> { let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?; let (tenant, ctx) = harness.load().await; - let tline = tenant - .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) - .await?; - - let cancel = CancellationToken::new(); let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap(); let base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap(); let base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap(); - let mut lsn = Lsn(0x20); - - { - let mut writer = tline.writer().await; - writer - .put(base_key, lsn, &Value::Image(test_img("data key 1")), &ctx) - .await?; - writer.finish_write(lsn); - drop(writer); - - tline.freeze_and_flush().await?; // this will create a image layer - } + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + Vec::new(), // delta layers + vec![(Lsn(0x20), vec![(base_key, test_img("data key 1"))])], // image layers + Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN + ) + .await?; let child = tenant - .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx) + .branch_timeline_test_with_layers( + &tline, + NEW_TIMELINE_ID, + Some(Lsn(0x20)), + &ctx, + Vec::new(), // delta layers + vec![(Lsn(0x30), vec![(base_key_child, test_img("data key 2"))])], // image layers + Lsn(0x30), + ) .await .unwrap(); - lsn.0 += 0x10; - - { - let mut writer = child.writer().await; - writer - .put( - base_key_child, - lsn, - &Value::Image(test_img("data key 2")), - &ctx, - ) - .await?; - writer.finish_write(lsn); - drop(writer); - - child.freeze_and_flush().await?; // this will create a delta - - { - // update the partitioning to include the test key space, otherwise they - // will be dropped by image layer creation - let mut guard = child.partitioning.lock().await; - let ((partitioning, _), partition_lsn) = &mut *guard; - partitioning - .parts - .push(KeySpace::single(base_key..base_key_nonexist)); // exclude the nonexist key - *partition_lsn = lsn; - } - - child - .compact( - &cancel, - { - let mut set = EnumSet::empty(); - set.insert(CompactFlags::ForceImageLayerCreation); - set - }, - &ctx, - ) - .await?; // force create an image layer for the keys, TODO: check if the image layer is created - } - async fn get_vectored_impl_wrapper( tline: &Arc, key: Key, @@ -6296,6 +6323,8 @@ mod tests { })) } + let lsn = Lsn(0x30); + // test vectored get on parent timeline assert_eq!( get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?, @@ -6333,94 +6362,42 @@ mod tests { #[tokio::test] async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?; + let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?; let (tenant, ctx) = harness.load().await; + + let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + let base_key_child = Key::from_hex("620000000033333333444444445500000001").unwrap(); + let base_key_nonexist = Key::from_hex("620000000033333333444444445500000002").unwrap(); + assert_eq!(base_key.field1, AUX_KEY_PREFIX); // in case someone accidentally changed the prefix... + let tline = tenant - .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + Vec::new(), // delta layers + vec![(Lsn(0x20), vec![(base_key, test_img("metadata key 1"))])], // image layers + Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN + ) .await?; - let cancel = CancellationToken::new(); - - let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap(); - let mut base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap(); - let mut base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap(); - base_key.field1 = AUX_KEY_PREFIX; - base_key_child.field1 = AUX_KEY_PREFIX; - base_key_nonexist.field1 = AUX_KEY_PREFIX; - - let mut lsn = Lsn(0x20); - - { - let mut writer = tline.writer().await; - writer - .put( - base_key, - lsn, - &Value::Image(test_img("metadata key 1")), - &ctx, - ) - .await?; - writer.finish_write(lsn); - drop(writer); - - tline.freeze_and_flush().await?; // this will create an image layer - - tline - .compact( - &cancel, - { - let mut set = EnumSet::empty(); - set.insert(CompactFlags::ForceImageLayerCreation); - set.insert(CompactFlags::ForceRepartition); - set - }, - &ctx, - ) - .await?; // force create an image layer for metadata keys - tenant - .gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx) - .await?; - } - let child = tenant - .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx) + .branch_timeline_test_with_layers( + &tline, + NEW_TIMELINE_ID, + Some(Lsn(0x20)), + &ctx, + Vec::new(), // delta layers + vec![( + Lsn(0x30), + vec![(base_key_child, test_img("metadata key 2"))], + )], // image layers + Lsn(0x30), + ) .await .unwrap(); - lsn.0 += 0x10; - - { - let mut writer = child.writer().await; - writer - .put( - base_key_child, - lsn, - &Value::Image(test_img("metadata key 2")), - &ctx, - ) - .await?; - writer.finish_write(lsn); - drop(writer); - - child.freeze_and_flush().await?; - - child - .compact( - &cancel, - { - let mut set = EnumSet::empty(); - set.insert(CompactFlags::ForceImageLayerCreation); - set.insert(CompactFlags::ForceRepartition); - set - }, - &ctx, - ) - .await?; // force create an image layer for metadata keys - tenant - .gc_iteration(Some(child.timeline_id), 0, Duration::ZERO, &cancel, &ctx) - .await?; - } - async fn get_vectored_impl_wrapper( tline: &Arc, key: Key, @@ -6442,6 +6419,8 @@ mod tests { })) } + let lsn = Lsn(0x30); + // test vectored get on parent timeline assert_eq!( get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b498876465..8033edaa12 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5371,6 +5371,102 @@ impl Timeline { shard_count: self.tenant_shard_id.shard_count, } } + + #[cfg(test)] + pub(super) fn force_advance_lsn(self: &Arc, new_lsn: Lsn) { + self.last_record_lsn.advance(new_lsn); + } + + /// Force create an image layer and place it into the layer map. + /// + /// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`] + /// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run. + #[cfg(test)] + pub(super) async fn force_create_image_layer( + self: &Arc, + lsn: Lsn, + mut images: Vec<(Key, Bytes)>, + check_start_lsn: Option, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let last_record_lsn = self.get_last_record_lsn(); + assert!( + lsn <= last_record_lsn, + "advance last record lsn before inserting a layer, lsn={lsn}, last_record_lsn={last_record_lsn}" + ); + if let Some(check_start_lsn) = check_start_lsn { + assert!(lsn >= check_start_lsn); + } + images.sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb)); + let min_key = *images.first().map(|(k, _)| k).unwrap(); + let max_key = images.last().map(|(k, _)| k).unwrap().next(); + let mut image_layer_writer = ImageLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + &(min_key..max_key), + lsn, + ctx, + ) + .await?; + for (key, img) in images { + image_layer_writer.put_image(key, img, ctx).await?; + } + let image_layer = image_layer_writer.finish(self, ctx).await?; + + { + let mut guard = self.layers.write().await; + guard.force_insert_layer(image_layer); + } + + Ok(()) + } + + /// Force create a delta layer and place it into the layer map. + /// + /// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`] + /// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run. + #[cfg(test)] + pub(super) async fn force_create_delta_layer( + self: &Arc, + mut deltas: Vec<(Key, Lsn, Value)>, + check_start_lsn: Option, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let last_record_lsn = self.get_last_record_lsn(); + deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb))); + let min_key = *deltas.first().map(|(k, _, _)| k).unwrap(); + let max_key = deltas.last().map(|(k, _, _)| k).unwrap().next(); + let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap(); + let max_lsn = Lsn(deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap().0 + 1); + assert!( + max_lsn <= last_record_lsn, + "advance last record lsn before inserting a layer, max_lsn={max_lsn}, last_record_lsn={last_record_lsn}" + ); + if let Some(check_start_lsn) = check_start_lsn { + assert!(min_lsn >= check_start_lsn); + } + let mut delta_layer_writer = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + min_key, + min_lsn..max_lsn, + ctx, + ) + .await?; + for (key, lsn, val) in deltas { + delta_layer_writer.put_value(key, lsn, val, ctx).await?; + } + let delta_layer = delta_layer_writer.finish(max_key, self, ctx).await?; + + { + let mut guard = self.layers.write().await; + guard.force_insert_layer(delta_layer); + } + + Ok(()) + } } type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId); diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 884b71df75..b78c98a506 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -255,6 +255,13 @@ impl LayerManager { updates.flush() } + #[cfg(test)] + pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) { + let mut updates = self.layer_map.batch_update(); + Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr); + updates.flush() + } + /// Helper function to insert a layer into the layer map and file manager. fn insert_historic_layer( layer: Layer,