Support lx4 WAL compression

This commit is contained in:
Konstantin Knizhnik
2023-11-24 10:46:06 +02:00
parent b09d686335
commit a880178cca
8 changed files with 46 additions and 3 deletions

10
Cargo.lock generated
View File

@@ -2841,6 +2841,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3511,6 +3520,7 @@ dependencies = [
"hyper",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11.1"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"

View File

@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<boo
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
}
pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
dispatch_pgversion!(
version,
Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
)
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,

View File

@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c *
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
}
pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
false
}

View File

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}

View File

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -471,8 +471,9 @@ impl WalIngest {
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
// only lz4 compression of WAL is now supported, for other compression algorithms fall back to storing the original WAL record
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.timeline.pg_version)? ||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.timeline.pg_version)?)
// do not materialize null pages because them most likely be soon replaced with real data
&& blk.bimg_len != 0
{
@@ -480,7 +481,18 @@ impl WalIngest {
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, self.timeline.pg_version)? {
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
let decompressed = lz4_flex::block::decompress(
&decoded.record[img_offs..img_offs + img_len],
decompressed_img_len,
)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
assert!(decompressed.len() == decompressed_img_len);
image.extend_from_slice(&decompressed);
} else {
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
}
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);