From 40e79712ebb66d9a9e2aa3acafb4cb88ce78d8ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 12 Jun 2024 16:04:39 +0200 Subject: [PATCH] Add decompression --- pageserver/src/tenant/blob_io.rs | 45 +++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 8ace666503..a0328951f3 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -14,6 +14,7 @@ use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; +use postgres_ffi::BLCKSZ; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; @@ -69,12 +70,29 @@ impl<'a> BlockCursor<'a> { len_buf.copy_from_slice(&buf[off..off + 4]); off += 4; } - len_buf[0] &= 0x7f; + len_buf[0] &= 0x0f; u32::from_be_bytes(len_buf) as usize }; + let compression_bits = first_len_byte & 0xf0; - dstbuf.clear(); - dstbuf.reserve(len); + let mut tmp_buf = Vec::new(); + let buf_to_write; + let compression = if compression_bits <= BYTE_UNCOMPRESSED { + buf_to_write = dstbuf; + None + } else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 { + buf_to_write = &mut tmp_buf; + Some(dstbuf) + } else { + let error = std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid compression byte {compression_bits:x}"), + ); + return Err(error); + }; + + buf_to_write.clear(); + buf_to_write.reserve(len); // Read the payload let mut remain = len; @@ -88,10 +106,29 @@ impl<'a> BlockCursor<'a> { page_remain = PAGE_SZ; } let this_blk_len = min(remain, page_remain); - dstbuf.extend_from_slice(&buf[off..off + this_blk_len]); + buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]); remain -= this_blk_len; off += this_blk_len; } + + if let Some(dstbuf) = compression { + if compression_bits == BYTE_ZSTD { + let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf); + decoder.write_all(buf_to_write).await?; + } else if compression_bits == BYTE_LZ4 { + let decompressed = lz4_flex::block::decompress(&buf_to_write, 128 as usize) + .map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "lz4 decompression error", + ) + })?; + dstbuf.extend_from_slice(&decompressed); + } else { + unreachable!("already checked above") + } + } + Ok(()) } }