mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-28 15:50:38 +00:00
Compare commits
9 Commits
fix_aio_pr
...
wal_lz4_co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42d0b040f8 | ||
|
|
9832638c09 | ||
|
|
62e7638c69 | ||
|
|
0dad8e427d | ||
|
|
4cfa2fdca5 | ||
|
|
56ddf8e37f | ||
|
|
ed4bb3073f | ||
|
|
b7e7aeed4d | ||
|
|
a880178cca |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2841,6 +2841,15 @@ version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "lz4_flex"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
|
||||
dependencies = [
|
||||
"twox-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
@@ -3511,6 +3520,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"itertools",
|
||||
"leaky-bucket",
|
||||
"lz4_flex",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -100,6 +100,7 @@ jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
leaky-bucket = "1.0.1"
|
||||
libc = "0.2"
|
||||
lz4_flex = "0.11.1"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
|
||||
4
Makefile
4
Makefile
@@ -23,12 +23,12 @@ endif
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S),Linux)
|
||||
# Seccomp BPF is only available for Linux
|
||||
PG_CONFIGURE_OPTS += --with-libseccomp
|
||||
PG_CONFIGURE_OPTS += --with-lz4 --with-libseccomp
|
||||
else ifeq ($(UNAME_S),Darwin)
|
||||
# macOS with brew-installed openssl requires explicit paths
|
||||
# It can be configured with OPENSSL_PREFIX variable
|
||||
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
|
||||
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
||||
PG_CONFIGURE_OPTS += --with-lz4 --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
||||
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
|
||||
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
|
||||
# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage
|
||||
|
||||
@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
GetCompressedPage(PagestreamGetPageResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
|
||||
bytes.put(&resp.page[..]);
|
||||
}
|
||||
|
||||
Self::GetCompressedPage(resp) => {
|
||||
bytes.put_u8(105); /* tag from pagestore_client.h */
|
||||
bytes.put_u16(resp.page.len() as u16);
|
||||
bytes.put(&resp.page[..]);
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
bytes.put_u8(Tag::Error as u8);
|
||||
bytes.put(resp.message.as_bytes());
|
||||
@@ -1078,6 +1085,7 @@ impl PagestreamBeMessage {
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
Self::GetCompressedPage(_) => "GetCompressedPage",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<boo
|
||||
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
|
||||
}
|
||||
|
||||
pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
|
||||
dispatch_pgversion!(
|
||||
version,
|
||||
Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
|
||||
)
|
||||
}
|
||||
|
||||
pub fn generate_wal_segment(
|
||||
segno: u64,
|
||||
system_id: u64,
|
||||
|
||||
@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c *
|
||||
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
||||
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
|
||||
}
|
||||
|
||||
pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
||||
|
||||
(bimg_info & ANY_COMPRESS_FLAG) != 0
|
||||
}
|
||||
|
||||
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
|
||||
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
|
||||
}
|
||||
|
||||
@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
||||
|
||||
(bimg_info & ANY_COMPRESS_FLAG) != 0
|
||||
}
|
||||
|
||||
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
|
||||
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
lz4_flex.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
|
||||
@@ -157,6 +157,7 @@ impl PagestreamClient {
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetCompressedPage(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
|
||||
@@ -40,7 +40,14 @@ use tracing::info;
|
||||
/// format, bump this!
|
||||
/// Note that TimelineMetadata uses its own version number to track
|
||||
/// backwards-compatible changes to the metadata format.
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 3;
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
/// Minimal sorage format version with compression support
|
||||
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
/// Page image compression algorithm
|
||||
pub const NO_COMPRESSION: u8 = 0;
|
||||
pub const LZ4_COMPRESSION: u8 = 0;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
|
||||
|
||||
@@ -1155,9 +1155,18 @@ impl PageServerHandler {
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
}))
|
||||
let compressed = lz4_flex::block::compress(&page);
|
||||
if compressed.len() < page.len() {
|
||||
Ok(PagestreamBeMessage::GetCompressedPage(
|
||||
PagestreamGetPageResponse {
|
||||
page: Bytes::from(compressed),
|
||||
},
|
||||
))
|
||||
} else {
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
|
||||
@@ -16,6 +16,7 @@ use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use lz4_flex;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
||||
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
let compressed = lz4_flex::block::compress(&img);
|
||||
if compressed.len() < img.len() {
|
||||
self.put(
|
||||
rel_block_to_key(rel, blknum),
|
||||
Value::CompressedImage(Bytes::from(compressed)),
|
||||
);
|
||||
} else {
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
|
||||
if let Some((_, value)) = values.last() {
|
||||
return if let Value::Image(img) = value {
|
||||
Ok(img.clone())
|
||||
} else if let Value::CompressedImage(img) = value {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
|
||||
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
|
||||
Ok(Bytes::from(decompressed))
|
||||
} else {
|
||||
// Currently, we never need to read back a WAL record that we
|
||||
// inserted in the same "transaction". All the metadata updates
|
||||
|
||||
@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
|
||||
pub enum Value {
|
||||
/// An Image value contains a full copy of the value
|
||||
Image(Bytes),
|
||||
/// An compressed page image contains a full copy of the page
|
||||
CompressedImage(Bytes),
|
||||
/// A WalRecord value contains a WAL record that needs to be
|
||||
/// replayed get the full value. Replaying the WAL record
|
||||
/// might need a previous version of the value (if will_init()
|
||||
@@ -22,12 +24,17 @@ pub enum Value {
|
||||
|
||||
impl Value {
|
||||
pub fn is_image(&self) -> bool {
|
||||
matches!(self, Value::Image(_))
|
||||
match self {
|
||||
Value::Image(_) => true,
|
||||
Value::CompressedImage(_) => true,
|
||||
Value::WalRecord(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn will_init(&self) -> bool {
|
||||
match self {
|
||||
Value::Image(_) => true,
|
||||
Value::CompressedImage(_) => true,
|
||||
Value::WalRecord(rec) => rec.will_init(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,13 +11,16 @@
|
||||
//! len < 128: 0XXXXXXX
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
|
||||
use lz4_flex;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_compressed_blob(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let buf = self.read_blk(blknum, ctx).await?;
|
||||
let compression_alg = buf[off];
|
||||
let res = self.read_blob(offset + 1, ctx).await?;
|
||||
if compression_alg == LZ4_COMPRESSION {
|
||||
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
|
||||
})
|
||||
} else {
|
||||
assert_eq!(compression_alg, NO_COMPRESSION);
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_blob_into_buf(
|
||||
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
(src_buf, Ok(()))
|
||||
}
|
||||
|
||||
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
|
||||
let offset = self.offset;
|
||||
|
||||
let len = srcbuf.len();
|
||||
|
||||
let mut io_buf = self.io_buf.take().expect("we always put it back below");
|
||||
io_buf.clear();
|
||||
let mut is_compressed = false;
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(NO_COMPRESSION);
|
||||
io_buf.put_u8(len as u8);
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("blob too large ({} bytes)", len),
|
||||
));
|
||||
}
|
||||
if len == BLCKSZ as usize {
|
||||
let compressed = lz4_flex::block::compress(&srcbuf);
|
||||
if compressed.len() < len {
|
||||
io_buf.put_u8(LZ4_COMPRESSION);
|
||||
let mut len_buf = (compressed.len() as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
io_buf.extend_from_slice(&compressed[..]);
|
||||
is_compressed = true;
|
||||
}
|
||||
if is_compressed {
|
||||
io_buf.put_u8(NO_COMPRESSION);
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
let (io_buf, hdr_res) = self.write_all(io_buf).await;
|
||||
match hdr_res {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
self.io_buf = Some(io_buf);
|
||||
if is_compressed {
|
||||
hdr_res.map(|_| offset)
|
||||
} else {
|
||||
let (_buf, res) = self.write_all(srcbuf).await;
|
||||
res.map(|_| offset)
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
/// which can be used to retrieve the data later.
|
||||
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(len as u8);
|
||||
self.write_all(io_buf).await
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
self.write_all(io_buf).await
|
||||
}
|
||||
self.write_all(io_buf).await
|
||||
}
|
||||
.await;
|
||||
self.io_buf = Some(io_buf);
|
||||
|
||||
@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::models::{
|
||||
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::cmp::{Ordering, Reverse};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
|
||||
lsn: Lsn,
|
||||
value: Value,
|
||||
) -> ValueReconstructSituation {
|
||||
let state = self
|
||||
let mut error: Option<PageReconstructError> = None;
|
||||
let key_state = self
|
||||
.keys
|
||||
.entry(*key)
|
||||
.or_insert(Ok(VectoredValueReconstructState::default()));
|
||||
|
||||
if let Ok(state) = state {
|
||||
let situation = if let Ok(state) = key_state {
|
||||
let key_done = match state.situation {
|
||||
ValueReconstructSituation::Complete => unreachable!(),
|
||||
ValueReconstructSituation::Continue => match value {
|
||||
@@ -160,6 +162,21 @@ impl ValuesReconstructState {
|
||||
state.img = Some((lsn, img));
|
||||
true
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
|
||||
Ok(decompressed) => {
|
||||
state.img = Some((lsn, Bytes::from(decompressed)));
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
error = Some(PageReconstructError::from(anyhow::anyhow!(
|
||||
"Failed to decompress blobrom virtual file: {}",
|
||||
e
|
||||
)));
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let reached_cache =
|
||||
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
|
||||
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
|
||||
state.situation
|
||||
} else {
|
||||
ValueReconstructSituation::Complete
|
||||
};
|
||||
if let Some(err) = error {
|
||||
*key_state = Err(err);
|
||||
}
|
||||
situation
|
||||
}
|
||||
|
||||
/// Returns the Lsn at which this key is cached if one exists.
|
||||
|
||||
@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||
reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
|
||||
Value::Image(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
format!(" compressed img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
@@ -1138,6 +1148,11 @@ impl DeltaLayerInner {
|
||||
let checkpoint = CheckPoint::decode(&img)?;
|
||||
println!(" CHECKPOINT: {:?}", checkpoint);
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||
let checkpoint = CheckPoint::decode(&decompressed)?;
|
||||
println!(" CHECKPOINT: {:?}", checkpoint);
|
||||
}
|
||||
Value::WalRecord(_rec) => {
|
||||
println!(" unexpected walrecord value for checkpoint key");
|
||||
}
|
||||
|
||||
@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use crate::{
|
||||
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
|
||||
};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
format_version: u16,
|
||||
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
f.debug_struct("ImageLayerInner")
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.field("format_version", &self.format_version)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -408,6 +412,7 @@ impl ImageLayerInner {
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
format_version: actual_summary.format_version,
|
||||
lsn,
|
||||
file,
|
||||
file_id,
|
||||
@@ -436,18 +441,20 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
offset,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
let value = Bytes::from(blob);
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build();
|
||||
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||
block_reader
|
||||
.block_cursor()
|
||||
.read_compressed_blob(offset, &ctx)
|
||||
.await
|
||||
} else {
|
||||
block_reader.block_cursor().read_blob(offset, &ctx).await
|
||||
})
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
|
||||
let value = Bytes::from(blob);
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let (_img, res) = self.blob_writer.write_blob(img).await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let off = res?;
|
||||
|
||||
let off = self.blob_writer.write_compressed_blob(img).await?;
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
|
||||
@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::walrecord;
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use bytes::Bytes;
|
||||
use lz4_flex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tracing::*;
|
||||
@@ -133,6 +136,9 @@ impl InMemoryLayer {
|
||||
Ok(Value::Image(img)) => {
|
||||
write!(&mut desc, " img {} bytes", img.len())?;
|
||||
}
|
||||
Ok(Value::CompressedImage(img)) => {
|
||||
write!(&mut desc, " compressed img {} bytes", img.len())?;
|
||||
}
|
||||
Ok(Value::WalRecord(rec)) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
|
||||
write!(
|
||||
@@ -184,6 +190,11 @@ impl InMemoryLayer {
|
||||
reconstruct_state.img = Some((*entry_lsn, img));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||
reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((*entry_lsn, rec));
|
||||
|
||||
@@ -471,8 +471,9 @@ impl WalIngest {
|
||||
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
|
||||
// only lz4 compression of WAL is now supported, for other compression algorithms fall back to storing the original WAL record
|
||||
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
|
||||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
|
||||
// do not materialize null pages because them most likely be soon replaced with real data
|
||||
&& blk.bimg_len != 0
|
||||
{
|
||||
@@ -480,7 +481,21 @@ impl WalIngest {
|
||||
let img_len = blk.bimg_len as usize;
|
||||
let img_offs = blk.bimg_offset as usize;
|
||||
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
|
||||
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
||||
if postgres_ffi::bkpimage_is_compressed_lz4(
|
||||
blk.bimg_info,
|
||||
modification.tline.pg_version,
|
||||
)? {
|
||||
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
|
||||
let decompressed = lz4_flex::block::decompress(
|
||||
&decoded.record[img_offs..img_offs + img_len],
|
||||
decompressed_img_len,
|
||||
)
|
||||
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
|
||||
assert_eq!(decompressed.len(), decompressed_img_len);
|
||||
image.extend_from_slice(&decompressed);
|
||||
} else {
|
||||
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
||||
}
|
||||
|
||||
if blk.hole_length != 0 {
|
||||
let tail = image.split_off(blk.hole_offset as usize);
|
||||
|
||||
@@ -18,7 +18,7 @@ OBJS = \
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
SHLIB_LINK = -lcurl -llz4
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
|
||||
|
||||
@@ -44,6 +44,7 @@ typedef enum
|
||||
T_NeonErrorResponse,
|
||||
T_NeonDbSizeResponse,
|
||||
T_NeonGetSlruSegmentResponse,
|
||||
T_NeonGetCompressedPageResponse
|
||||
} NeonMessageTag;
|
||||
|
||||
/* base struct for c-style inheritance */
|
||||
@@ -144,6 +145,15 @@ typedef struct
|
||||
|
||||
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
uint16 compressed_size;
|
||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||
} NeonGetCompressedPageResponse;
|
||||
|
||||
#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressded_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + compressed_size))
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
|
||||
@@ -45,6 +45,10 @@
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#ifdef USE_LZ4
|
||||
#include <lz4.h>
|
||||
#endif
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogdefs.h"
|
||||
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
|
||||
case T_NeonExistsResponse:
|
||||
case T_NeonNblocksResponse:
|
||||
case T_NeonGetPageResponse:
|
||||
case T_NeonGetCompressedPageResponse:
|
||||
case T_NeonErrorResponse:
|
||||
case T_NeonDbSizeResponse:
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)
|
||||
|
||||
Assert(msg_resp->tag == T_NeonGetPageResponse);
|
||||
|
||||
resp = (NeonResponse *) msg_resp;
|
||||
break;
|
||||
}
|
||||
case T_NeonGetCompressedPageResponse:
|
||||
{
|
||||
NeonGetCompressedPageResponse *msg_resp;
|
||||
uint16 compressed_size = pq_getmsgint(s, 2);
|
||||
msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
|
||||
msg_resp->tag = tag;
|
||||
msg_resp->compressed_size = compressed_size;
|
||||
memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
|
||||
pq_getmsgend(s);
|
||||
|
||||
Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);
|
||||
|
||||
resp = (NeonResponse *) msg_resp;
|
||||
break;
|
||||
}
|
||||
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
case T_NeonGetCompressedPageResponse:
|
||||
{
|
||||
NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
|
||||
appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"}", msg_resp->compressed_size);
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
{
|
||||
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
|
||||
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||
break;
|
||||
|
||||
case T_NeonGetCompressedPageResponse:
|
||||
{
|
||||
#ifndef USE_LZ4
|
||||
ereport(ERROR, \
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
|
||||
errmsg("compression method lz4 not supported"), \
|
||||
errdetail("This functionality requires the server to be built with lz4 support."), \
|
||||
errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")))
|
||||
#else
|
||||
NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp;
|
||||
int rc = LZ4_decompress_safe(cp->page,
|
||||
buffer,
|
||||
cp->compressed_size,
|
||||
BLCKSZ);
|
||||
if (rc != BLCKSZ) {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg_internal("compressed lz4 data is corrupt")));
|
||||
}
|
||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
|
||||
Reference in New Issue
Block a user