Compare commits

...

3 Commits

Author SHA1 Message Date
John Spray
e7a932c85c pageserver: always enable compression-aware reads 2024-07-18 09:51:09 +00:00
John Spray
231e21ecc5 use image compression for image values in delta layers 2024-07-18 09:48:40 +00:00
John Spray
ad768cb848 tests: enable compression 2024-07-18 09:45:28 +00:00
4 changed files with 26 additions and 32 deletions

View File

@@ -19,7 +19,6 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -73,22 +72,14 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
let bit_mask = if self.read_compressed {
!LEN_COMPRESSION_BIT_MASK
} else {
0x7f
};
len_buf[0] &= bit_mask;
len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
if compression_bits > BYTE_UNCOMPRESSED {
warn!("reading key above future limit ({len} bytes)");
}
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD {
@@ -331,6 +322,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())

View File

@@ -149,24 +149,19 @@ impl<'a> BlockReaderRef<'a> {
/// ```
///
pub struct BlockCursor<'a> {
pub(super) read_compressed: bool,
reader: BlockReaderRef<'a>,
}
impl<'a> BlockCursor<'a> {
pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
Self::new_with_compression(reader, false)
Self::new_with_compression(reader, true)
}
pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
BlockCursor {
read_compressed,
reader,
}
pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, _read_compressed: bool) -> Self {
BlockCursor { reader }
}
// Needed by cli
pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
BlockCursor {
read_compressed: false,
reader: BlockReaderRef::FileBlockReader(reader),
}
}
@@ -196,17 +191,11 @@ pub struct FileBlockReader<'a> {
/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,
compressed_reads: bool,
}
impl<'a> FileBlockReader<'a> {
pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
FileBlockReader {
file_id,
file,
compressed_reads: true,
}
FileBlockReader { file_id, file }
}
/// Read a page from the underlying file into given buffer.
@@ -253,10 +242,7 @@ impl<'a> FileBlockReader<'a> {
impl BlockReader for FileBlockReader<'_> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new_with_compression(
BlockReaderRef::FileBlockReader(self),
self.compressed_reads,
)
BlockCursor::new_with_compression(BlockReaderRef::FileBlockReader(self), true)
}
}

View File

@@ -457,8 +457,15 @@ impl DeltaLayerWriterInner {
ctx: &RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let compression = if val.len() >= 8192 {
// For full page images, respect configured image compression algorithm.
self.conf.image_compression
} else {
// For small writes, do not use compression. Compression ratios on tiny buffers do not justify CPU cost.
ImageCompressionAlgorithm::Disabled
};
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
@@ -479,6 +486,10 @@ impl DeltaLayerWriterInner {
self.blob_writer.size() + self.tree.borrow_writer().size()
}
fn size_values(&self) -> u64 {
self.blob_writer.size()
}
///
/// Finish writing the delta layer.
///
@@ -668,6 +679,10 @@ impl DeltaLayerWriter {
self.inner.as_ref().unwrap().size()
}
pub fn size_values(&self) -> u64 {
self.inner.as_ref().unwrap().size_values()
}
///
/// Finish writing the delta layer.
///

View File

@@ -1135,6 +1135,7 @@ class NeonEnv:
"listen_http_addr": f"localhost:{pageserver_port.http}",
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
"image_compression": "zstd",
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine