mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
Add compression tag to BLOBs stored in image layer
This commit is contained in:
@@ -577,7 +577,3 @@ mod tests {
|
||||
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_rel_data_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0 && key.field6 != 0xffffffff
|
||||
}
|
||||
|
||||
@@ -18,6 +18,9 @@ use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
|
||||
use lz4_flex;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_compressed_blob(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let buf = self.read_blk(blknum, ctx).await?;
|
||||
let compression_alg = buf[off];
|
||||
let res = self.read_blob(offset + 1, ctx).await?;
|
||||
if compression_alg == LZ4_COMPRESSION {
|
||||
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
|
||||
})
|
||||
} else {
|
||||
assert_eq!(compression_alg, NO_COMPRESSION);
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_blob_into_buf(
|
||||
@@ -211,6 +237,41 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
(src_buf, Ok(()))
|
||||
}
|
||||
|
||||
pub async fn write_compressed_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
|
||||
let offset = self.offset;
|
||||
if srcbuf.len() < 128 {
|
||||
self.write_all(&[NO_COMPRESSION]).await?;
|
||||
// Short blob. Write a 1-byte length header
|
||||
let len_buf = srcbuf.len() as u8;
|
||||
self.write_all(&[len_buf]).await?;
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if srcbuf.len() == BLCKSZ as usize {
|
||||
let compressed = lz4_flex::block::compress(srcbuf);
|
||||
if compressed.len() < srcbuf.len() {
|
||||
self.write_all(&[LZ4_COMPRESSION]).await?;
|
||||
let mut len_buf = (compressed.len() as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
self.write_all(&len_buf).await?;
|
||||
self.write_all(&compressed).await?;
|
||||
return Ok(offset);
|
||||
}
|
||||
}
|
||||
if srcbuf.len() > 0x7fff_ffff {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("blob too large ({} bytes)", srcbuf.len()),
|
||||
));
|
||||
}
|
||||
self.write_all(&[NO_COMPRESSION]).await?;
|
||||
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
self.write_all(&len_buf).await?;
|
||||
}
|
||||
self.write_all(srcbuf).await?;
|
||||
Ok(offset)
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
/// which can be used to retrieve the data later.
|
||||
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
|
||||
@@ -37,6 +37,10 @@ use crate::tenant::storage_layer::{
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{
|
||||
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
@@ -50,7 +54,6 @@ use lz4_flex;
|
||||
use pageserver_api::key::is_rel_data_key;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -92,10 +95,6 @@ pub struct Summary {
|
||||
pub index_start_blk: u32,
|
||||
/// Block within the 'index', where the B-tree root page is stored
|
||||
pub index_root_blk: u32,
|
||||
/// Compression algorithm used for relation pages. Compressed size should be always
|
||||
/// smaller than original size, otherwise original image is stored instead of conpressed image.
|
||||
/// Old storage versions (format_version < 4) do not have this fields and it is deseriealized as 0=NO+COMPRESSION
|
||||
pub compression_alg: u8,
|
||||
// the 'values' part starts after the summary header, on block 1.
|
||||
}
|
||||
|
||||
@@ -127,7 +126,6 @@ impl Summary {
|
||||
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
compression_alg: LZ4_COMPRESSION,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -163,7 +161,6 @@ pub struct ImageLayerInner {
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
format_version: u16,
|
||||
compression_alg: u8,
|
||||
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -179,7 +176,6 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.field("format_version", &self.format_version)
|
||||
.field("compression_alg", &self.compression_alg)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -404,20 +400,10 @@ impl ImageLayerInner {
|
||||
let actual_summary =
|
||||
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
|
||||
|
||||
if actual_summary.compression_alg != LZ4_COMPRESSION
|
||||
&& actual_summary.compression_alg != NO_COMPRESSION
|
||||
{
|
||||
bail!(
|
||||
"Unsupported compression algorithm: {}",
|
||||
actual_summary.compression_alg
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// production code path
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
expected_summary.compression_alg = actual_summary.compression_alg;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
@@ -432,7 +418,6 @@ impl ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
format_version: actual_summary.format_version,
|
||||
compression_alg: actual_summary.compression_alg,
|
||||
lsn,
|
||||
file,
|
||||
file_id,
|
||||
@@ -461,26 +446,17 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
offset,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
|
||||
let value = if self.compression_alg == LZ4_COMPRESSION
|
||||
&& is_rel_data_key(key)
|
||||
&& blob.len() < BLCKSZ as usize
|
||||
{
|
||||
let decompressed = lz4_flex::block::decompress(&blob, BLCKSZ as usize)?;
|
||||
Bytes::from(decompressed)
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build();
|
||||
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||
file.block_cursor().read_compressed_blob(offset, &ctx).await
|
||||
} else {
|
||||
Bytes::from(blob)
|
||||
};
|
||||
file.block_cursor().read_blob(offset, &ctx).await
|
||||
})
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
|
||||
let value = Bytes::from(blob);
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
@@ -691,17 +667,7 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let (_img, res) = if is_rel_data_key(key) {
|
||||
assert_eq!(img.len(), BLCKSZ as usize);
|
||||
let compressed = lz4_flex::block::compress(img);
|
||||
if compressed.len() < img.len() {
|
||||
self.blob_writer.write_blob(&compressed).await;
|
||||
} else {
|
||||
self.blob_writer.write_blob(img).await;
|
||||
}
|
||||
} else {
|
||||
self.blob_writer.write_blob(img).await;
|
||||
};
|
||||
let (_img, res) = self.blob_writer.write_compressed_blob(img).await?;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let off = res?;
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
@@ -739,7 +705,6 @@ impl ImageLayerWriterInner {
|
||||
lsn: self.lsn,
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
compression_alg: LZ4_COMPRESSION,
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
|
||||
Reference in New Issue
Block a user