From 3596ed43eb0f87be57840e3d730c7e321b55a36a Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 16 Sep 2024 15:41:11 +0100 Subject: [PATCH] Investigation --- pageserver/src/tenant/layer_map.rs | 26 +++ pageserver/src/tenant/storage_layer.rs | 1 - .../src/tenant/storage_layer/delta_layer.rs | 168 ----------------- .../src/tenant/storage_layer/layer/tests.rs | 178 ------------------ pageserver/src/tenant/vectored_blob_io.rs | 140 -------------- 5 files changed, 26 insertions(+), 487 deletions(-) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 707233b003..3819fd5ccd 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -1176,6 +1176,32 @@ mod tests { } } + #[test] + fn vlad_test() { + let layers = vec![ + LayerDesc { + key_range: Key::from_i128(0)..Key::from_i128(100), + lsn_range: Lsn(0)..Lsn(100), + is_delta: true, + }, + LayerDesc { + key_range: Key::from_i128(20)..Key::from_i128(30), + lsn_range: Lsn(10)..Lsn(50), + is_delta: false, + }, + ]; + + let layer_map = create_layer_map(layers.clone()); + + let range = Key::from_i128(0)..Key::from_i128(100); + let result = layer_map.range_search(range.clone(), Lsn(100)); + let expected = brute_force_range_search(&layer_map, range, Lsn(100)); + + eprintln!("result: {result:?}"); + + assert_range_search_result_eq(result, expected); + } + #[test] fn layer_visibility_basic() { // A simple synthetic input, as a smoke test. diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 24e03ed330..da64a68390 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -456,7 +456,6 @@ impl LayerFringe { } panic!("LSN range assumption violated"); } - assert_eq!(lsn_range, entry.get().lsn_range); } Entry::Vacant(entry) => { self.planned_reads_by_lsn.push(ReadDesc { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 58e292d3b2..c13016a058 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1578,78 +1578,6 @@ pub(crate) mod test { }; use bytes::Bytes; - /// Construct an index for a fictional delta layer and and then - /// traverse in order to plan vectored reads for a query. Finally, - /// verify that the traversal fed the right index key and value - /// pairs into the planner. - #[tokio::test] - async fn test_delta_layer_index_traversal() { - let base_key = Key { - field1: 0, - field2: 1663, - field3: 12972, - field4: 16396, - field5: 0, - field6: 246080, - }; - - // Populate the index with some entries - let entries: BTreeMap> = BTreeMap::from([ - (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]), - (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]), - (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]), - (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]), - ]); - - let mut disk = TestDisk::default(); - let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk); - - let mut disk_offset = 0; - for (key, lsns) in &entries { - for lsn in lsns { - let index_key = DeltaKey::from_key_lsn(key, *lsn); - let blob_ref = BlobRef::new(disk_offset, false); - writer - .append(&index_key.0, blob_ref.0) - .expect("In memory disk append should never fail"); - - disk_offset += 1; - } - } - - // Prepare all the arguments for the call into `plan_reads` below - let (root_offset, _writer) = writer - .finish() - .expect("In memory disk finish should never fail"); - let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk); - let planner = VectoredReadPlanner::new(100); - let mut reconstruct_state = ValuesReconstructState::new(); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); - - let keyspace = KeySpace { - ranges: vec![ - base_key..base_key.add(3), - base_key.add(3)..base_key.add(100), - ], - }; - let lsn_range = Lsn(2)..Lsn(40); - - // Plan and validate - let vectored_reads = DeltaLayerInner::plan_reads( - &keyspace, - lsn_range.clone(), - disk_offset, - reader, - planner, - &mut reconstruct_state, - &ctx, - ) - .await - .expect("Read planning should not fail"); - - validate(keyspace, lsn_range, vectored_reads, entries); - } - fn validate( keyspace: KeySpace, lsn_range: Range, @@ -1827,102 +1755,6 @@ pub(crate) mod test { keyspace } - #[tokio::test] - async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?; - let (tenant, ctx) = harness.load().await; - - let timeline_id = TimelineId::generate(); - let timeline = tenant - .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx) - .await?; - - tracing::info!("Generating test data ..."); - - let rng = &mut StdRng::seed_from_u64(0); - let entries = generate_entries(rng); - let entries_meta = get_entries_meta(&entries); - - tracing::info!("Done generating {} entries", entries.len()); - - tracing::info!("Writing test data to delta layer ..."); - let mut writer = DeltaLayerWriter::new( - harness.conf, - timeline_id, - harness.tenant_shard_id, - entries_meta.key_range.start, - entries_meta.lsn_range.clone(), - &ctx, - ) - .await?; - - for entry in entries { - let (_, res) = writer - .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx) - .await; - res?; - } - - let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?; - let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?; - - let inner = resident.get_as_delta(&ctx).await?; - - let file_size = inner.file.metadata().await?.len(); - tracing::info!( - "Done writing test data to delta layer. Resulting file size is: {}", - file_size - ); - - for i in 0..constants::READS_COUNT { - tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT); - - let block_reader = FileBlockReader::new(&inner.file, inner.file_id); - let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, - block_reader, - ); - - let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES); - let mut reconstruct_state = ValuesReconstructState::new(); - let keyspace = pick_random_keyspace(rng, &entries_meta.key_range); - let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64; - - let vectored_reads = DeltaLayerInner::plan_reads( - &keyspace, - entries_meta.lsn_range.clone(), - data_end_offset, - index_reader, - planner, - &mut reconstruct_state, - &ctx, - ) - .await?; - - let vectored_blob_reader = VectoredBlobReader::new(&inner.file); - let buf_size = DeltaLayerInner::get_min_read_buffer_size( - &vectored_reads, - constants::MAX_VECTORED_READ_BYTES, - ); - let mut buf = Some(BytesMut::with_capacity(buf_size)); - - for read in vectored_reads { - let blobs_buf = vectored_blob_reader - .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx) - .await?; - for meta in blobs_buf.blobs.iter() { - let value = &blobs_buf.buf[meta.start..meta.end]; - assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]); - } - - buf = Some(blobs_buf.buf); - } - } - - Ok(()) - } - #[tokio::test] async fn copy_delta_prefix_smoke() { use crate::walrecord::NeonWalRecord; diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index b2e69dae07..67b8677589 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -19,184 +19,6 @@ const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600); /// timeout uses to advance futures. const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7); -/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions. -#[tokio::test] -async fn smoke_test() { - let handle = tokio::runtime::Handle::current(); - - let h = TenantHarness::create("smoke_test").await.unwrap(); - let span = h.span(); - let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); - let (tenant, _) = h.load().await; - - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); - - let timeline = tenant - .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) - .await - .unwrap(); - - let layer = { - let mut layers = { - let layers = timeline.layers.read().await; - layers.likely_resident_layers().cloned().collect::>() - }; - - assert_eq!(layers.len(), 1); - - layers.swap_remove(0) - }; - - // all layers created at pageserver are like `layer`, initialized with strong - // Arc. - - let controlfile_keyspace = KeySpace { - ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()], - }; - - let img_before = { - let mut data = ValuesReconstructState::default(); - layer - .get_values_reconstruct_data( - controlfile_keyspace.clone(), - Lsn(0x10)..Lsn(0x11), - &mut data, - &ctx, - ) - .await - .unwrap(); - data.keys - .remove(&CONTROLFILE_KEY) - .expect("must be present") - .expect("should not error") - .img - .take() - .expect("tenant harness writes the control file") - }; - - // important part is evicting the layer, which can be done when there are no more ResidentLayer - // instances -- there currently are none, only two `Layer` values, one in the layermap and on - // in scope. - layer.evict_and_wait(FOREVER).await.unwrap(); - - // double-evict returns an error, which is valid if both eviction_task and disk usage based - // eviction would both evict the same layer at the same time. - - let e = layer.evict_and_wait(FOREVER).await.unwrap_err(); - assert!(matches!(e, EvictionError::NotFound)); - - // on accesses when the layer is evicted, it will automatically be downloaded. - let img_after = { - let mut data = ValuesReconstructState::default(); - layer - .get_values_reconstruct_data( - controlfile_keyspace.clone(), - Lsn(0x10)..Lsn(0x11), - &mut data, - &ctx, - ) - .instrument(download_span.clone()) - .await - .unwrap(); - data.keys - .remove(&CONTROLFILE_KEY) - .expect("must be present") - .expect("should not error") - .img - .take() - .expect("tenant harness writes the control file") - }; - - let img_before = (img_before.0, img_before.1.await.unwrap().unwrap()); - let img_after = (img_after.0, img_after.1.await.unwrap().unwrap()); - assert_eq!(img_before, img_after); - - // evict_and_wait can timeout, but it doesn't cancel the evicting itself - // - // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to - // artificially slow it down. - let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await; - - match layer - .evict_and_wait(std::time::Duration::ZERO) - .await - .unwrap_err() - { - EvictionError::Timeout => { - // expected, but note that the eviction is "still ongoing" - helper.release().await; - // exhaust spawn_blocking pool to ensure it is now complete - SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle) - .await; - } - other => unreachable!("{other:?}"), - } - - // only way to query if a layer is resident is to acquire a ResidentLayer instance. - // Layer::keep_resident never downloads, but it might initialize if the layer file is found - // downloaded locally. - let none = layer.keep_resident().await; - assert!( - none.is_none(), - "Expected none, because eviction removed the local file, found: {none:?}" - ); - - // plain downloading is rarely needed - layer - .download_and_keep_resident() - .instrument(download_span) - .await - .unwrap(); - - // last important part is deletion on drop: gc and compaction use it for compacted L0 layers - // or fully garbage collected layers. deletion means deleting the local file, and scheduling a - // deletion of the already unlinked from index_part.json remote file. - // - // marking a layer to be deleted on drop is irreversible; there is no technical reason against - // reversiblity, but currently it is not needed so it is not provided. - layer.delete_on_drop(); - - let path = layer.local_path().to_owned(); - - // wait_drop produces an unconnected to Layer future which will resolve when the - // LayerInner::drop has completed. - let mut wait_drop = std::pin::pin!(layer.wait_drop()); - - // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing - // until here - tokio::time::pause(); - tokio::time::timeout(ADVANCE, &mut wait_drop) - .await - .expect_err("should had timed out because two strong references exist"); - - tokio::fs::metadata(&path) - .await - .expect("the local layer file still exists"); - - let rtc = &timeline.remote_client; - - { - let layers = &[layer]; - let mut g = timeline.layers.write().await; - g.open_mut().unwrap().finish_gc_timeline(layers); - // this just updates the remote_physical_size for demonstration purposes - rtc.schedule_gc_update(layers).unwrap(); - } - - // when strong references are dropped, the file is deleted and remote deletion is scheduled - wait_drop.await; - - let e = tokio::fs::metadata(&path) - .await - .expect_err("the local file is deleted"); - assert_eq!(e.kind(), std::io::ErrorKind::NotFound); - - rtc.wait_completion().await.unwrap(); - - assert_eq!(rtc.get_remote_physical_size(), 0); - assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get()) -} - /// This test demonstrates a previous hang when a eviction and deletion were requested at the same /// time. Now both of them complete per Arc drop semantics. #[tokio::test(start_paused = true)] diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index ca7ef050ed..efc31c0497 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -774,146 +774,6 @@ mod tests { assert_eq!(expected_offsets_in_read, offsets_in_read); } - #[test] - fn planner_chunked_coalesce_all_test() { - use crate::virtual_file; - - let chunk_size = virtual_file::get_io_buffer_alignment() as u64; - - // The test explicitly does not check chunk size < 512 - if chunk_size < 512 { - return; - } - - let max_read_size = chunk_size as usize * 8; - let key = Key::MIN; - let lsn = Lsn(0); - - let blob_descriptions = [ - (key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN - (key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap - (key, lsn, chunk_size / 2, BlobFlag::None), - (key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap - (key, lsn, chunk_size, BlobFlag::None), - (key, lsn, chunk_size * 2 - 1, BlobFlag::None), - (key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap - (key, lsn, chunk_size * 3 + 1, BlobFlag::None), - (key, lsn, chunk_size * 5 + 1, BlobFlag::None), - (key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce. - (key, lsn, chunk_size * 7 + 1, BlobFlag::None), - (key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size) - (key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk - (key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce) - ]; - - let ranges = [ - &[ - blob_descriptions[0], - blob_descriptions[2], - blob_descriptions[4], - blob_descriptions[5], - blob_descriptions[7], - blob_descriptions[8], - blob_descriptions[10], - ], - &blob_descriptions[11..12], - &blob_descriptions[13..], - ]; - - let mut planner = VectoredReadPlanner::new(max_read_size); - for (key, lsn, offset, flag) in blob_descriptions { - planner.handle(key, lsn, offset, flag); - } - - planner.handle_range_end(652 * 1024); - - let reads = planner.finish(); - - assert_eq!(reads.len(), ranges.len()); - - for (idx, read) in reads.iter().enumerate() { - validate_read(read, ranges[idx]); - } - } - - #[test] - fn planner_max_read_size_test() { - let max_read_size = 128 * 1024; - let key = Key::MIN; - let lsn = Lsn(0); - - let blob_descriptions = vec![ - (key, lsn, 0, BlobFlag::None), - (key, lsn, 32 * 1024, BlobFlag::None), - (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1 - (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2 - (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3 - (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4 - (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5 - (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6 - ]; - - let ranges = [ - &blob_descriptions[0..3], - &blob_descriptions[3..4], - &blob_descriptions[4..5], - &blob_descriptions[5..6], - &blob_descriptions[6..7], - &blob_descriptions[7..], - ]; - - let mut planner = VectoredReadPlanner::new(max_read_size); - for (key, lsn, offset, flag) in blob_descriptions.clone() { - planner.handle(key, lsn, offset, flag); - } - - planner.handle_range_end(652 * 1024); - - let reads = planner.finish(); - - assert_eq!(reads.len(), 6); - - // TODO: could remove zero reads to produce 5 reads here - - for (idx, read) in reads.iter().enumerate() { - validate_read(read, ranges[idx]); - } - } - - #[test] - fn planner_replacement_test() { - let chunk_size = virtual_file::get_io_buffer_alignment() as u64; - let max_read_size = 128 * chunk_size as usize; - let first_key = Key::MIN; - let second_key = first_key.next(); - let lsn = Lsn(0); - - let blob_descriptions = vec![ - (first_key, lsn, 0, BlobFlag::None), // First in read 1 - (first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1 - (second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll), - (second_key, lsn, 3 * chunk_size, BlobFlag::None), - (second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2 - (second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2 - ]; - - let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]]; - - let mut planner = VectoredReadPlanner::new(max_read_size); - for (key, lsn, offset, flag) in blob_descriptions.clone() { - planner.handle(key, lsn, offset, flag); - } - - planner.handle_range_end(6 * chunk_size); - - let reads = planner.finish(); - assert_eq!(reads.len(), 2); - - for (idx, read) in reads.iter().enumerate() { - validate_read(read, ranges[idx]); - } - } - #[test] fn streaming_planner_max_read_size_test() { let max_read_size = 128 * 1024;