From a880178ccabc90053478b1eb0366620d9e37d4dc Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 24 Nov 2023 10:46:06 +0200 Subject: [PATCH] Support lx4 WAL compression --- Cargo.lock | 10 ++++++++++ Cargo.toml | 1 + libs/postgres_ffi/src/lib.rs | 7 +++++++ libs/postgres_ffi/src/pg_constants_v14.rs | 4 ++++ libs/postgres_ffi/src/pg_constants_v15.rs | 4 ++++ libs/postgres_ffi/src/pg_constants_v16.rs | 4 ++++ pageserver/Cargo.toml | 1 + pageserver/src/walingest.rs | 18 +++++++++++++++--- 8 files changed, 46 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7fd9053f62..db206dd0c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2841,6 +2841,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lz4_flex" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" +dependencies = [ + "twox-hash", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -3511,6 +3520,7 @@ dependencies = [ "hyper", "itertools", "leaky-bucket", + "lz4_flex", "md5", "metrics", "nix 0.27.1", diff --git a/Cargo.toml b/Cargo.toml index 76f4ff041c..04178987e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ jsonwebtoken = "9" lasso = "0.7" leaky-bucket = "1.0.1" libc = "0.2" +lz4_flex = "0.11.1" md5 = "0.7.0" memoffset = "0.8" native-tls = "0.2" diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index aa6845b9b1..98599dc059 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result anyhow::Result { + dispatch_pgversion!( + version, + Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info)) + ) +} + pub fn generate_wal_segment( segno: u64, system_id: u64, diff --git a/libs/postgres_ffi/src/pg_constants_v14.rs b/libs/postgres_ffi/src/pg_constants_v14.rs index 32f8f51114..4edbfdc4d6 100644 --- a/libs/postgres_ffi/src/pg_constants_v14.rs +++ b/libs/postgres_ffi/src/pg_constants_v14.rs @@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c * pub fn bkpimg_is_compressed(bimg_info: u8) -> bool { (bimg_info & BKPIMAGE_IS_COMPRESSED) != 0 } + +pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool { + false +} diff --git a/libs/postgres_ffi/src/pg_constants_v15.rs b/libs/postgres_ffi/src/pg_constants_v15.rs index 626a23c7ea..b419dc5241 100644 --- a/libs/postgres_ffi/src/pg_constants_v15.rs +++ b/libs/postgres_ffi/src/pg_constants_v15.rs @@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool { (bimg_info & ANY_COMPRESS_FLAG) != 0 } + +pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool { + (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0 +} diff --git a/libs/postgres_ffi/src/pg_constants_v16.rs b/libs/postgres_ffi/src/pg_constants_v16.rs index 587be71cb3..52750f8ac5 100644 --- a/libs/postgres_ffi/src/pg_constants_v16.rs +++ b/libs/postgres_ffi/src/pg_constants_v16.rs @@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool { (bimg_info & ANY_COMPRESS_FLAG) != 0 } + +pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool { + (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0 +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 5adeaffe1a..a01a50dc52 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,6 +37,7 @@ humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true leaky-bucket.workspace = true +lz4_flex.workspace = true md5.workspace = true nix.workspace = true # hack to get the number of worker threads tokio uses diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 63a2b30d09..43b8812c4b 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -471,8 +471,9 @@ impl WalIngest { && decoded.xl_rmid == pg_constants::RM_XLOG_ID && (decoded.xl_info == pg_constants::XLOG_FPI || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) - // compression of WAL is not yet supported: fall back to storing the original WAL record - && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? + // only lz4 compression of WAL is now supported, for other compression algorithms fall back to storing the original WAL record + && (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.timeline.pg_version)? || + postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.timeline.pg_version)?) // do not materialize null pages because them most likely be soon replaced with real data && blk.bimg_len != 0 { @@ -480,7 +481,18 @@ impl WalIngest { let img_len = blk.bimg_len as usize; let img_offs = blk.bimg_offset as usize; let mut image = BytesMut::with_capacity(BLCKSZ as usize); - image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); + if postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, self.timeline.pg_version)? { + let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize; + let decompressed = lz4_flex::block::decompress( + &decoded.record[img_offs..img_offs + img_len], + decompressed_img_len, + ) + .map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?; + assert!(decompressed.len() == decompressed_img_len); + image.extend_from_slice(&decompressed); + } else { + image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); + } if blk.hole_length != 0 { let tail = image.split_off(blk.hole_offset as usize);