mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 17:00:37 +00:00
Compare commits
9 Commits
conrad/pro
...
wal_lz4_co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42d0b040f8 | ||
|
|
9832638c09 | ||
|
|
62e7638c69 | ||
|
|
0dad8e427d | ||
|
|
4cfa2fdca5 | ||
|
|
56ddf8e37f | ||
|
|
ed4bb3073f | ||
|
|
b7e7aeed4d | ||
|
|
a880178cca |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2841,6 +2841,15 @@ version = "0.4.20"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lz4_flex"
|
||||||
|
version = "0.11.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
|
||||||
|
dependencies = [
|
||||||
|
"twox-hash",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "match_cfg"
|
name = "match_cfg"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -3511,6 +3520,7 @@ dependencies = [
|
|||||||
"hyper",
|
"hyper",
|
||||||
"itertools",
|
"itertools",
|
||||||
"leaky-bucket",
|
"leaky-bucket",
|
||||||
|
"lz4_flex",
|
||||||
"md5",
|
"md5",
|
||||||
"metrics",
|
"metrics",
|
||||||
"nix 0.27.1",
|
"nix 0.27.1",
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ jsonwebtoken = "9"
|
|||||||
lasso = "0.7"
|
lasso = "0.7"
|
||||||
leaky-bucket = "1.0.1"
|
leaky-bucket = "1.0.1"
|
||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
|
lz4_flex = "0.11.1"
|
||||||
md5 = "0.7.0"
|
md5 = "0.7.0"
|
||||||
memoffset = "0.8"
|
memoffset = "0.8"
|
||||||
native-tls = "0.2"
|
native-tls = "0.2"
|
||||||
|
|||||||
4
Makefile
4
Makefile
@@ -23,12 +23,12 @@ endif
|
|||||||
UNAME_S := $(shell uname -s)
|
UNAME_S := $(shell uname -s)
|
||||||
ifeq ($(UNAME_S),Linux)
|
ifeq ($(UNAME_S),Linux)
|
||||||
# Seccomp BPF is only available for Linux
|
# Seccomp BPF is only available for Linux
|
||||||
PG_CONFIGURE_OPTS += --with-libseccomp
|
PG_CONFIGURE_OPTS += --with-lz4 --with-libseccomp
|
||||||
else ifeq ($(UNAME_S),Darwin)
|
else ifeq ($(UNAME_S),Darwin)
|
||||||
# macOS with brew-installed openssl requires explicit paths
|
# macOS with brew-installed openssl requires explicit paths
|
||||||
# It can be configured with OPENSSL_PREFIX variable
|
# It can be configured with OPENSSL_PREFIX variable
|
||||||
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
|
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
|
||||||
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
PG_CONFIGURE_OPTS += --with-lz4 --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
||||||
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
|
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
|
||||||
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
|
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
|
||||||
# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage
|
# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage
|
||||||
|
|||||||
@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
|
|||||||
Error(PagestreamErrorResponse),
|
Error(PagestreamErrorResponse),
|
||||||
DbSize(PagestreamDbSizeResponse),
|
DbSize(PagestreamDbSizeResponse),
|
||||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||||
|
GetCompressedPage(PagestreamGetPageResponse),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep in sync with `pagestore_client.h`
|
// Keep in sync with `pagestore_client.h`
|
||||||
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
|
|||||||
bytes.put(&resp.page[..]);
|
bytes.put(&resp.page[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Self::GetCompressedPage(resp) => {
|
||||||
|
bytes.put_u8(105); /* tag from pagestore_client.h */
|
||||||
|
bytes.put_u16(resp.page.len() as u16);
|
||||||
|
bytes.put(&resp.page[..]);
|
||||||
|
}
|
||||||
|
|
||||||
Self::Error(resp) => {
|
Self::Error(resp) => {
|
||||||
bytes.put_u8(Tag::Error as u8);
|
bytes.put_u8(Tag::Error as u8);
|
||||||
bytes.put(resp.message.as_bytes());
|
bytes.put(resp.message.as_bytes());
|
||||||
@@ -1078,6 +1085,7 @@ impl PagestreamBeMessage {
|
|||||||
Self::Error(_) => "Error",
|
Self::Error(_) => "Error",
|
||||||
Self::DbSize(_) => "DbSize",
|
Self::DbSize(_) => "DbSize",
|
||||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||||
|
Self::GetCompressedPage(_) => "GetCompressedPage",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<boo
|
|||||||
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
|
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
|
||||||
|
dispatch_pgversion!(
|
||||||
|
version,
|
||||||
|
Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn generate_wal_segment(
|
pub fn generate_wal_segment(
|
||||||
segno: u64,
|
segno: u64,
|
||||||
system_id: u64,
|
system_id: u64,
|
||||||
|
|||||||
@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c *
|
|||||||
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
||||||
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
|
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// LZ4 check for a backup-image info byte.
///
/// Always returns `false`; the `_bimg_info` argument is ignored.
pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
    false
}
|
||||||
|
|||||||
@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
|||||||
|
|
||||||
(bimg_info & ANY_COMPRESS_FLAG) != 0
|
(bimg_info & ANY_COMPRESS_FLAG) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
|
||||||
|
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
|
|||||||
|
|
||||||
(bimg_info & ANY_COMPRESS_FLAG) != 0
|
(bimg_info & ANY_COMPRESS_FLAG) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
|
||||||
|
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
|
||||||
|
}
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
|||||||
hyper.workspace = true
|
hyper.workspace = true
|
||||||
itertools.workspace = true
|
itertools.workspace = true
|
||||||
leaky-bucket.workspace = true
|
leaky-bucket.workspace = true
|
||||||
|
lz4_flex.workspace = true
|
||||||
md5.workspace = true
|
md5.workspace = true
|
||||||
nix.workspace = true
|
nix.workspace = true
|
||||||
# hack to get the number of worker threads tokio uses
|
# hack to get the number of worker threads tokio uses
|
||||||
|
|||||||
@@ -157,6 +157,7 @@ impl PagestreamClient {
|
|||||||
PagestreamBeMessage::Exists(_)
|
PagestreamBeMessage::Exists(_)
|
||||||
| PagestreamBeMessage::Nblocks(_)
|
| PagestreamBeMessage::Nblocks(_)
|
||||||
| PagestreamBeMessage::DbSize(_)
|
| PagestreamBeMessage::DbSize(_)
|
||||||
|
| PagestreamBeMessage::GetCompressedPage(_)
|
||||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||||
anyhow::bail!(
|
anyhow::bail!(
|
||||||
"unexpected be message kind in response to getpage request: {}",
|
"unexpected be message kind in response to getpage request: {}",
|
||||||
|
|||||||
@@ -40,7 +40,14 @@ use tracing::info;
|
|||||||
/// format, bump this!
|
/// format, bump this!
|
||||||
/// Note that TimelineMetadata uses its own version number to track
|
/// Note that TimelineMetadata uses its own version number to track
|
||||||
/// backwards-compatible changes to the metadata format.
|
/// backwards-compatible changes to the metadata format.
|
||||||
pub const STORAGE_FORMAT_VERSION: u16 = 3;
|
pub const STORAGE_FORMAT_VERSION: u16 = 4;
|
||||||
|
|
||||||
|
/// Minimum storage format version with compression support
|
||||||
|
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
|
||||||
|
|
||||||
|
/// Page image compression algorithm
|
||||||
|
pub const NO_COMPRESSION: u8 = 0;
|
||||||
|
// Must differ from NO_COMPRESSION: this value is stored as a one-byte tag in
// front of each blob and compared on read to decide whether to decompress.
pub const LZ4_COMPRESSION: u8 = 1;
|
||||||
|
|
||||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||||
|
|
||||||
|
|||||||
@@ -1155,9 +1155,18 @@ impl PageServerHandler {
|
|||||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
let compressed = lz4_flex::block::compress(&page);
|
||||||
page,
|
if compressed.len() < page.len() {
|
||||||
}))
|
Ok(PagestreamBeMessage::GetCompressedPage(
|
||||||
|
PagestreamGetPageResponse {
|
||||||
|
page: Bytes::from(compressed),
|
||||||
|
},
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||||
|
page,
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip_all, fields(shard_id))]
|
#[instrument(skip_all, fields(shard_id))]
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ use anyhow::{ensure, Context};
|
|||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
use enum_map::Enum;
|
use enum_map::Enum;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use lz4_flex;
|
||||||
use pageserver_api::key::{
|
use pageserver_api::key::{
|
||||||
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
||||||
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||||
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
|
|||||||
img: Bytes,
|
img: Bytes,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
let compressed = lz4_flex::block::compress(&img);
|
||||||
|
if compressed.len() < img.len() {
|
||||||
|
self.put(
|
||||||
|
rel_block_to_key(rel, blknum),
|
||||||
|
Value::CompressedImage(Bytes::from(compressed)),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
|
|||||||
if let Some((_, value)) = values.last() {
|
if let Some((_, value)) = values.last() {
|
||||||
return if let Value::Image(img) = value {
|
return if let Value::Image(img) = value {
|
||||||
Ok(img.clone())
|
Ok(img.clone())
|
||||||
|
} else if let Value::CompressedImage(img) = value {
|
||||||
|
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
|
||||||
|
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
|
||||||
|
Ok(Bytes::from(decompressed))
|
||||||
} else {
|
} else {
|
||||||
// Currently, we never need to read back a WAL record that we
|
// Currently, we never need to read back a WAL record that we
|
||||||
// inserted in the same "transaction". All the metadata updates
|
// inserted in the same "transaction". All the metadata updates
|
||||||
|
|||||||
@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
|
|||||||
pub enum Value {
|
pub enum Value {
|
||||||
/// An Image value contains a full copy of the value
|
/// An Image value contains a full copy of the value
|
||||||
Image(Bytes),
|
Image(Bytes),
|
||||||
|
/// A compressed page image contains a full copy of the page
|
||||||
|
CompressedImage(Bytes),
|
||||||
/// A WalRecord value contains a WAL record that needs to be
|
/// A WalRecord value contains a WAL record that needs to be
|
||||||
/// replayed get the full value. Replaying the WAL record
|
/// replayed get the full value. Replaying the WAL record
|
||||||
/// might need a previous version of the value (if will_init()
|
/// might need a previous version of the value (if will_init()
|
||||||
@@ -22,12 +24,17 @@ pub enum Value {
|
|||||||
|
|
||||||
impl Value {
|
impl Value {
|
||||||
pub fn is_image(&self) -> bool {
|
pub fn is_image(&self) -> bool {
|
||||||
matches!(self, Value::Image(_))
|
match self {
|
||||||
|
Value::Image(_) => true,
|
||||||
|
Value::CompressedImage(_) => true,
|
||||||
|
Value::WalRecord(_) => false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn will_init(&self) -> bool {
|
pub fn will_init(&self) -> bool {
|
||||||
match self {
|
match self {
|
||||||
Value::Image(_) => true,
|
Value::Image(_) => true,
|
||||||
|
Value::CompressedImage(_) => true,
|
||||||
Value::WalRecord(rec) => rec.will_init(),
|
Value::WalRecord(rec) => rec.will_init(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,13 +11,16 @@
|
|||||||
//! len < 128: 0XXXXXXX
|
//! len < 128: 0XXXXXXX
|
||||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||||
//!
|
//!
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||||
|
|
||||||
use crate::context::RequestContext;
|
use crate::context::RequestContext;
|
||||||
use crate::page_cache::PAGE_SZ;
|
use crate::page_cache::PAGE_SZ;
|
||||||
use crate::tenant::block_io::BlockCursor;
|
use crate::tenant::block_io::BlockCursor;
|
||||||
use crate::virtual_file::VirtualFile;
|
use crate::virtual_file::VirtualFile;
|
||||||
|
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
|
||||||
|
use lz4_flex;
|
||||||
|
use postgres_ffi::BLCKSZ;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
use std::io::{Error, ErrorKind};
|
use std::io::{Error, ErrorKind};
|
||||||
|
|
||||||
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
|
|||||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
}
|
}
|
||||||
|
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||||
|
/// are overwritten.
|
||||||
|
pub async fn read_compressed_blob(
|
||||||
|
&self,
|
||||||
|
offset: u64,
|
||||||
|
ctx: &RequestContext,
|
||||||
|
) -> Result<Vec<u8>, std::io::Error> {
|
||||||
|
let blknum = (offset / PAGE_SZ as u64) as u32;
|
||||||
|
let off = (offset % PAGE_SZ as u64) as usize;
|
||||||
|
|
||||||
|
let buf = self.read_blk(blknum, ctx).await?;
|
||||||
|
let compression_alg = buf[off];
|
||||||
|
let res = self.read_blob(offset + 1, ctx).await?;
|
||||||
|
if compression_alg == LZ4_COMPRESSION {
|
||||||
|
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
|
||||||
|
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
assert_eq!(compression_alg, NO_COMPRESSION);
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||||
/// are overwritten.
|
/// are overwritten.
|
||||||
pub async fn read_blob_into_buf(
|
pub async fn read_blob_into_buf(
|
||||||
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
|||||||
(src_buf, Ok(()))
|
(src_buf, Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
|
||||||
|
let offset = self.offset;
|
||||||
|
|
||||||
|
let len = srcbuf.len();
|
||||||
|
|
||||||
|
let mut io_buf = self.io_buf.take().expect("we always put it back below");
|
||||||
|
io_buf.clear();
|
||||||
|
let mut is_compressed = false;
|
||||||
|
if len < 128 {
|
||||||
|
// Short blob. Write a 1-byte length header
|
||||||
|
io_buf.put_u8(NO_COMPRESSION);
|
||||||
|
io_buf.put_u8(len as u8);
|
||||||
|
} else {
|
||||||
|
// Write a 4-byte length header
|
||||||
|
if len > 0x7fff_ffff {
|
||||||
|
return Err(Error::new(
|
||||||
|
ErrorKind::Other,
|
||||||
|
format!("blob too large ({} bytes)", len),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if len == BLCKSZ as usize {
|
||||||
|
let compressed = lz4_flex::block::compress(&srcbuf);
|
||||||
|
if compressed.len() < len {
|
||||||
|
io_buf.put_u8(LZ4_COMPRESSION);
|
||||||
|
let mut len_buf = (compressed.len() as u32).to_be_bytes();
|
||||||
|
len_buf[0] |= 0x80;
|
||||||
|
io_buf.extend_from_slice(&len_buf[..]);
|
||||||
|
io_buf.extend_from_slice(&compressed[..]);
|
||||||
|
is_compressed = true;
|
||||||
|
}
|
||||||
|
if is_compressed {
|
||||||
|
io_buf.put_u8(NO_COMPRESSION);
|
||||||
|
let mut len_buf = (len as u32).to_be_bytes();
|
||||||
|
len_buf[0] |= 0x80;
|
||||||
|
io_buf.extend_from_slice(&len_buf[..]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let (io_buf, hdr_res) = self.write_all(io_buf).await;
|
||||||
|
match hdr_res {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
self.io_buf = Some(io_buf);
|
||||||
|
if is_compressed {
|
||||||
|
hdr_res.map(|_| offset)
|
||||||
|
} else {
|
||||||
|
let (_buf, res) = self.write_all(srcbuf).await;
|
||||||
|
res.map(|_| offset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Write a blob of data. Returns the offset that it was written to,
|
/// Write a blob of data. Returns the offset that it was written to,
|
||||||
/// which can be used to retrieve the data later.
|
/// which can be used to retrieve the data later.
|
||||||
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||||
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
|||||||
if len < 128 {
|
if len < 128 {
|
||||||
// Short blob. Write a 1-byte length header
|
// Short blob. Write a 1-byte length header
|
||||||
io_buf.put_u8(len as u8);
|
io_buf.put_u8(len as u8);
|
||||||
self.write_all(io_buf).await
|
|
||||||
} else {
|
} else {
|
||||||
// Write a 4-byte length header
|
// Write a 4-byte length header
|
||||||
if len > 0x7fff_ffff {
|
if len > 0x7fff_ffff {
|
||||||
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
|||||||
let mut len_buf = (len as u32).to_be_bytes();
|
let mut len_buf = (len as u32).to_be_bytes();
|
||||||
len_buf[0] |= 0x80;
|
len_buf[0] |= 0x80;
|
||||||
io_buf.extend_from_slice(&len_buf[..]);
|
io_buf.extend_from_slice(&len_buf[..]);
|
||||||
self.write_all(io_buf).await
|
|
||||||
}
|
}
|
||||||
|
self.write_all(io_buf).await
|
||||||
}
|
}
|
||||||
.await;
|
.await;
|
||||||
self.io_buf = Some(io_buf);
|
self.io_buf = Some(io_buf);
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
|||||||
use pageserver_api::models::{
|
use pageserver_api::models::{
|
||||||
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
|
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
|
||||||
};
|
};
|
||||||
|
use postgres_ffi::BLCKSZ;
|
||||||
use std::cmp::{Ordering, Reverse};
|
use std::cmp::{Ordering, Reverse};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::{BinaryHeap, HashMap};
|
use std::collections::{BinaryHeap, HashMap};
|
||||||
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
|
|||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
value: Value,
|
value: Value,
|
||||||
) -> ValueReconstructSituation {
|
) -> ValueReconstructSituation {
|
||||||
let state = self
|
let mut error: Option<PageReconstructError> = None;
|
||||||
|
let key_state = self
|
||||||
.keys
|
.keys
|
||||||
.entry(*key)
|
.entry(*key)
|
||||||
.or_insert(Ok(VectoredValueReconstructState::default()));
|
.or_insert(Ok(VectoredValueReconstructState::default()));
|
||||||
|
|
||||||
if let Ok(state) = state {
|
let situation = if let Ok(state) = key_state {
|
||||||
let key_done = match state.situation {
|
let key_done = match state.situation {
|
||||||
ValueReconstructSituation::Complete => unreachable!(),
|
ValueReconstructSituation::Complete => unreachable!(),
|
||||||
ValueReconstructSituation::Continue => match value {
|
ValueReconstructSituation::Continue => match value {
|
||||||
@@ -160,6 +162,21 @@ impl ValuesReconstructState {
|
|||||||
state.img = Some((lsn, img));
|
state.img = Some((lsn, img));
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
Value::CompressedImage(img) => {
|
||||||
|
match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
|
||||||
|
Ok(decompressed) => {
|
||||||
|
state.img = Some((lsn, Bytes::from(decompressed)));
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error = Some(PageReconstructError::from(anyhow::anyhow!(
|
||||||
|
"Failed to decompress blobrom virtual file: {}",
|
||||||
|
e
|
||||||
|
)));
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Value::WalRecord(rec) => {
|
Value::WalRecord(rec) => {
|
||||||
let reached_cache =
|
let reached_cache =
|
||||||
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
|
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
|
||||||
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
|
|||||||
state.situation
|
state.situation
|
||||||
} else {
|
} else {
|
||||||
ValueReconstructSituation::Complete
|
ValueReconstructSituation::Complete
|
||||||
|
};
|
||||||
|
if let Some(err) = error {
|
||||||
|
*key_state = Err(err);
|
||||||
}
|
}
|
||||||
|
situation
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the Lsn at which this key is cached if one exists.
|
/// Returns the Lsn at which this key is cached if one exists.
|
||||||
|
|||||||
@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
|
|||||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||||
use bytes::BytesMut;
|
use bytes::{Bytes, BytesMut};
|
||||||
use camino::{Utf8Path, Utf8PathBuf};
|
use camino::{Utf8Path, Utf8PathBuf};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use pageserver_api::keyspace::KeySpace;
|
use pageserver_api::keyspace::KeySpace;
|
||||||
use pageserver_api::models::LayerAccessKind;
|
use pageserver_api::models::LayerAccessKind;
|
||||||
use pageserver_api::shard::TenantShardId;
|
use pageserver_api::shard::TenantShardId;
|
||||||
|
use postgres_ffi::BLCKSZ;
|
||||||
use rand::{distributions::Alphanumeric, Rng};
|
use rand::{distributions::Alphanumeric, Rng};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
|
|||||||
need_image = false;
|
need_image = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Value::CompressedImage(img) => {
|
||||||
|
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||||
|
reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
|
||||||
|
need_image = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
Value::WalRecord(rec) => {
|
Value::WalRecord(rec) => {
|
||||||
let will_init = rec.will_init();
|
let will_init = rec.will_init();
|
||||||
reconstruct_state.records.push((entry_lsn, rec));
|
reconstruct_state.records.push((entry_lsn, rec));
|
||||||
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
|
|||||||
Value::Image(img) => {
|
Value::Image(img) => {
|
||||||
format!(" img {} bytes", img.len())
|
format!(" img {} bytes", img.len())
|
||||||
}
|
}
|
||||||
|
Value::CompressedImage(img) => {
|
||||||
|
format!(" compressed img {} bytes", img.len())
|
||||||
|
}
|
||||||
Value::WalRecord(rec) => {
|
Value::WalRecord(rec) => {
|
||||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||||
format!(
|
format!(
|
||||||
@@ -1138,6 +1148,11 @@ impl DeltaLayerInner {
|
|||||||
let checkpoint = CheckPoint::decode(&img)?;
|
let checkpoint = CheckPoint::decode(&img)?;
|
||||||
println!(" CHECKPOINT: {:?}", checkpoint);
|
println!(" CHECKPOINT: {:?}", checkpoint);
|
||||||
}
|
}
|
||||||
|
Value::CompressedImage(img) => {
|
||||||
|
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||||
|
let checkpoint = CheckPoint::decode(&decompressed)?;
|
||||||
|
println!(" CHECKPOINT: {:?}", checkpoint);
|
||||||
|
}
|
||||||
Value::WalRecord(_rec) => {
|
Value::WalRecord(_rec) => {
|
||||||
println!(" unexpected walrecord value for checkpoint key");
|
println!(" unexpected walrecord value for checkpoint key");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
|
|||||||
};
|
};
|
||||||
use crate::tenant::{PageReconstructError, Timeline};
|
use crate::tenant::{PageReconstructError, Timeline};
|
||||||
use crate::virtual_file::{self, VirtualFile};
|
use crate::virtual_file::{self, VirtualFile};
|
||||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
use crate::{
|
||||||
|
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
|
||||||
|
};
|
||||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use camino::{Utf8Path, Utf8PathBuf};
|
use camino::{Utf8Path, Utf8PathBuf};
|
||||||
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
|
|||||||
// values copied from summary
|
// values copied from summary
|
||||||
index_start_blk: u32,
|
index_start_blk: u32,
|
||||||
index_root_blk: u32,
|
index_root_blk: u32,
|
||||||
|
format_version: u16,
|
||||||
|
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
|
|
||||||
@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
|
|||||||
f.debug_struct("ImageLayerInner")
|
f.debug_struct("ImageLayerInner")
|
||||||
.field("index_start_blk", &self.index_start_blk)
|
.field("index_start_blk", &self.index_start_blk)
|
||||||
.field("index_root_blk", &self.index_root_blk)
|
.field("index_root_blk", &self.index_root_blk)
|
||||||
|
.field("format_version", &self.format_version)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -408,6 +412,7 @@ impl ImageLayerInner {
|
|||||||
Ok(Ok(ImageLayerInner {
|
Ok(Ok(ImageLayerInner {
|
||||||
index_start_blk: actual_summary.index_start_blk,
|
index_start_blk: actual_summary.index_start_blk,
|
||||||
index_root_blk: actual_summary.index_root_blk,
|
index_root_blk: actual_summary.index_root_blk,
|
||||||
|
format_version: actual_summary.format_version,
|
||||||
lsn,
|
lsn,
|
||||||
file,
|
file,
|
||||||
file_id,
|
file_id,
|
||||||
@@ -436,18 +441,20 @@ impl ImageLayerInner {
|
|||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
let blob = block_reader
|
let ctx = RequestContextBuilder::extend(ctx)
|
||||||
.block_cursor()
|
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||||
.read_blob(
|
.build();
|
||||||
offset,
|
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||||
&RequestContextBuilder::extend(ctx)
|
block_reader
|
||||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
.block_cursor()
|
||||||
.build(),
|
.read_compressed_blob(offset, &ctx)
|
||||||
)
|
.await
|
||||||
.await
|
} else {
|
||||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
block_reader.block_cursor().read_blob(offset, &ctx).await
|
||||||
let value = Bytes::from(blob);
|
})
|
||||||
|
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||||
|
|
||||||
|
let value = Bytes::from(blob);
|
||||||
reconstruct_state.img = Some((self.lsn, value));
|
reconstruct_state.img = Some((self.lsn, value));
|
||||||
Ok(ValueReconstructResult::Complete)
|
Ok(ValueReconstructResult::Complete)
|
||||||
} else {
|
} else {
|
||||||
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
|
|||||||
///
|
///
|
||||||
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
||||||
ensure!(self.key_range.contains(&key));
|
ensure!(self.key_range.contains(&key));
|
||||||
let (_img, res) = self.blob_writer.write_blob(img).await;
|
let off = self.blob_writer.write_compressed_blob(img).await?;
|
||||||
// TODO: re-use the buffer for `img` further upstack
|
|
||||||
let off = res?;
|
|
||||||
|
|
||||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||||
key.write_to_byte_slice(&mut keybuf);
|
key.write_to_byte_slice(&mut keybuf);
|
||||||
self.tree.append(&keybuf, off)?;
|
self.tree.append(&keybuf, off)?;
|
||||||
|
|||||||
@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
|
|||||||
use crate::tenant::{PageReconstructError, Timeline};
|
use crate::tenant::{PageReconstructError, Timeline};
|
||||||
use crate::walrecord;
|
use crate::walrecord;
|
||||||
use anyhow::{anyhow, ensure, Result};
|
use anyhow::{anyhow, ensure, Result};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use lz4_flex;
|
||||||
use pageserver_api::keyspace::KeySpace;
|
use pageserver_api::keyspace::KeySpace;
|
||||||
use pageserver_api::models::InMemoryLayerInfo;
|
use pageserver_api::models::InMemoryLayerInfo;
|
||||||
use pageserver_api::shard::TenantShardId;
|
use pageserver_api::shard::TenantShardId;
|
||||||
|
use postgres_ffi::BLCKSZ;
|
||||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||||
use std::sync::{Arc, OnceLock};
|
use std::sync::{Arc, OnceLock};
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
@@ -133,6 +136,9 @@ impl InMemoryLayer {
|
|||||||
Ok(Value::Image(img)) => {
|
Ok(Value::Image(img)) => {
|
||||||
write!(&mut desc, " img {} bytes", img.len())?;
|
write!(&mut desc, " img {} bytes", img.len())?;
|
||||||
}
|
}
|
||||||
|
Ok(Value::CompressedImage(img)) => {
|
||||||
|
write!(&mut desc, " compressed img {} bytes", img.len())?;
|
||||||
|
}
|
||||||
Ok(Value::WalRecord(rec)) => {
|
Ok(Value::WalRecord(rec)) => {
|
||||||
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
|
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
|
||||||
write!(
|
write!(
|
||||||
@@ -184,6 +190,11 @@ impl InMemoryLayer {
|
|||||||
reconstruct_state.img = Some((*entry_lsn, img));
|
reconstruct_state.img = Some((*entry_lsn, img));
|
||||||
return Ok(ValueReconstructResult::Complete);
|
return Ok(ValueReconstructResult::Complete);
|
||||||
}
|
}
|
||||||
|
Value::CompressedImage(img) => {
|
||||||
|
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||||
|
reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
|
||||||
|
return Ok(ValueReconstructResult::Complete);
|
||||||
|
}
|
||||||
Value::WalRecord(rec) => {
|
Value::WalRecord(rec) => {
|
||||||
let will_init = rec.will_init();
|
let will_init = rec.will_init();
|
||||||
reconstruct_state.records.push((*entry_lsn, rec));
|
reconstruct_state.records.push((*entry_lsn, rec));
|
||||||
|
|||||||
@@ -471,8 +471,9 @@ impl WalIngest {
|
|||||||
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
||||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
// only lz4 compression of WAL is now supported; for other compression algorithms, fall back to storing the original WAL record
|
||||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
|
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
|
||||||
|
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
|
||||||
// do not materialize null pages because they will most likely soon be replaced with real data
|
// do not materialize null pages because they will most likely soon be replaced with real data
|
||||||
&& blk.bimg_len != 0
|
&& blk.bimg_len != 0
|
||||||
{
|
{
|
||||||
@@ -480,7 +481,21 @@ impl WalIngest {
|
|||||||
let img_len = blk.bimg_len as usize;
|
let img_len = blk.bimg_len as usize;
|
||||||
let img_offs = blk.bimg_offset as usize;
|
let img_offs = blk.bimg_offset as usize;
|
||||||
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
|
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
|
||||||
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
if postgres_ffi::bkpimage_is_compressed_lz4(
|
||||||
|
blk.bimg_info,
|
||||||
|
modification.tline.pg_version,
|
||||||
|
)? {
|
||||||
|
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
|
||||||
|
let decompressed = lz4_flex::block::decompress(
|
||||||
|
&decoded.record[img_offs..img_offs + img_len],
|
||||||
|
decompressed_img_len,
|
||||||
|
)
|
||||||
|
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
|
||||||
|
assert_eq!(decompressed.len(), decompressed_img_len);
|
||||||
|
image.extend_from_slice(&decompressed);
|
||||||
|
} else {
|
||||||
|
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
||||||
|
}
|
||||||
|
|
||||||
if blk.hole_length != 0 {
|
if blk.hole_length != 0 {
|
||||||
let tail = image.split_off(blk.hole_offset as usize);
|
let tail = image.split_off(blk.hole_offset as usize);
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ OBJS = \
|
|||||||
|
|
||||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||||
SHLIB_LINK_INTERNAL = $(libpq)
|
SHLIB_LINK_INTERNAL = $(libpq)
|
||||||
SHLIB_LINK = -lcurl
|
SHLIB_LINK = -lcurl -llz4
|
||||||
|
|
||||||
EXTENSION = neon
|
EXTENSION = neon
|
||||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
|
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ typedef enum
|
|||||||
T_NeonErrorResponse,
|
T_NeonErrorResponse,
|
||||||
T_NeonDbSizeResponse,
|
T_NeonDbSizeResponse,
|
||||||
T_NeonGetSlruSegmentResponse,
|
T_NeonGetSlruSegmentResponse,
|
||||||
|
T_NeonGetCompressedPageResponse
|
||||||
} NeonMessageTag;
|
} NeonMessageTag;
|
||||||
|
|
||||||
/* base struct for c-style inheritance */
|
/* base struct for c-style inheritance */
|
||||||
@@ -144,6 +145,15 @@ typedef struct
|
|||||||
|
|
||||||
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
|
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
|
||||||
|
|
||||||
|
/* Page response whose payload is a compressed page image of variable length. */
typedef struct
|
||||||
|
{
|
||||||
|
NeonMessageTag tag; /* message tag; T_NeonGetCompressedPageResponse for this struct */
|
||||||
|
uint16 compressed_size; /* number of bytes stored in page[] */
|
||||||
|
char page[FLEXIBLE_ARRAY_MEMBER]; /* compressed_size bytes of compressed data (the reader LZ4-decompresses it to BLCKSZ) */
|
||||||
|
} NeonGetCompressedPageResponse;
|
||||||
|
|
||||||
|
/* Allocation size of a NeonGetCompressedPageResponse whose page[] holds compressed_size bytes. */
#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + (compressed_size)))
|
||||||
|
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
NeonMessageTag tag;
|
NeonMessageTag tag;
|
||||||
|
|||||||
@@ -45,6 +45,10 @@
|
|||||||
*/
|
*/
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#ifdef USE_LZ4
|
||||||
|
#include <lz4.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
#include "access/xlogdefs.h"
|
#include "access/xlogdefs.h"
|
||||||
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
case T_NeonExistsResponse:
|
case T_NeonExistsResponse:
|
||||||
case T_NeonNblocksResponse:
|
case T_NeonNblocksResponse:
|
||||||
case T_NeonGetPageResponse:
|
case T_NeonGetPageResponse:
|
||||||
|
case T_NeonGetCompressedPageResponse:
|
||||||
case T_NeonErrorResponse:
|
case T_NeonErrorResponse:
|
||||||
case T_NeonDbSizeResponse:
|
case T_NeonDbSizeResponse:
|
||||||
case T_NeonGetSlruSegmentResponse:
|
case T_NeonGetSlruSegmentResponse:
|
||||||
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)
|
|||||||
|
|
||||||
Assert(msg_resp->tag == T_NeonGetPageResponse);
|
Assert(msg_resp->tag == T_NeonGetPageResponse);
|
||||||
|
|
||||||
|
resp = (NeonResponse *) msg_resp;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case T_NeonGetCompressedPageResponse:
|
||||||
|
{
|
||||||
|
NeonGetCompressedPageResponse *msg_resp;
|
||||||
|
uint16 compressed_size = pq_getmsgint(s, 2);
|
||||||
|
msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
|
||||||
|
msg_resp->tag = tag;
|
||||||
|
msg_resp->compressed_size = compressed_size;
|
||||||
|
memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
|
||||||
|
pq_getmsgend(s);
|
||||||
|
|
||||||
|
Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);
|
||||||
|
|
||||||
resp = (NeonResponse *) msg_resp;
|
resp = (NeonResponse *) msg_resp;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
|
|||||||
appendStringInfoChar(&s, '}');
|
appendStringInfoChar(&s, '}');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case T_NeonGetCompressedPageResponse:
|
||||||
|
{
|
||||||
|
NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
|
||||||
|
appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
|
||||||
|
appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"}", msg_resp->compressed_size);
|
||||||
|
appendStringInfoChar(&s, '}');
|
||||||
|
break;
|
||||||
|
}
|
||||||
case T_NeonErrorResponse:
|
case T_NeonErrorResponse:
|
||||||
{
|
{
|
||||||
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
|
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
|
||||||
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case T_NeonGetCompressedPageResponse:
|
||||||
|
{
|
||||||
|
#ifndef USE_LZ4
|
||||||
|
ereport(ERROR, \
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
|
||||||
|
errmsg("compression method lz4 not supported"), \
|
||||||
|
errdetail("This functionality requires the server to be built with lz4 support."), \
|
||||||
|
errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")))
|
||||||
|
#else
|
||||||
|
NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp;
|
||||||
|
int rc = LZ4_decompress_safe(cp->page,
|
||||||
|
buffer,
|
||||||
|
cp->compressed_size,
|
||||||
|
BLCKSZ);
|
||||||
|
if (rc != BLCKSZ) {
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||||
|
errmsg_internal("compressed lz4 data is corrupt")));
|
||||||
|
}
|
||||||
|
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||||
|
#endif
|
||||||
|
break;
|
||||||
|
}
|
||||||
case T_NeonErrorResponse:
|
case T_NeonErrorResponse:
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_IO_ERROR),
|
(errcode(ERRCODE_IO_ERROR),
|
||||||
|
|||||||
Reference in New Issue
Block a user