Compare commits

...

9 Commits

Author SHA1 Message Date
Konstantin Knizhnik
42d0b040f8 Fix merge conflicts 2024-03-11 17:34:05 +02:00
Konstantin Knizhnik
9832638c09 Add compression tag to BLOBs stored in image layer 2024-03-10 22:10:12 +02:00
Konstantin Knizhnik
62e7638c69 Store compression algorithm in image layer metadata 2024-03-10 21:54:47 +02:00
Konstantin Knizhnik
0dad8e427d Update pageserver/src/walingest.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2024-03-10 21:52:03 +02:00
Konstantin Knizhnik
4cfa2fdca5 Support compression of get_page responses 2024-03-10 21:52:01 +02:00
Konstantin Knizhnik
56ddf8e37f Build Postgres with lz4 support 2024-03-10 21:49:34 +02:00
Konstantin Knizhnik
ed4bb3073f Resolve merge conflict 2024-03-10 21:48:47 +02:00
Konstantin Knizhnik
b7e7aeed4d Perform compression of page images in storage 2024-03-10 21:48:00 +02:00
Konstantin Knizhnik
a880178cca Support lz4 WAL compression 2024-03-10 21:38:00 +02:00
23 changed files with 314 additions and 34 deletions
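The series teaches the pageserver to LZ4-compress 8 KiB page images in three places: BLOBs stored in image layers, pages materialized from WAL, and get_page responses. Everything builds on lz4_flex's raw block format, which carries no size header of its own, so the reader must pass the expected uncompressed size (always BLCKSZ here) to decompress. A minimal round-trip sketch against the pinned lz4_flex = "0.11.1":

const BLCKSZ: usize = 8192; // PostgreSQL's default page size

fn main() {
    let page = vec![0u8; BLCKSZ]; // an all-zero page compresses very well
    let compressed = lz4_flex::block::compress(&page);
    assert!(compressed.len() < page.len());
    // The block format stores no length, so the caller supplies the
    // expected size; a corrupt block or size mismatch yields an error,
    // which the code below treats as data corruption.
    let restored = lz4_flex::block::decompress(&compressed, BLCKSZ).expect("valid LZ4 block");
    assert_eq!(restored, page);
}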

Cargo.lock generated
View File

@@ -2841,6 +2841,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3511,6 +3520,7 @@ dependencies = [
"hyper",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11.1"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"

View File

@@ -23,12 +23,12 @@ endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
PG_CONFIGURE_OPTS += --with-libseccomp
PG_CONFIGURE_OPTS += --with-lz4 --with-libseccomp
else ifeq ($(UNAME_S),Darwin)
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += --with-lz4 --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage

View File

@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetCompressedPage(PagestreamGetPageResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
bytes.put(&resp.page[..]);
}
Self::GetCompressedPage(resp) => {
bytes.put_u8(105); /* tag from pagestore_client.h */
bytes.put_u16(resp.page.len() as u16);
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
@@ -1078,6 +1085,7 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetCompressedPage(_) => "GetCompressedPage",
}
}
}
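On the wire, a GetCompressedPage response is one tag byte, a big-endian u16 payload length, and the raw LZ4 block (put_u16 writes network byte order, matching pq_getmsgint(s, 2) on the C side). A hedged decoding sketch, with the tag value 105 taken from the hunk above:

use bytes::{Buf, Bytes};

fn decode_compressed_page(mut wire: Bytes) -> anyhow::Result<Bytes> {
    anyhow::ensure!(wire.remaining() >= 3, "truncated message");
    let tag = wire.get_u8();
    anyhow::ensure!(tag == 105, "unexpected tag {tag}");
    let compressed_size = wire.get_u16() as usize; // big-endian, like put_u16
    anyhow::ensure!(wire.remaining() >= compressed_size, "truncated payload");
    let payload = wire.split_to(compressed_size);
    // Every compressed payload inflates to exactly one page
    let page = lz4_flex::block::decompress(&payload, 8192 /* BLCKSZ */)?;
    Ok(Bytes::from(page))
}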

View File

@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<boo
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
}
pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
dispatch_pgversion!(
version,
Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
)
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,

View File

@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c *
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
}
pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
false
}

View File

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}

View File

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}
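The v15 and v16 bindings are identical because the bimg_info flag layout did not change between those majors. For orientation, a hedged sketch of the assumed bit values (mirroring PostgreSQL 15's xlogrecord.h; verify against the generated bindings rather than trusting this sketch):

const BKPIMAGE_HAS_HOLE: u8 = 0x01; // page image has a "hole"
const BKPIMAGE_APPLY: u8 = 0x02; // page image should be restored during replay
const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04;
const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08;
const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10;
const ANY_COMPRESS_FLAG: u8 =
    BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | BKPIMAGE_COMPRESS_ZSTD;

pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
    (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}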

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -157,6 +157,7 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetCompressedPage(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",

View File

@@ -40,7 +40,14 @@ use tracing::info;
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const STORAGE_FORMAT_VERSION: u16 = 4;
/// Minimal storage format version with compression support
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
/// Page image compression algorithm (the tag byte written in front of
/// each blob in format version >= 4)
pub const NO_COMPRESSION: u8 = 0;
pub const LZ4_COMPRESSION: u8 = 1;
pub const DEFAULT_PG_VERSION: u32 = 15;

View File

@@ -1155,9 +1155,18 @@ impl PageServerHandler {
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
let compressed = lz4_flex::block::compress(&page);
if compressed.len() < page.len() {
Ok(PagestreamBeMessage::GetCompressedPage(
PagestreamGetPageResponse {
page: Bytes::from(compressed),
},
))
} else {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}
}
#[instrument(skip_all, fields(shard_id))]
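The handler compresses a get_page response only when that actually saves bytes; incompressible pages go out as a plain GetPage message, so the client never decompresses data that gained nothing. The same size guard reappears in put_rel_page_image below. Distilled into a standalone sketch (helper name hypothetical):

use bytes::Bytes;

// Hypothetical helper capturing the guard above: return the LZ4 block
// only if it is strictly smaller than the original page.
fn maybe_compress(page: &Bytes) -> Option<Bytes> {
    let compressed = lz4_flex::block::compress(page);
    (compressed.len() < page.len()).then(|| Bytes::from(compressed))
}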

View File

@@ -16,6 +16,7 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use lz4_flex;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
let compressed = lz4_flex::block::compress(&img);
if compressed.len() < img.len() {
self.put(
rel_block_to_key(rel, blknum),
Value::CompressedImage(Bytes::from(compressed)),
);
} else {
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
}
Ok(())
}
@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
if let Some((_, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else if let Value::CompressedImage(img) = value {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
Ok(Bytes::from(decompressed))
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates

View File

@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),
/// A compressed page image contains a full copy of the page
CompressedImage(Bytes),
/// A WalRecord value contains a WAL record that needs to be
/// replayed to get the full value. Replaying the WAL record
/// might need a previous version of the value (if will_init()
@@ -22,12 +24,17 @@ pub enum Value {
impl Value {
pub fn is_image(&self) -> bool {
matches!(self, Value::Image(_))
match self {
Value::Image(_) => true,
Value::CompressedImage(_) => true,
Value::WalRecord(_) => false,
}
}
pub fn will_init(&self) -> bool {
match self {
Value::Image(_) => true,
Value::CompressedImage(_) => true,
Value::WalRecord(rec) => rec.will_init(),
}
}
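Because CompressedImage behaves exactly like Image except at the point of use, every reader (the delta, image, in-memory, and vectored paths below) repeats the same decompress-to-BLCKSZ step. A self-contained sketch of that shared step, using a local stand-in for the enum:

use bytes::Bytes;

const BLCKSZ: usize = 8192; // postgres_ffi::BLCKSZ in the real code

// Local stand-in for the two image variants of Value above.
enum ImageValue {
    Image(Bytes),
    CompressedImage(Bytes),
}

// Both variants materialize to the same raw page bytes, which is why
// is_image() and will_init() treat them identically.
fn materialize(value: &ImageValue) -> anyhow::Result<Bytes> {
    match value {
        ImageValue::Image(img) => Ok(img.clone()),
        ImageValue::CompressedImage(img) => {
            let page = lz4_flex::block::decompress(img, BLCKSZ)
                .map_err(|e| anyhow::anyhow!("decompress failed: {e}"))?;
            Ok(Bytes::from(page))
        }
    }
}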

View File

@@ -11,13 +11,16 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
use lz4_flex;
use postgres_ffi::BLCKSZ;
use std::cmp::min;
use std::io::{Error, ErrorKind};
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}
/// Read a compression-tagged blob at `offset`, decompressing it into a
/// freshly allocated buffer if it was stored compressed.
pub async fn read_compressed_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let blknum = (offset / PAGE_SZ as u64) as u32;
let off = (offset % PAGE_SZ as u64) as usize;
let buf = self.read_blk(blknum, ctx).await?;
let compression_alg = buf[off];
let res = self.read_blob(offset + 1, ctx).await?;
if compression_alg == LZ4_COMPRESSION {
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
})
} else {
assert_eq!(compression_alg, NO_COMPRESSION);
Ok(res)
}
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_blob_into_buf(
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(src_buf, Ok(()))
}
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
    let offset = self.offset;
    let len = srcbuf.len();
    let mut io_buf = self.io_buf.take().expect("we always put it back below");
    io_buf.clear();
    let mut is_compressed = false;
    if len < 128 {
        // Short blob. Write the compression tag and a 1-byte length header
        io_buf.put_u8(NO_COMPRESSION);
        io_buf.put_u8(len as u8);
    } else {
        // Write the compression tag and a 4-byte length header
        if len > 0x7fff_ffff {
            return Err(Error::new(
                ErrorKind::Other,
                format!("blob too large ({} bytes)", len),
            ));
        }
        // Only full-size page images are worth a compression attempt
        if len == BLCKSZ as usize {
            let compressed = lz4_flex::block::compress(&srcbuf);
            if compressed.len() < len {
                io_buf.put_u8(LZ4_COMPRESSION);
                let mut len_buf = (compressed.len() as u32).to_be_bytes();
                len_buf[0] |= 0x80;
                io_buf.extend_from_slice(&len_buf[..]);
                io_buf.extend_from_slice(&compressed[..]);
                is_compressed = true;
            }
        }
        if !is_compressed {
            // Compression did not pay off (or the blob is not page-sized):
            // store the blob uncompressed under the NO_COMPRESSION tag
            io_buf.put_u8(NO_COMPRESSION);
            let mut len_buf = (len as u32).to_be_bytes();
            len_buf[0] |= 0x80;
            io_buf.extend_from_slice(&len_buf[..]);
        }
    }
    let (io_buf, hdr_res) = self.write_all(io_buf).await;
    self.io_buf = Some(io_buf);
    hdr_res?;
    if is_compressed {
        // The compressed payload was written together with the header
        Ok(offset)
    } else {
        let (_buf, res) = self.write_all(srcbuf).await;
        res.map(|_| offset)
    }
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
}
self.write_all(io_buf).await
}
.await;
self.io_buf = Some(io_buf);
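With write_compressed_blob, every image-layer blob now starts with a one-byte compression tag ahead of the usual 1- or 4-byte length header, giving an on-disk layout of [tag][len][payload]; for an LZ4 payload the header stores the compressed length, while decompression always targets BLCKSZ. A minimal sketch of just the header encoding, assuming the corrected tags NO_COMPRESSION = 0 and LZ4_COMPRESSION = 1:

// Sketch of the tagged header layout produced above; not the real writer.
fn encode_blob_header(tag: u8, len: usize) -> Vec<u8> {
    assert!(len <= 0x7fff_ffff, "blob too large");
    let mut hdr = vec![tag];
    if len < 128 {
        hdr.push(len as u8); // short form: 0XXXXXXX
    } else {
        let mut len_buf = (len as u32).to_be_bytes();
        len_buf[0] |= 0x80; // long form: high bit marks the 4-byte header
        hdr.extend_from_slice(&len_buf);
    }
    hdr
}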

View File

@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use postgres_ffi::BLCKSZ;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
let mut error: Option<PageReconstructError> = None;
let key_state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let situation = if let Ok(state) = key_state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
@@ -160,6 +162,21 @@ impl ValuesReconstructState {
state.img = Some((lsn, img));
true
}
Value::CompressedImage(img) => {
match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
Ok(decompressed) => {
state.img = Some((lsn, Bytes::from(decompressed)));
true
}
Err(e) => {
error = Some(PageReconstructError::from(anyhow::anyhow!(
"Failed to decompress blobrom virtual file: {}",
e
)));
true
}
}
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
state.situation
} else {
ValueReconstructSituation::Complete
};
if let Some(err) = error {
*key_state = Err(err);
}
situation
}
/// Returns the Lsn at which this key is cached if one exists.
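The key_state/error refactor above exists to satisfy the borrow checker: the map entry stays mutably borrowed as Ok(state) while the value is applied, so a decompression failure is parked in a local and written back into the map only after that borrow ends. The same pattern in miniature:

use std::collections::HashMap;

fn apply_value(map: &mut HashMap<u32, Result<Vec<u8>, String>>, key: u32, compressed: &[u8]) {
    let mut error: Option<String> = None;
    let entry = map.entry(key).or_insert_with(|| Ok(Vec::new()));
    if let Ok(state) = entry {
        match lz4_flex::block::decompress(compressed, 8192) {
            Ok(page) => *state = page,
            // Cannot assign *entry = Err(..) here: `state` still borrows it.
            Err(e) => error = Some(format!("decompress failed: {e}")),
        }
    }
    if let Some(e) = error {
        *entry = Err(e);
    }
}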

View File

@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
need_image = false;
break;
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::CompressedImage(img) => {
format!(" compressed img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
@@ -1138,6 +1148,11 @@ impl DeltaLayerInner {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
let checkpoint = CheckPoint::decode(&decompressed)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::WalRecord(_rec) => {
println!(" unexpected walrecord value for checkpoint key");
}

View File

@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
format_version: u16,
lsn: Lsn,
@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
f.debug_struct("ImageLayerInner")
.field("index_start_blk", &self.index_start_blk)
.field("index_root_blk", &self.index_root_blk)
.field("format_version", &self.format_version)
.finish()
}
}
@@ -408,6 +412,7 @@ impl ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
format_version: actual_summary.format_version,
lsn,
file,
file_id,
@@ -436,18 +441,20 @@ impl ImageLayerInner {
)
.await?
{
let blob = block_reader
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
block_reader
.block_cursor()
.read_compressed_blob(offset, &ctx)
.await
} else {
block_reader.block_cursor().read_blob(offset, &ctx).await
})
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
let off = self.blob_writer.write_compressed_blob(img).await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;

View File

@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::walrecord;
use anyhow::{anyhow, ensure, Result};
use bytes::Bytes;
use lz4_flex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use tracing::*;
@@ -133,6 +136,9 @@ impl InMemoryLayer {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::CompressedImage(img)) => {
write!(&mut desc, " compressed img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
@@ -184,6 +190,11 @@ impl InMemoryLayer {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));

View File

@@ -471,8 +471,9 @@ impl WalIngest {
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
// only lz4 compression of WAL is supported for now; for other compression algorithms, fall back to storing the original WAL record
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
// do not materialize null pages because they will most likely soon be replaced with real data
&& blk.bimg_len != 0
{
@@ -480,7 +481,21 @@ impl WalIngest {
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if postgres_ffi::bkpimage_is_compressed_lz4(
blk.bimg_info,
modification.tline.pg_version,
)? {
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
let decompressed = lz4_flex::block::decompress(
&decoded.record[img_offs..img_offs + img_len],
decompressed_img_len,
)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
assert_eq!(decompressed.len(), decompressed_img_len);
image.extend_from_slice(&decompressed);
} else {
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
}
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
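After LZ4 decompression the buffer is still the "holey" image PostgreSQL stores in an FPI: the run of hole_length zero bytes at hole_offset was stripped before compression, which is why the expected decompressed size is BLCKSZ - hole_length and why the code then splits the buffer to splice the zeros back in. That reconstruction step, sketched on its own:

use bytes::BytesMut;

const BLCKSZ: usize = 8192;

// Sketch of rebuilding a full page from a holey FPI image, mirroring the
// split_off logic above; hole_offset/hole_length come from the WAL record.
fn fill_hole(mut image: BytesMut, hole_offset: usize, hole_length: usize) -> BytesMut {
    if hole_length != 0 {
        let tail = image.split_off(hole_offset);
        image.resize(hole_offset + hole_length, 0); // zero-filled hole
        image.unsplit(tail);
    }
    assert_eq!(image.len(), BLCKSZ);
    image
}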

View File

@@ -18,7 +18,7 @@ OBJS = \
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
SHLIB_LINK = -lcurl -llz4
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql

View File

@@ -44,6 +44,7 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
T_NeonGetCompressedPageResponse
} NeonMessageTag;
/* base struct for c-style inheritance */
@@ -144,6 +145,15 @@ typedef struct
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
{
NeonMessageTag tag;
uint16 compressed_size;
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetCompressedPageResponse;
#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + compressed_size))
typedef struct
{
NeonMessageTag tag;

View File

@@ -45,6 +45,10 @@
*/
#include "postgres.h"
#ifdef USE_LZ4
#include <lz4.h>
#endif
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonGetCompressedPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)
Assert(msg_resp->tag == T_NeonGetPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp;
uint16 compressed_size = pq_getmsgint(s, 2);
msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
msg_resp->tag = tag;
msg_resp->compressed_size = compressed_size;
memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
pq_getmsgend(s);
Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"}", msg_resp->compressed_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonErrorResponse:
{
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_write(rinfo, forkNum, blkno, buffer);
break;
case T_NeonGetCompressedPageResponse:
{
#ifndef USE_LZ4
ereport(ERROR,
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
         errmsg("compression method lz4 not supported"),
         errdetail("This functionality requires the server to be built with lz4 support."),
         errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")));
#else
NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp;
int rc = LZ4_decompress_safe(cp->page,
buffer,
cp->compressed_size,
BLCKSZ);
if (rc != BLCKSZ) {
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg_internal("compressed lz4 data is corrupt")));
}
lfc_write(rinfo, forkNum, blkno, buffer);
#endif
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),