diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 1668228a81..943e724c70 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -58,7 +58,8 @@ rusoto_core = "0.47" rusoto_s3 = "0.47" async-trait = "0.1" -zstd = "0.11.1" +# 'experimental' is needed for the `zstd::bulk::Decompressor::upper_bound` function. +zstd = { version = "0.11.1", features = ["experimental"] } postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 52892e70c6..4ed2cd3842 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -24,7 +24,6 @@ pub const ZSTD_MAX_SAMPLES: usize = 1024; pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024; pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level -pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records? pub mod defaults { use crate::tenant_config::defaults::*; diff --git a/pageserver/src/layered_repository/block_io.rs b/pageserver/src/layered_repository/block_io.rs index 0706c4b2f9..d027b2f0e7 100644 --- a/pageserver/src/layered_repository/block_io.rs +++ b/pageserver/src/layered_repository/block_io.rs @@ -73,7 +73,7 @@ pub struct BlockCursor where R: BlockReader, { - pub reader: R, + reader: R, /// last accessed page cache: Option<(u32, R::BlockLease)>, } diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index cfa79e9a22..b7c4873ce3 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -37,9 +37,8 @@ use crate::repository::{Key, Value, KEY_SIZE}; use crate::virtual_file::VirtualFile; use crate::walrecord; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use serde::{Deserialize, Serialize}; -use tracing::*; use std::fs; use std::io::{BufWriter, Write}; use std::io::{Seek, SeekFrom}; @@ -47,6 +46,7 @@ use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tracing::*; use utils::{ bin_ser::BeSer, @@ -242,6 +242,7 @@ impl Layer for DeltaLayer { )?), None => None, }; + let mut decompress_buf = Vec::new(); // Scan the page versions backwards, starting from `lsn`. let file = inner.file.as_ref().unwrap(); @@ -275,9 +276,12 @@ impl Layer for DeltaLayer { ) })?; let val = if let Some(decompressor) = &mut decompressor { - let decompressed = - decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?; - Value::des(&decompressed) + let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf) + .ok_or_else(|| anyhow!("could not get decompressed length"))?; + decompress_buf.clear(); + decompress_buf.reserve(decompressed_max_len); + let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf)?; + Value::des(&decompress_buf) } else { Value::des(&buf) } @@ -380,14 +384,18 @@ impl Layer for DeltaLayer { )?), None => None, }; + let mut decompress_buf = Vec::new(); // A subroutine to dump a single blob let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result { let buf = cursor.read_blob(blob_ref.pos())?; let val = if let Some(decompressor) = &mut decompressor { - let decompressed = decompressor - .decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?; - Value::des(&decompressed) + let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf) + .ok_or_else(|| anyhow!("could not get decompressed length"))?; + decompress_buf.clear(); + decompress_buf.reserve(decompressed_max_len); + let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf); + Value::des(&decompress_buf) } else { Value::des(&buf) }?; @@ -794,6 +802,8 @@ struct DeltaValueIter<'a> { next_idx: usize, reader: BlockCursor>, decompressor: Option>, + + decompress_buf: Vec, } struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>); @@ -844,6 +854,7 @@ impl<'a> DeltaValueIter<'a> { next_idx: 0, reader: BlockCursor::new(Adapter(inner)), decompressor, + decompress_buf: Vec::new(), }; Ok(iter) @@ -858,9 +869,12 @@ impl<'a> DeltaValueIter<'a> { let buf = self.reader.read_blob(blob_ref.pos())?; let val = if let Some(decompressor) = &mut self.decompressor { - let decompressed = - decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?; - Value::des(&decompressed) + let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf) + .ok_or_else(|| anyhow!("could not get decompressed length"))?; + self.decompress_buf.clear(); + self.decompress_buf.reserve(decompressed_max_len); + let _ = decompressor.decompress_to_buffer(&buf, &mut self.decompress_buf)?; + Value::des(&self.decompress_buf) } else { Value::des(&buf) }?;