From af246e340e4fad047c4d3105ec834fe3c1e639fc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 10 Apr 2025 14:58:59 +0200 Subject: [PATCH] DNM: notes on buffered writer integration from current code perspective --- .../src/tenant/storage_layer/delta_layer.rs | 127 +++++++++++++++++- .../virtual_file/owned_buffers_io/write.rs | 2 + 2 files changed, 128 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 05b0bc1a5c..d73b36a3c0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1596,6 +1596,7 @@ impl DeltaLayerIterator<'_> { #[cfg(test)] pub(crate) mod test { use std::collections::BTreeMap; + use std::io::Read; use bytes::Bytes; use itertools::MinMaxResult; @@ -1604,12 +1605,14 @@ pub(crate) mod test { use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use super::*; + use crate::tenant::remote_timeline_client::LayerFileMetadata; use crate::DEFAULT_PG_VERSION; + use crate::assert_u64_eq_usize::U64IsUsize; use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; use crate::tenant::disk_btree::tests::TestDisk; use crate::tenant::harness::{TIMELINE_ID, TenantHarness}; - use crate::tenant::storage_layer::{Layer, ResidentLayer}; + use crate::tenant::storage_layer::{IoConcurrency, Layer, ResidentLayer}; use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner; use crate::tenant::{Tenant, Timeline}; @@ -2305,4 +2308,126 @@ pub(crate) mod test { } } } + + #[tokio::test] + async fn test_delta_layer_padding() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_delta_layer_padding").await?; + let (tenant, ctx) = harness.load().await; + + let timeline_id = TimelineId::generate(); + let timeline = tenant + .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx) + .await?; + + tracing::info!("Generating test data ..."); + + fn 
make_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let layer_key_range = make_key(0)..make_key(0x100); + let layer_lsn_range = Lsn(0x10)..Lsn(0x20); + let mut writer = DeltaLayerWriter::new( + harness.conf, + timeline_id, + harness.tenant_shard_id, + layer_key_range.start, + layer_lsn_range.clone(), + &ctx, + ) + .await?; + + let img_key = make_key(0x23); + let img_lsn = Lsn(0x15); + let img_value = b"abcdefgh"; + writer + .put_value( + img_key, + img_lsn, + Value::Image(Bytes::from_owner(img_value)), + &ctx, + ) + .await?; + + let (desc, path) = writer.finish(layer_key_range.end, &ctx).await?; + + let md = std::fs::metadata(&path)?; + println!("{md:?}"); + + let layer = Layer::for_resident( + harness.conf, + &timeline, + path.clone(), + desc.layer_name(), + LayerFileMetadata { + file_size: md.len(), + generation: tenant.generation(), + shard: tenant.shard_identity.shard_index(), + }, + ); + + let layer = layer.drop_eviction_guard(); + + let io_concurrency = IoConcurrency::sequential(); + let mut reconstruct_state = ValuesReconstructState::new(io_concurrency); + layer + .get_values_reconstruct_data( + KeySpace::single(layer_key_range), + layer_lsn_range.clone(), + &mut reconstruct_state, + &ctx, + ) + .await?; + + assert_eq!(reconstruct_state.keys.len(), 1); + let (found_key, state) = reconstruct_state.keys.drain().next().unwrap(); + assert_eq!(found_key, img_key); + let state = state.collect_pending_ios().await?; + assert_eq!(state.records.len(), 0); + let (lsn, img) = state.img.unwrap(); + assert_eq!(lsn, Lsn(0x15)); + assert_eq!(img, Bytes::from_static(img_value)); + + // make some assertions about the data layout + let mut buf = Vec::with_capacity(md.len().into_usize()); + std::fs::File::open(&path)?.read_to_end(&mut buf)?; + let offsets: Vec<_> = buf + .windows(img_value.len()) + .positions(|window| window == img_value) + .collect(); + assert_eq!( + offsets.len(), + 1, +
"img value appears multiple times in bit pattern on disk" + ); + let offset = offsets[0]; + println!("img value offset: {offset}"); + let end = offset + img_value.len(); + assert_ne!( + end % PAGE_SZ, + 0, + "img value is so short it doesn't fill a full page" + ); + let expect_index_block_start = end.next_multiple_of(PAGE_SZ); + assert!( + expect_index_block_start < md.len().into_usize(), + "there is a block after the block that contains the value" + ); + let resident = layer.download_and_keep_resident(&ctx).await?; + let delta = resident.get_as_delta(&ctx).await?; + assert_eq!( + expect_index_block_start, + delta.index_start_offset().into_usize(), + "this test's understanding that the next block is the index block is correct" + ); + assert_eq!( + expect_index_block_start + PAGE_SZ, + md.len().into_usize(), + "the index block is one block long" + ); + + Ok(()) + } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f3ab2c285a..30019a0b84 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -120,6 +120,8 @@ where mut self, ctx: &RequestContext, ) -> Result<(u64, Arc), FlushTaskError> { + // TODO: this is incorrect/infeasible with direct IO because the tail may be only partially filled, e.g., with just 23 bytes in it. + // The buffer is guaranteed to be aligned, but the write system call will fail with EINVAL because the write length is not a multiple of the required IO alignment. self.flush(ctx).await?; let Self {