mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
Compress image layer
This commit is contained in:
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2841,6 +2841,15 @@ version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "lz4_flex"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
|
||||
dependencies = [
|
||||
"twox-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
@@ -3511,6 +3520,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"itertools",
|
||||
"leaky-bucket",
|
||||
"lz4_flex",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -100,6 +100,7 @@ jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
leaky-bucket = "1.0.1"
|
||||
libc = "0.2"
|
||||
lz4_flex = "0.11.1"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
|
||||
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
lz4_flex.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
|
||||
@@ -40,7 +40,14 @@ use tracing::info;
|
||||
/// format, bump this!
|
||||
/// Note that TimelineMetadata uses its own version number to track
|
||||
/// backwards-compatible changes to the metadata format.
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 3;
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
/// Minimal sorage format version with compression support
|
||||
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
/// Page image compression algorithm
|
||||
pub const NO_COMPRESSION: u8 = 0;
|
||||
pub const LZ4_COMPRESSION: u8 = 0;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
|
||||
|
||||
@@ -11,13 +11,16 @@
|
||||
//! len < 128: 0XXXXXXX
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
|
||||
use lz4_flex;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_compressed_blob(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let buf = self.read_blk(blknum, ctx).await?;
|
||||
let compression_alg = buf[off];
|
||||
let res = self.read_blob(offset + 1, ctx).await?;
|
||||
if compression_alg == LZ4_COMPRESSION {
|
||||
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
|
||||
})
|
||||
} else {
|
||||
assert_eq!(compression_alg, NO_COMPRESSION);
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_blob_into_buf(
|
||||
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
(src_buf, Ok(()))
|
||||
}
|
||||
|
||||
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
|
||||
let offset = self.offset;
|
||||
|
||||
let len = srcbuf.len();
|
||||
|
||||
let mut io_buf = self.io_buf.take().expect("we always put it back below");
|
||||
io_buf.clear();
|
||||
let mut is_compressed = false;
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(NO_COMPRESSION);
|
||||
io_buf.put_u8(len as u8);
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("blob too large ({} bytes)", len),
|
||||
));
|
||||
}
|
||||
if len == BLCKSZ as usize {
|
||||
let compressed = lz4_flex::block::compress(&srcbuf);
|
||||
if compressed.len() < len {
|
||||
io_buf.put_u8(LZ4_COMPRESSION);
|
||||
let mut len_buf = (compressed.len() as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
io_buf.extend_from_slice(&compressed[..]);
|
||||
is_compressed = true;
|
||||
}
|
||||
if is_compressed {
|
||||
io_buf.put_u8(NO_COMPRESSION);
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
let (io_buf, hdr_res) = self.write_all(io_buf).await;
|
||||
match hdr_res {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
self.io_buf = Some(io_buf);
|
||||
if is_compressed {
|
||||
hdr_res.map(|_| offset)
|
||||
} else {
|
||||
let (_buf, res) = self.write_all(srcbuf).await;
|
||||
res.map(|_| offset)
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
/// which can be used to retrieve the data later.
|
||||
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(len as u8);
|
||||
self.write_all(io_buf).await
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
self.write_all(io_buf).await
|
||||
}
|
||||
self.write_all(io_buf).await
|
||||
}
|
||||
.await;
|
||||
self.io_buf = Some(io_buf);
|
||||
|
||||
@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use crate::{
|
||||
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
|
||||
};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
format_version: u16,
|
||||
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
f.debug_struct("ImageLayerInner")
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.field("format_version", &self.format_version)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -408,6 +412,7 @@ impl ImageLayerInner {
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
format_version: actual_summary.format_version,
|
||||
lsn,
|
||||
file,
|
||||
file_id,
|
||||
@@ -436,18 +441,20 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
offset,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
let value = Bytes::from(blob);
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build();
|
||||
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||
block_reader
|
||||
.block_cursor()
|
||||
.read_compressed_blob(offset, &ctx)
|
||||
.await
|
||||
} else {
|
||||
block_reader.block_cursor().read_blob(offset, &ctx).await
|
||||
})
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
|
||||
let value = Bytes::from(blob);
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let (_img, res) = self.blob_writer.write_blob(img).await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let off = res?;
|
||||
|
||||
let off = self.blob_writer.write_compressed_blob(img).await?;
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
|
||||
Reference in New Issue
Block a user