mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Also measure decompression time
This commit is contained in:
@@ -344,7 +344,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
tmp_dir: Utf8PathBuf,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
file_key: RemotePath,
|
||||
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64)>, anyhow::Error>
|
||||
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
|
||||
{
|
||||
let _permit = semaphore.acquire().await?;
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
use async_compression::Level;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -371,7 +372,7 @@ impl ImageLayer {
|
||||
dest_repo_path: &Utf8Path,
|
||||
path: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64)>> {
|
||||
) -> anyhow::Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>> {
|
||||
fn make_conf(
|
||||
image_compression: Option<ImageCompressionAlgorithm>,
|
||||
dest_repo_path: &Utf8Path,
|
||||
@@ -387,27 +388,35 @@ impl ImageLayer {
|
||||
Some(ImageCompressionAlgorithm::ZstdHigh),
|
||||
Some(ImageCompressionAlgorithm::LZ4),
|
||||
];
|
||||
let confs = image_compressions
|
||||
.clone()
|
||||
.map(|compression| make_conf(compression, dest_repo_path));
|
||||
let mut stats = Vec::new();
|
||||
for image_compression in image_compressions {
|
||||
let start = Instant::now();
|
||||
let size = Self::compressed_size_for_conf(
|
||||
path,
|
||||
ctx,
|
||||
make_conf(image_compression, dest_repo_path),
|
||||
)
|
||||
.await?;
|
||||
let elapsed_ms = start.elapsed().as_millis() as u64;
|
||||
stats.push((image_compression, size, elapsed_ms));
|
||||
for (image_compression, conf) in image_compressions.into_iter().zip(confs) {
|
||||
let start_compression = Instant::now();
|
||||
let compressed_path = Self::compress_for_conf(path, ctx, conf).await?;
|
||||
let size = path.metadata()?.size();
|
||||
let elapsed_ms = start_compression.elapsed().as_millis() as u64;
|
||||
let start_decompression = Instant::now();
|
||||
Self::compare_are_equal(path, &compressed_path, ctx).await?;
|
||||
let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64;
|
||||
std::fs::remove_file(compressed_path)?;
|
||||
stats.push((
|
||||
image_compression,
|
||||
size,
|
||||
elapsed_ms,
|
||||
elapsed_decompression_ms,
|
||||
));
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
async fn compressed_size_for_conf(
|
||||
async fn compress_for_conf(
|
||||
path: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<u64> {
|
||||
) -> anyhow::Result<Utf8PathBuf> {
|
||||
let file =
|
||||
VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx)
|
||||
.await
|
||||
@@ -417,7 +426,6 @@ impl ImageLayer {
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader.read_blk(0, ctx).await?;
|
||||
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
if summary.magic != IMAGE_FILE_MAGIC {
|
||||
anyhow::bail!("magic file mismatch");
|
||||
}
|
||||
@@ -431,6 +439,7 @@ impl ImageLayer {
|
||||
let mut key_offset_stream =
|
||||
std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx));
|
||||
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id);
|
||||
tokio::fs::create_dir_all(timeline_path).await?;
|
||||
|
||||
@@ -451,7 +460,72 @@ impl ImageLayer {
|
||||
let content = cursor.read_blob(offset, ctx).await?;
|
||||
writer.put_image(key, content.into(), ctx).await?;
|
||||
}
|
||||
Ok(writer.size())
|
||||
let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2;
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
async fn compare_are_equal(
|
||||
path_a: &Utf8Path,
|
||||
path_b: &Utf8Path,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut files = Vec::new();
|
||||
for path in [path_a, path_b] {
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
virtual_file::OpenOptions::new().read(true),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
files.push(file);
|
||||
}
|
||||
|
||||
let mut readers_summaries = Vec::new();
|
||||
for file in files.iter() {
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader.read_blk(0, ctx).await?;
|
||||
let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
if summary.magic != IMAGE_FILE_MAGIC {
|
||||
anyhow::bail!("magic file mismatch");
|
||||
}
|
||||
readers_summaries.push((block_reader, summary));
|
||||
}
|
||||
|
||||
let mut tree_readers_cursors = Vec::new();
|
||||
for (block_reader, summary) in readers_summaries.iter() {
|
||||
let tree_reader = DiskBtreeReader::new(
|
||||
summary.index_start_blk,
|
||||
summary.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
let cursor = block_reader.block_cursor();
|
||||
tree_readers_cursors.push((tree_reader, cursor));
|
||||
}
|
||||
|
||||
let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0]
|
||||
.0
|
||||
.get_stream_from(&[0u8; KEY_SIZE], ctx));
|
||||
let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[0]
|
||||
.0
|
||||
.get_stream_from(&[0u8; KEY_SIZE], ctx));
|
||||
while let Some(r) = key_offset_stream_a.next().await {
|
||||
let (key_a, offset_a): (Vec<u8>, _) = r?;
|
||||
let Some(r) = key_offset_stream_b.next().await else {
|
||||
panic!("second file at {path_b} has fewer keys than {path_a}");
|
||||
};
|
||||
let (key_b, offset_b): (Vec<u8>, _) = r?;
|
||||
assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}");
|
||||
let key = Key::from_slice(&key_a);
|
||||
let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?;
|
||||
let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?;
|
||||
assert_eq!(
|
||||
content_a, content_b,
|
||||
"mismatch for key={key} and {path_a}:{path_b}"
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -886,11 +960,10 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
async fn finish(
|
||||
async fn finish_inner(
|
||||
self,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -944,8 +1017,16 @@ impl ImageLayerWriterInner {
|
||||
// fsync the file
|
||||
file.sync_all().await?;
|
||||
|
||||
Ok((self.conf, desc, self.path))
|
||||
}
|
||||
async fn finish(
|
||||
self,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let (conf, desc, path) = self.finish_inner(ctx).await?;
|
||||
// FIXME: why not carry the virtualfile here, it supports renaming?
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
|
||||
|
||||
info!("created image layer {}", layer.local_path());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user