Compare commits

...

23 Commits

Author SHA1 Message Date
Konstantin Knizhnik
c6416c5aa4 Merge with main 2022-05-27 19:25:35 +03:00
Konstantin Knizhnik
5217058e8e Fix specifying destination buffer capacity for image decompression 2022-05-27 19:14:08 +03:00
Heikki Linnakangas
18272f53db Bump STORAGE_FORMAT_VERSION 2022-05-15 17:19:34 +03:00
Heikki Linnakangas
d9bc3fbc8d Merge remote-tracking branch 'origin/main' 2022-05-15 17:16:56 +03:00
Heikki Linnakangas
a0eb50552b rustfmt 2022-05-14 14:26:05 +03:00
Heikki Linnakangas
8ea907b66c Minor refactoring 2022-05-14 11:55:59 +03:00
Heikki Linnakangas
51c64d9a79 Merge remote-tracking branch 'origin/main' 2022-05-14 11:20:22 +03:00
Konstantin Knizhnik
56d7ccbd3d Merge with main 2022-05-13 18:57:32 +03:00
Konstantin Knizhnik
f40d29a035 Perform vacuum before select in compression test 2022-05-13 18:46:15 +03:00
Heikki Linnakangas
057468e27c More work on compression
Move the responsibility for training the dictionary into
DeltaLayerWriter, so that the callers don't need to know about it.

Add comments.

If the compressed version of a blob would be larger than the original,
store it uncompressed.
2022-05-12 02:04:11 +03:00
Heikki Linnakangas
6a9aab5be1 Merge remote-tracking branch 'origin/main' into layer_compression-heikki2 2022-05-11 11:47:40 +03:00
Heikki Linnakangas
224c2146d4 Fix bugs in the merge with origin/main, to make it compile 2022-05-10 16:08:59 +03:00
Heikki Linnakangas
73b6a6e3c3 Merge remote-tracking branch 'origin/main' into layer_compression-heikki2 2022-05-10 16:03:35 +03:00
Heikki Linnakangas
0ed0433e82 Eliminate limit on buffer size.
This relies on the zstd 'experimental' feature, because the
zstd::bulk::Decompressor::upper_bound() function, which uses the
ZSTD_decompressBound() function, is still experimental in the
zstd library. I'm OK with that; it's unlikely that the function would
change, and if it does, I'm sure there will be a replacement. There's
also the zstd_safe::get_decompressed_size() function that we could
use, but we only need an upper bound, not the exact size, so
upper_bound() seems more appropriate.
2022-05-10 15:48:08 +03:00
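For context on this commit, a minimal sketch (not code from this branch) of how upper_bound() can size the destination buffer; it assumes the zstd crate built with the 'experimental' feature:

    // Hypothetical helper, illustrative only.
    // upper_bound() wraps ZSTD_decompressBound() and returns None if the
    // frame header cannot be parsed.
    fn decompress_bounded(compressed: &[u8]) -> anyhow::Result<Vec<u8>> {
        let capacity = zstd::bulk::Decompressor::upper_bound(compressed)
            .ok_or_else(|| anyhow::anyhow!("could not bound decompressed size"))?;
        let mut decompressor = zstd::bulk::Decompressor::new()?;
        Ok(decompressor.decompress(compressed, capacity)?)
    }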
Heikki Linnakangas
e90b83646c Refactor DeltaLayer::dump()
Put most of the code in a closure that returns Result, so that we
can use the ?-operator for simpler error handling.
2022-05-10 15:32:34 +03:00
Konstantin Knizhnik
4aac2aded4 Use prepared decode dictionary 2022-05-06 08:54:41 +03:00
Konstantin Knizhnik
076b8e3d04 Use zstd::bulk::Decompressor::decompress instead of decompress_to_buffer 2022-05-03 11:28:32 +03:00
Konstantin Knizhnik
39eadf6236 Use zstd::bulk::Decompressor to decode WAL records, to minimize the number of context initializations 2022-05-03 09:59:33 +03:00
Heikki Linnakangas
4472d49c1e Reuse the zstd Compressor context when building delta layer. 2022-05-03 01:47:39 +03:00
Konstantin Knizhnik
dc057ace2f Fix formatting 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
0e49d748b8 Fix bug in dictionary creation 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
fc7d1ba043 Do not compress delta layers if there are too few elements 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
e28b3dee37 Implement compression of image and delta layers 2022-05-02 07:58:07 +03:00
7 changed files with 503 additions and 37 deletions

Cargo.lock (generated)

@@ -292,6 +292,9 @@ name = "cc"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -1356,6 +1359,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.56"
@@ -1831,6 +1843,7 @@ dependencies = [
"url",
"utils",
"workspace_hack",
"zstd",
]
[[package]]
@@ -3938,3 +3951,32 @@ name = "zeroize"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"
[[package]]
name = "zstd"
version = "0.11.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

pageserver/Cargo.toml

@@ -54,6 +54,9 @@ crossbeam-utils = "0.8.5"
fail = "0.5.0"
git-version = "0.3.5"
# 'experimental' is needed for the `zstd::bulk::Decompressor::upper_bound` function.
zstd = { version = "0.11.1", features = ["experimental"] }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }

pageserver/src/config.rs

@@ -22,6 +22,12 @@ use utils::{
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};
pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
pub const ZSTD_MAX_SAMPLE_BYTES: usize = 10 * 1024 * 1024; // max memory size for holding samples
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024 - 4; // make dictionary + BLOB length fit in first page
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
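A note on ZSTD_MAX_DICTIONARY_SIZE above: 8 * 1024 - 4 = 8188 bytes, so the stored dictionary plus what is presumably a 4-byte blob length header fits exactly in one 8 KiB page, as the comment says.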

pageserver/src/layered_repository/delta_layer.rs

@@ -23,6 +23,25 @@
//! "values" part. The actual page images and WAL records are stored in the
//! "values" part.
//!
//! # Compression
//!
//! Each value is stored as a Blob, which can optionally be compressed. Compression
//! is done by ZStandard, in dictionary mode, which gives pretty good compression
//! ratio even for small inputs like WAL records.
//!
//! The dictionary is built separately for each delta layer file, and stored in
//! the file itself.
//!
//! TODO: The ZStandard format includes constant 4-byte "magic bytes" at the beginning
//! of each compressed block. With small values like WAL records, that's pretty wasteful.
//! We could disable those bytes by setting the `include_magicbytes` flag to false,
//! but as of this writing that's considered experimental in the zstd crate, and the
//! zstd::bulk::Decompressor::upper_bound() function doesn't work without the magic bytes,
//! so we would have to find a different way of allocating the decompression buffer if
//! we did that.
//!
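To make the scheme described above concrete, here is a hedged, self-contained sketch of the dictionary round trip (it uses zstd::dict::from_samples for brevity; the writer below uses from_continuous over one buffer plus per-sample sizes):

    // Illustrative only: train a dictionary from sample blobs, then
    // compress and decompress one value with it.
    fn dictionary_round_trip(samples: &[Vec<u8>]) -> anyhow::Result<()> {
        let dict = zstd::dict::from_samples(samples, 8 * 1024 - 4)?;
        let mut compressor = zstd::bulk::Compressor::with_dictionary(0, &dict)?;
        let compressed = compressor.compress(&samples[0])?;
        let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dict)?;
        let restored = decompressor.decompress(&compressed, samples[0].len())?;
        assert_eq!(restored, samples[0]);
        Ok(())
    }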
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
@@ -36,7 +55,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::virtual_file::VirtualFile;
use crate::walrecord;
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs;
@@ -75,6 +94,9 @@ struct Summary {
index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
/// Byte offset of the compression dictionary, or 0 if no compression
dictionary_offset: u64,
}
impl From<&DeltaLayer> for Summary {
@@ -90,33 +112,46 @@ impl From<&DeltaLayer> for Summary {
index_start_blk: 0,
index_root_blk: 0,
dictionary_offset: 0,
}
}
}
// Flag indicating that this version initialize the page
const WILL_INIT: u64 = 1;
///
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// Struct representing a reference to a BLOB in the file. The reference contains
/// the offset of the BLOB within the file, a flag indicating whether it's
/// compressed or not, and also the `will_init` flag. The `will_init` flag
/// helps to determine the range of records that need to be applied, without
/// reading/deserializing the records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
struct BlobRef(u64);
/// Flag indicating that this blob is compressed
const BLOB_COMPRESSED: u64 = 1;
/// Flag indicating that this version initializes the page
const WILL_INIT: u64 = 2;
impl BlobRef {
pub fn compressed(&self) -> bool {
(self.0 & BLOB_COMPRESSED) != 0
}
pub fn will_init(&self) -> bool {
(self.0 & WILL_INIT) != 0
}
pub fn pos(&self) -> u64 {
self.0 >> 1
self.0 >> 2
}
pub fn new(pos: u64, will_init: bool) -> BlobRef {
let mut blob_ref = pos << 1;
pub fn new(pos: u64, compressed: bool, will_init: bool) -> BlobRef {
let mut blob_ref = pos << 2;
if compressed {
blob_ref |= BLOB_COMPRESSED;
}
if will_init {
blob_ref |= WILL_INIT;
}
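A quick worked example of the bit-packing above (a sketch; BlobRef's inner field is only visible within the module):

    // A compressed blob at offset 4096 that does not initialize the page:
    let r = BlobRef::new(4096, true, false);
    assert_eq!(r.0, (4096 << 2) | BLOB_COMPRESSED); // == 16385
    assert!(r.compressed());
    assert!(!r.will_init());
    assert_eq!(r.pos(), 4096);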
@@ -193,6 +228,37 @@ pub struct DeltaLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
/// Compression dictionary, as raw bytes, and in prepared format ready for use
/// for decompression. None if there is no dictionary, or if 'loaded' is false.
dictionary: Option<(Vec<u8>, zstd::dict::DecoderDictionary<'static>)>,
}
impl DeltaLayerInner {
// Create a new Decompressor, using the prepared dictionary
fn create_decompressor(&self) -> Result<Option<zstd::bulk::Decompressor<'_>>> {
if let Some((_, dict)) = &self.dictionary {
let decompressor = zstd::bulk::Decompressor::with_prepared_dictionary(dict)?;
Ok(Some(decompressor))
} else {
Ok(None)
}
}
// Create a new Decompressor, without using the prepared dictionary.
//
// For cases where you cannot use 'create_decompressor', i.e. when the
// Decompressor needs to outlive 'self'.
fn create_decompressor_not_prepared(
&self,
) -> Result<Option<zstd::bulk::Decompressor<'static>>> {
if let Some((dict, _)) = &self.dictionary {
let decompressor = zstd::bulk::Decompressor::with_dictionary(dict)?;
Ok(Some(decompressor))
} else {
Ok(None)
}
}
}
impl Layer for DeltaLayer {
@@ -234,6 +300,8 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let mut decompressor = inner.create_decompressor()?;
let mut decompress_buf = Vec::new();
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -244,7 +312,7 @@ impl Layer for DeltaLayer {
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
let mut blob_refs: Vec<(Lsn, BlobRef)> = Vec::new();
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
@@ -255,21 +323,36 @@ impl Layer for DeltaLayer {
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
blob_refs.push((entry_lsn, blob_ref));
!blob_ref.will_init()
})?;
// Ok, 'blob_refs' now contains the references to all the entries we need to read
let mut cursor = file.block_cursor();
for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos).with_context(|| {
for (entry_lsn, blob_ref) in blob_refs {
let buf = cursor.read_blob(blob_ref.pos()).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let uncompressed_bytes = if blob_ref.compressed() {
if let Some(ref mut decompressor) = decompressor {
let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
.ok_or_else(|| anyhow!("could not get decompressed length"))?;
decompress_buf.clear();
decompress_buf.reserve(decompressed_max_len);
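// (Note: decompress_to_buffer writes into the Vec's spare capacity and
// sets its length itself, which is why clear() + reserve() suffice here.)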
let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf)?;
&decompress_buf
} else {
bail!("blob is compressed, but there was no dictionary");
}
} else {
&buf
};
let val = Value::des(uncompressed_bytes).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
@@ -347,7 +430,6 @@ impl Layer for DeltaLayer {
}
let inner = self.load()?;
println!(
"index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk
@@ -363,19 +445,49 @@ impl Layer for DeltaLayer {
tree_reader.dump()?;
let mut cursor = file.block_cursor();
let mut decompressor = inner.create_decompressor()?;
let mut decompress_buf = Vec::new();
// A subroutine to dump a single blob
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let buf = cursor.read_blob(blob_ref.pos()).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let uncompressed_bytes = if blob_ref.compressed() {
if let Some(ref mut decompressor) = decompressor {
let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
.ok_or_else(|| anyhow!("could not get decompressed length"))?;
decompress_buf.clear();
decompress_buf.reserve(decompressed_max_len);
let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf)?;
&decompress_buf
} else {
bail!("blob is compressed, but there was no dictionary");
}
} else {
&buf
};
let val = Value::des(uncompressed_bytes).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
format!("img {} bytes, {} compressed", img.len(), buf.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
"rec {} bytes, {} compressed, will_init {}: {}",
uncompressed_bytes.len(),
buf.len(),
rec.will_init(),
wal_desc
@@ -494,6 +606,7 @@ impl DeltaLayer {
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
expected_summary.dictionary_offset = actual_summary.dictionary_offset;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
}
@@ -512,6 +625,13 @@ impl DeltaLayer {
}
}
// Load and prepare the dictionary, if any
if actual_summary.dictionary_offset != 0 {
let mut cursor = file.block_cursor();
let dict = cursor.read_blob(actual_summary.dictionary_offset)?;
let prepared_dict = zstd::dict::DecoderDictionary::copy(&dict);
inner.dictionary = Some((dict, prepared_dict));
}
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;
@@ -537,6 +657,7 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -564,6 +685,7 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -599,6 +721,16 @@ impl DeltaLayer {
///
/// 3. Call `finish`.
///
///
/// To train the dictionary for compression, the first ZSTD_MAX_SAMPLES values
/// (or up to ZSTD_MAX_SAMPLE_BYTES) are buffered in memory, before writing them
/// to disk. When the "sample buffer" fills up, the buffered values are used
/// to train a zstandard dictionary, which is then used to compress all the
/// buffered values, and all subsequent values. So the dictionary is built
/// based on just the first values, but in practice that usually gives pretty
/// good compression for all subsequent data as well. Things like page and
/// tuple headers are similar across all pages of the same relation.
///
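A hedged usage sketch of the three-step flow described above (the constructor arguments are placeholders, since this hunk doesn't show DeltaLayerWriter::new's full signature):

    // Hypothetical call site; the argument lists are placeholders.
    let mut writer = DeltaLayerWriter::new(/* conf, timeline, key/LSN ranges */)?;
    // Values must arrive in (key, lsn) order. The first values are buffered
    // in memory to train the zstd dictionary before anything is written out.
    writer.put_value(key, lsn, value)?;
    let layer = writer.finish(key_end)?;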
pub struct DeltaLayerWriter {
conf: &'static PageServerConf,
path: PathBuf,
@@ -611,6 +743,13 @@ pub struct DeltaLayerWriter {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
compressor: Option<zstd::bulk::Compressor<'static>>,
dictionary_offset: u64,
training: bool,
sample_key_lsn_willinit: Vec<(Key, Lsn, bool)>,
sample_sizes: Vec<usize>,
sample_data: Vec<u8>,
}
impl DeltaLayerWriter {
@@ -641,7 +780,6 @@ impl DeltaLayerWriter {
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(DeltaLayerWriter {
conf,
path,
@@ -651,6 +789,13 @@ impl DeltaLayerWriter {
lsn_range,
tree: tree_builder,
blob_writer,
compressor: None,
dictionary_offset: 0,
training: true,
sample_key_lsn_willinit: Vec::new(),
sample_sizes: Vec::new(),
sample_data: Vec::new(),
})
}
@@ -660,18 +805,122 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
let blob_content = &Value::ser(&val)?;
// Are we still accumulating values for training the compression dictionary?
if self.training {
self.put_value_train(key, lsn, val.will_init(), blob_content)?;
if self.sample_sizes.len() >= config::ZSTD_MAX_SAMPLES
|| self.sample_data.len() >= config::ZSTD_MAX_SAMPLE_BYTES
{
self.finish_training()?;
}
} else {
self.put_value_flush(key, lsn, val.will_init(), blob_content)?;
}
Ok(())
}
/// Accumulate one key-value pair in the samples buffer
fn put_value_train(&mut self, key: Key, lsn: Lsn, will_init: bool, bytes: &[u8]) -> Result<()> {
assert!(self.training);
self.sample_key_lsn_willinit.push((key, lsn, will_init));
self.sample_sizes.push(bytes.len());
self.sample_data.extend_from_slice(bytes);
Ok(())
}
/// Train the compression dictionary, and flush out all the accumulated
/// key-value pairs to disk.
fn finish_training(&mut self) -> Result<()> {
assert!(self.training);
assert!(self.sample_sizes.len() == self.sample_key_lsn_willinit.len());
// Create the dictionary, if we had enough samples for it.
//
// If there weren't enough samples, we don't do any compression at
// all. Possibly we could still benefit from compression; for example
// if you have only one gigantic value in a single layer, it would
// still be good to compress that, without a dictionary. But we don't
// do that currently.
if self.sample_sizes.len() >= config::ZSTD_MIN_SAMPLES {
let dictionary = zstd::dict::from_continuous(
&self.sample_data,
&self.sample_sizes,
config::ZSTD_MAX_DICTIONARY_SIZE,
)?;
let off = self.blob_writer.write_blob(&dictionary)?;
self.dictionary_offset = off;
let compressor = zstd::bulk::Compressor::with_dictionary(
config::ZSTD_COMPRESSION_LEVEL,
&dictionary,
)?;
self.compressor = Some(compressor);
};
self.training = false;
// release the memory used by the sample buffers
let sample_key_lsn_willinit = std::mem::take(&mut self.sample_key_lsn_willinit);
let sample_sizes = std::mem::take(&mut self.sample_sizes);
let sample_data = std::mem::take(&mut self.sample_data);
// Compress and write out all the buffered key-value pairs
let mut buf_idx: usize = 0;
for ((key, lsn, will_init), len) in
itertools::izip!(sample_key_lsn_willinit.iter(), sample_sizes.iter())
{
let end = buf_idx + len;
self.put_value_flush(*key, *lsn, *will_init, &sample_data[buf_idx..end])?;
buf_idx = end;
}
assert!(buf_idx == sample_data.len());
Ok(())
}
/// Write a key-value pair to the file, compressing it if applicable.
pub fn put_value_flush(
&mut self,
key: Key,
lsn: Lsn,
will_init: bool,
bytes: &[u8],
) -> Result<()> {
assert!(!self.training);
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let mut blob_content = bytes;
let mut compressed = false;
let blob_ref = BlobRef::new(off, val.will_init());
// Try to compress the blob
let compressed_bytes;
if let Some(ref mut compressor) = self.compressor {
compressed_bytes = compressor.compress(blob_content)?;
// If the compressed version is not any smaller than the original,
// store it uncompressed.
if compressed_bytes.len() < blob_content.len() {
blob_content = &compressed_bytes;
compressed = true;
}
}
// Write it to the file
let off = self.blob_writer.write_blob(blob_content)?;
let blob_ref = BlobRef::new(off, compressed, will_init);
// And store the reference in the B-tree
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;
Ok(())
}
///
/// Return an estimate of the file size, if it were finished now.
///
pub fn size(&self) -> u64 {
self.blob_writer.size() + self.tree.borrow_writer().size()
}
@@ -679,7 +928,11 @@ impl DeltaLayerWriter {
///
/// Finish writing the delta layer.
///
pub fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
pub fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
if self.training {
self.finish_training()?;
}
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -703,6 +956,7 @@ impl DeltaLayerWriter {
lsn_range: self.lsn_range.clone(),
index_start_blk,
index_root_blk,
dictionary_offset: self.dictionary_offset,
};
file.seek(SeekFrom::Start(0))?;
Summary::ser_into(&summary, &mut file)?;
@@ -719,6 +973,7 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
index_start_blk,
index_root_blk,
}),
@@ -758,6 +1013,9 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'a>>,
decompress_buf: Vec<u8>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -797,10 +1055,20 @@ impl<'a> DeltaValueIter<'a> {
},
)?;
// We cannot use inner.create_decompressor() here, because it returns
// a Decompressor with a lifetime that depends on 'inner', and that
// doesn't live long enough here. We cannot use the prepared dictionary
// for that reason either. That doesn't matter much in practice, because
// this Iterator is used for bulk operations, and loading the dictionary
// isn't that expensive in comparison.
let decompressor = inner.create_decompressor_not_prepared()?;
let iter = DeltaValueIter {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decompressor,
decompress_buf: Vec::new(),
};
Ok(iter)
@@ -814,7 +1082,31 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let uncompressed_bytes = if blob_ref.compressed() {
if let Some(decompressor) = &mut self.decompressor {
let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
.ok_or_else(|| {
anyhow!(
"could not get decompressed length at offset {}",
blob_ref.pos()
)
})?;
self.decompress_buf.clear();
self.decompress_buf.reserve(decompressed_max_len);
let _ = decompressor.decompress_to_buffer(&buf, &mut self.decompress_buf)?;
&self.decompress_buf
} else {
bail!("blob is compressed, but there was no dictionary");
}
} else {
&buf
};
let val = Value::des(uncompressed_bytes).with_context(|| {
format!(
"Failed to deserialize file blob at offset {}",
blob_ref.pos()
)
})?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

pageserver/src/layered_repository/image_layer.rs

@@ -19,6 +19,11 @@
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
//!
//! Each page image is compressed with ZStandard. See the Compression section
//! in delta_layer.rs for more discussion. The difference from a delta
//! layer is that we don't currently use a dictionary for image layers.
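As a sketch of that dictionary-less variant (illustrative, not the PR's exact code): each page is compressed on its own and stored raw when compression doesn't shrink it, so the reader can always decompress into a PAGE_SZ buffer:

    // Hypothetical helper; returns the bytes to store plus a compressed flag.
    fn maybe_compress_page(page: &[u8]) -> anyhow::Result<(Vec<u8>, bool)> {
        let mut compressor = zstd::bulk::Compressor::new(0)?; // 0 = zstd's default level
        let compressed = compressor.compress(page)?;
        if compressed.len() < page.len() {
            Ok((compressed, true))
        } else {
            Ok((page.to_vec(), false))
        }
    }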
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -90,6 +95,35 @@ impl From<&ImageLayer> for Summary {
}
}
///
/// Struct representing a reference to a BLOB in the file. In an image layer,
/// each blob is an image of a page. It can be compressed or not, and
/// that is stored in the low bit of the BlobRef.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
struct BlobRef(u64);
/// Flag indicating that this blob is compressed
const BLOB_COMPRESSED: u64 = 1;
impl BlobRef {
pub fn compressed(&self) -> bool {
(self.0 & BLOB_COMPRESSED) != 0
}
pub fn pos(&self) -> u64 {
self.0 >> 1
}
pub fn new(pos: u64, compressed: bool) -> BlobRef {
let mut blob_ref = pos << 1;
if compressed {
blob_ref |= BLOB_COMPRESSED;
}
BlobRef(blob_ref)
}
}
///
/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file. We keep an ImageLayer in memory for each file, in the LayerMap. If a
@@ -121,6 +155,13 @@ pub struct ImageLayerInner {
file: Option<FileBlockReader<VirtualFile>>,
}
impl ImageLayerInner {
fn create_decompressor(&self) -> Result<zstd::bulk::Decompressor<'_>> {
let decompressor = zstd::bulk::Decompressor::new()?;
Ok(decompressor)
}
}
impl Layer for ImageLayer {
fn filename(&self) -> PathBuf {
PathBuf::from(self.layer_name().to_string())
@@ -160,20 +201,33 @@ impl Layer for ImageLayer {
let inner = self.load()?;
let mut decompressor = inner.create_decompressor()?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.filename().display(),
offset
)
})?;
let value = Bytes::from(blob);
if let Some(value) = tree_reader.get(&keybuf)? {
let blob_ref = BlobRef(value);
let blob_content =
file.block_cursor()
.read_blob(blob_ref.pos())
.with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.filename().display(),
blob_ref.pos()
)
})?;
let uncompressed_bytes = if blob_ref.compressed() {
decompressor.decompress(&blob_content, PAGE_SZ)?
} else {
blob_content
};
let value = Bytes::from(uncompressed_bytes);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
@@ -219,7 +273,17 @@ impl Layer for ImageLayer {
tree_reader.dump()?;
tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
println!("key: {} offset {}", hex::encode(key), value);
let blob_ref = BlobRef(value);
println!(
"key: {} offset {}{}",
hex::encode(key),
blob_ref.pos(),
if blob_ref.compressed() {
" (compressed)"
} else {
""
}
);
true
})?;
@@ -423,6 +487,8 @@ pub struct ImageLayerWriter {
blob_writer: WriteBlobWriter<VirtualFile>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
compressor: Option<zstd::bulk::Compressor<'static>>,
}
impl ImageLayerWriter {
@@ -454,6 +520,12 @@ impl ImageLayerWriter {
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);
// TODO: use a dictionary
let compressor = {
let compressor = zstd::bulk::Compressor::new(config::ZSTD_COMPRESSION_LEVEL)?;
Some(compressor)
};
let writer = ImageLayerWriter {
conf,
path,
@@ -463,6 +535,7 @@ impl ImageLayerWriter {
lsn,
tree: tree_builder,
blob_writer,
compressor,
};
Ok(writer)
@@ -475,11 +548,37 @@ impl ImageLayerWriter {
///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?;
let mut blob_content = img;
let mut compressed = false;
// Try to compress the blob
let compressed_bytes;
if blob_content.len() <= PAGE_SZ {
if let Some(ref mut compressor) = self.compressor {
compressed_bytes = compressor.compress(blob_content)?;
// If the compressed version is not any smaller than the original,
// store it uncompressed. This is not just an optimization; the
// decompression assumes it too. That simplifies the
// decompression, because you don't need to jump through any
// hoops to determine how large a buffer you need to hold the
// decompression result.
if compressed_bytes.len() < blob_content.len() {
blob_content = &compressed_bytes;
compressed = true;
}
}
}
// Write it to the file
let off = self.blob_writer.write_blob(blob_content)?;
let blob_ref = BlobRef::new(off, compressed);
// And store the reference in the B-tree
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
self.tree.append(&keybuf, blob_ref.0)?;
Ok(())
}

pageserver/src/lib.rs

@@ -37,7 +37,7 @@ use pgdatadir_mapping::DatadirTimeline;
/// This is embedded in the metadata file, and also in the header of all the
/// layer files. If you make any backwards-incompatible changes to the storage
/// format, bump this!
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const STORAGE_FORMAT_VERSION: u16 = 4;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

test_compression.py (new test file)

@@ -0,0 +1,24 @@
# Test sequential scan speed
#
from contextlib import closing
from dataclasses import dataclass
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
from fixtures.compare_fixtures import PgCompare
import pytest
@pytest.mark.parametrize('rows', [pytest.param(10000000)])
def test_compression(zenith_with_baseline: PgCompare, rows: int):
env = zenith_with_baseline
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
with env.record_duration('insert'):
cur.execute(
f'create table t as select generate_series(1,{rows}) as pk,(random()*10)::bigint as r10,(random()*100)::bigint as r100,(random()*1000)::bigint as r1000,(random()*10000)::bigint as r10000'
)
cur.execute("vacuum t")
with env.record_duration('select'):
cur.execute('select sum(r100) from t')