diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index ffcb972613..5d61333774 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -344,7 +344,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { tmp_dir: Utf8PathBuf, storage: Arc, file_key: RemotePath, - ) -> Result, u64, u64)>, anyhow::Error> + ) -> Result, u64, u64, u64)>, anyhow::Error> { let _permit = semaphore.acquire().await?; let cancel = CancellationToken::new(); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index a0328951f3..d084c7c877 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -14,7 +14,6 @@ use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; -use postgres_ffi::BLCKSZ; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 7d811f500e..fb09df3edc 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -53,6 +53,7 @@ use serde::{Deserialize, Serialize}; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; +use std::os::unix::fs::MetadataExt; use std::os::unix::prelude::FileExt; use std::str::FromStr; use std::sync::Arc; @@ -371,7 +372,7 @@ impl ImageLayer { dest_repo_path: &Utf8Path, path: &Utf8Path, ctx: &RequestContext, - ) -> anyhow::Result, u64, u64)>> { + ) -> anyhow::Result, u64, u64, u64)>> { fn make_conf( image_compression: Option, dest_repo_path: &Utf8Path, @@ -387,27 +388,35 @@ impl ImageLayer { Some(ImageCompressionAlgorithm::ZstdHigh), Some(ImageCompressionAlgorithm::LZ4), ]; + let confs = image_compressions + .clone() + .map(|compression| make_conf(compression, dest_repo_path)); let mut stats = Vec::new(); - for image_compression in image_compressions { - let start = Instant::now(); - let size = Self::compressed_size_for_conf( - path, - ctx, - make_conf(image_compression, dest_repo_path), - ) - .await?; - let elapsed_ms = start.elapsed().as_millis() as u64; - stats.push((image_compression, size, elapsed_ms)); + for (image_compression, conf) in image_compressions.into_iter().zip(confs) { + let start_compression = Instant::now(); + let compressed_path = Self::compress_for_conf(path, ctx, conf).await?; + let size = path.metadata()?.size(); + let elapsed_ms = start_compression.elapsed().as_millis() as u64; + let start_decompression = Instant::now(); + Self::compare_are_equal(path, &compressed_path, ctx).await?; + let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64; + std::fs::remove_file(compressed_path)?; + stats.push(( + image_compression, + size, + elapsed_ms, + elapsed_decompression_ms, + )); tokio::task::yield_now().await; } Ok(stats) } - async fn compressed_size_for_conf( + async fn compress_for_conf( path: &Utf8Path, ctx: &RequestContext, conf: &'static PageServerConf, - ) -> anyhow::Result { + ) -> anyhow::Result { let file = VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx) .await @@ -417,7 +426,6 @@ impl ImageLayer { let block_reader = FileBlockReader::new(&file, file_id); let summary_blk = block_reader.read_blk(0, ctx).await?; let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; - let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id); if summary.magic != IMAGE_FILE_MAGIC { anyhow::bail!("magic file mismatch"); } @@ -431,6 +439,7 @@ impl ImageLayer { let mut key_offset_stream = std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx)); + let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id); let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id); tokio::fs::create_dir_all(timeline_path).await?; @@ -451,7 +460,72 @@ impl ImageLayer { let content = cursor.read_blob(offset, ctx).await?; writer.put_image(key, content.into(), ctx).await?; } - Ok(writer.size()) + let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2; + Ok(path) + } + + async fn compare_are_equal( + path_a: &Utf8Path, + path_b: &Utf8Path, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let mut files = Vec::new(); + for path in [path_a, path_b] { + let file = VirtualFile::open_with_options( + path, + virtual_file::OpenOptions::new().read(true), + ctx, + ) + .await + .with_context(|| format!("Failed to open file '{}'", path))?; + files.push(file); + } + + let mut readers_summaries = Vec::new(); + for file in files.iter() { + let file_id = page_cache::next_file_id(); + let block_reader = FileBlockReader::new(&file, file_id); + let summary_blk = block_reader.read_blk(0, ctx).await?; + let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; + if summary.magic != IMAGE_FILE_MAGIC { + anyhow::bail!("magic file mismatch"); + } + readers_summaries.push((block_reader, summary)); + } + + let mut tree_readers_cursors = Vec::new(); + for (block_reader, summary) in readers_summaries.iter() { + let tree_reader = DiskBtreeReader::new( + summary.index_start_blk, + summary.index_root_blk, + block_reader, + ); + let cursor = block_reader.block_cursor(); + tree_readers_cursors.push((tree_reader, cursor)); + } + + let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0] + .0 + .get_stream_from(&[0u8; KEY_SIZE], ctx)); + let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[0] + .0 + .get_stream_from(&[0u8; KEY_SIZE], ctx)); + while let Some(r) = key_offset_stream_a.next().await { + let (key_a, offset_a): (Vec, _) = r?; + let Some(r) = key_offset_stream_b.next().await else { + panic!("second file at {path_b} has fewer keys than {path_a}"); + }; + let (key_b, offset_b): (Vec, _) = r?; + assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}"); + let key = Key::from_slice(&key_a); + let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?; + let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?; + assert_eq!( + content_a, content_b, + "mismatch for key={key} and {path_a}:{path_b}" + ); + } + Ok(()) } } @@ -886,11 +960,10 @@ impl ImageLayerWriterInner { /// /// Finish writing the image layer. /// - async fn finish( + async fn finish_inner( self, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -944,8 +1017,16 @@ impl ImageLayerWriterInner { // fsync the file file.sync_all().await?; + Ok((self.conf, desc, self.path)) + } + async fn finish( + self, + timeline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result { + let (conf, desc, path) = self.finish_inner(ctx).await?; // FIXME: why not carry the virtualfile here, it supports renaming? - let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?; + let layer = Layer::finish_creating(conf, timeline, desc, &path)?; info!("created image layer {}", layer.local_path());