Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 18:32:56 +00:00)

Compare commits: sort-locks...layer_comp (23 commits)
| SHA1 |
|---|
| c6416c5aa4 |
| 5217058e8e |
| 18272f53db |
| d9bc3fbc8d |
| a0eb50552b |
| 8ea907b66c |
| 51c64d9a79 |
| 56d7ccbd3d |
| f40d29a035 |
| 057468e27c |
| 6a9aab5be1 |
| 224c2146d4 |
| 73b6a6e3c3 |
| 0ed0433e82 |
| e90b83646c |
| 4aac2aded4 |
| 076b8e3d04 |
| 39eadf6236 |
| 4472d49c1e |
| dc057ace2f |
| 0e49d748b8 |
| fc7d1ba043 |
| e28b3dee37 |
Cargo.lock (generated), 42 changed lines
@@ -292,6 +292,9 @@ name = "cc"
 version = "1.0.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
+dependencies = [
+ "jobserver",
+]

 [[package]]
 name = "cexpr"
@@ -1356,6 +1359,15 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"

+[[package]]
+name = "jobserver"
+version = "0.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "js-sys"
 version = "0.3.56"
@@ -1831,6 +1843,7 @@ dependencies = [
  "url",
  "utils",
  "workspace_hack",
+ "zstd",
 ]

 [[package]]
@@ -3938,3 +3951,32 @@ name = "zeroize"
 version = "1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"
+
+[[package]]
+name = "zstd"
+version = "0.11.1+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
+dependencies = [
+ "zstd-safe",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "5.0.1+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
+dependencies = [
+ "libc",
+ "zstd-sys",
+]
+
+[[package]]
+name = "zstd-sys"
+version = "2.0.1+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
+dependencies = [
+ "cc",
+ "libc",
+]
@@ -54,6 +54,9 @@ crossbeam-utils = "0.8.5"
 fail = "0.5.0"
 git-version = "0.3.5"

+# 'experimental' is needed for the `zstd::bulk::Decompressor::upper_bound` function.
+zstd = { version = "0.11.1", features = ["experimental"] }
+
 postgres_ffi = { path = "../libs/postgres_ffi" }
 etcd_broker = { path = "../libs/etcd_broker" }
 metrics = { path = "../libs/metrics" }
@@ -22,6 +22,12 @@ use utils::{
 use crate::layered_repository::TIMELINES_SEGMENT_NAME;
 use crate::tenant_config::{TenantConf, TenantConfOpt};

+pub const ZSTD_MAX_SAMPLES: usize = 1024;
+pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
+pub const ZSTD_MAX_SAMPLE_BYTES: usize = 10 * 1024 * 1024; // max memory size for holding samples
+pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024 - 4; // make dictionary + BLOB length fit in first page
+pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
+
 pub mod defaults {
     use crate::tenant_config::defaults::*;
     use const_format::formatcp;
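These constants parameterize the dictionary training done by the new delta layer writer further down in this diff. As a rough standalone sketch of how they map onto the `zstd` crate (the function and buffer names here are illustrative, not pageserver code, but the buffers are shaped like the writer's `sample_data`/`sample_sizes` fields), training boils down to one `zstd::dict::from_continuous` call:

```rust
use anyhow::Result;

// Illustrative stand-ins for the constants above.
const ZSTD_MIN_SAMPLES: usize = 8;
const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024 - 4;

/// Train a dictionary from samples stored as one concatenated buffer plus
/// the length of each individual sample.
fn train_dictionary(sample_data: &[u8], sample_sizes: &[usize]) -> Result<Option<Vec<u8>>> {
    if sample_sizes.len() < ZSTD_MIN_SAMPLES {
        // Too few samples: skip compression entirely, as the writer does.
        return Ok(None);
    }
    // Returns a dictionary of at most ZSTD_MAX_DICTIONARY_SIZE bytes.
    let dict = zstd::dict::from_continuous(sample_data, sample_sizes, ZSTD_MAX_DICTIONARY_SIZE)?;
    Ok(Some(dict))
}
```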
@@ -23,6 +23,25 @@
 //! "values" part. The actual page images and WAL records are stored in the
 //! "values" part.
 //!
+//! # Compression
+//!
+//! Each value is stored as a Blob, which can optionally be compressed. Compression
+//! is done by ZStandard, in dictionary mode, which gives pretty good compression
+//! ratio even for small inputs like WAL records.
+//!
+//! The dictionary is built separately for each delta layer file, and stored in
+//! the file itself.
+//!
+//! TODO: The ZStandard format includes constant 4-byte "magic bytes" in the beginning
+//! of each compressed block. With small values like WAL records, that's pretty wasteful.
+//! We could disable those bytes by setting the `include_magibytes' flag to false,
+//! but as of this writing that's considered experimental in the zstd crate, and the
+//! zstd::bulk::Decompressor::upper_bound() function doesn't work without the magic bytes
+//! so we would have to find a different way of allocating the decompression buffer if
+//! we did that.
+//!

+use crate::config;
 use crate::config::PageServerConf;
 use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
 use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
@@ -36,7 +55,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
 use crate::virtual_file::VirtualFile;
 use crate::walrecord;
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
-use anyhow::{bail, ensure, Context, Result};
+use anyhow::{anyhow, bail, ensure, Context, Result};
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs;
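To make the doc comment above concrete, here is a minimal round trip using the same `zstd` crate calls the patch relies on (`Compressor::with_dictionary`, `Decompressor::with_dictionary`, `upper_bound`, `decompress_to_buffer`). It is an illustrative sketch rather than pageserver code: it assumes there are enough training samples, uses level 0 like ZSTD_COMPRESSION_LEVEL, and `Decompressor::upper_bound` only exists with the crate's `experimental` feature (enabled in the Cargo.toml hunk above).

```rust
use anyhow::{anyhow, Result};

fn roundtrip(samples: &[Vec<u8>], value: &[u8]) -> Result<Vec<u8>> {
    // Train a small dictionary from representative samples.
    let dict = zstd::dict::from_samples(samples, 8 * 1024 - 4)?;

    // Compress one value in dictionary mode (level 0 = zstd's default).
    let mut compressor = zstd::bulk::Compressor::with_dictionary(0, &dict)?;
    let compressed = compressor.compress(value)?;

    // Size the output buffer from the frame header, then decompress.
    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dict)?;
    let max_len = zstd::bulk::Decompressor::upper_bound(&compressed)
        .ok_or_else(|| anyhow!("could not get decompressed length"))?;
    let mut out = Vec::with_capacity(max_len);
    decompressor.decompress_to_buffer(&compressed, &mut out)?;
    Ok(out)
}
```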
@@ -75,6 +94,9 @@ struct Summary {
     index_start_blk: u32,
     /// Block within the 'index', where the B-tree root page is stored
     index_root_blk: u32,
+
+    /// Byte offset of the compression dictionary, or 0 if no compression
+    dictionary_offset: u64,
 }

 impl From<&DeltaLayer> for Summary {
@@ -90,33 +112,46 @@ impl From<&DeltaLayer> for Summary {

             index_start_blk: 0,
             index_root_blk: 0,
+
+            dictionary_offset: 0,
         }
     }
 }

-// Flag indicating that this version initialize the page
-const WILL_INIT: u64 = 1;
-
 ///
-/// Struct representing reference to BLOB in layers. Reference contains BLOB
-/// offset, and for WAL records it also contains `will_init` flag. The flag
+/// Struct representing reference to BLOB in the file. The reference contains
+/// the offset to the BLOB within the file, a flag indicating if it's
+/// compressed or not, and also the `will_init` flag. The `will_init` flag
 /// helps to determine the range of records that needs to be applied, without
 /// reading/deserializing records themselves.
 ///
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
 struct BlobRef(u64);

+/// Flag indicating that this blob is compressed
+const BLOB_COMPRESSED: u64 = 1;
+
+/// Flag indicating that this version initializes the page
+const WILL_INIT: u64 = 2;
+
 impl BlobRef {
+    pub fn compressed(&self) -> bool {
+        (self.0 & BLOB_COMPRESSED) != 0
+    }
+
     pub fn will_init(&self) -> bool {
         (self.0 & WILL_INIT) != 0
     }

     pub fn pos(&self) -> u64 {
-        self.0 >> 1
+        self.0 >> 2
     }

-    pub fn new(pos: u64, will_init: bool) -> BlobRef {
-        let mut blob_ref = pos << 1;
+    pub fn new(pos: u64, compressed: bool, will_init: bool) -> BlobRef {
+        let mut blob_ref = pos << 2;
+        if compressed {
+            blob_ref |= BLOB_COMPRESSED;
+        }
         if will_init {
             blob_ref |= WILL_INIT;
         }
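The new packing reserves the two low bits for flags and shifts the byte offset up by two. A tiny standalone sketch (not pageserver code) of the round trip:

```rust
const BLOB_COMPRESSED: u64 = 1;
const WILL_INIT: u64 = 2;

fn main() {
    let pos: u64 = 4096;
    // Pack: offset in the high bits, the two flags in the low bits.
    let packed = (pos << 2) | BLOB_COMPRESSED | WILL_INIT;
    // Unpack: the offset comes back out, and each flag can be tested independently.
    assert_eq!(packed >> 2, pos);
    assert_eq!(packed & BLOB_COMPRESSED, BLOB_COMPRESSED);
    assert_eq!(packed & WILL_INIT, WILL_INIT);
    println!("packed = {:#x}", packed);
}
```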
@@ -193,6 +228,37 @@ pub struct DeltaLayerInner {

     /// Reader object for reading blocks from the file. (None if not loaded yet)
     file: Option<FileBlockReader<VirtualFile>>,
+
+    /// Compression dictionary, as raw bytes, and in prepared format ready for use
+    /// for decompression. None if there is no dictionary, or if 'loaded' is false.
+    dictionary: Option<(Vec<u8>, zstd::dict::DecoderDictionary<'static>)>,
 }

+impl DeltaLayerInner {
+    // Create a new Decompressor, using the prepared dictionary
+    fn create_decompressor(&self) -> Result<Option<zstd::bulk::Decompressor<'_>>> {
+        if let Some((_, dict)) = &self.dictionary {
+            let decompressor = zstd::bulk::Decompressor::with_prepared_dictionary(dict)?;
+            Ok(Some(decompressor))
+        } else {
+            Ok(None)
+        }
+    }
+
+    // Create a new Decompressor, without using the prepared dictionary.
+    //
+    // For the cases that you cannot use 'create_decompressor', if the
+    // Decompressor needs to outlive 'self'.
+    fn create_decompressor_not_prepared(
+        &self,
+    ) -> Result<Option<zstd::bulk::Decompressor<'static>>> {
+        if let Some((dict, _)) = &self.dictionary {
+            let decompressor = zstd::bulk::Decompressor::with_dictionary(dict)?;
+            Ok(Some(decompressor))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
 impl Layer for DeltaLayer {
@@ -234,6 +300,8 @@ impl Layer for DeltaLayer {
     {
         // Open the file and lock the metadata in memory
        let inner = self.load()?;
+        let mut decompressor = inner.create_decompressor()?;
+        let mut decompress_buf = Vec::new();

         // Scan the page versions backwards, starting from `lsn`.
         let file = inner.file.as_ref().unwrap();
@@ -244,7 +312,7 @@
         );
         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));

-        let mut offsets: Vec<(Lsn, u64)> = Vec::new();
+        let mut blob_refs: Vec<(Lsn, BlobRef)> = Vec::new();

         tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
             let blob_ref = BlobRef(value);
@@ -255,21 +323,36 @@
             if entry_lsn < lsn_range.start {
                 return false;
             }
-            offsets.push((entry_lsn, blob_ref.pos()));
+            blob_refs.push((entry_lsn, blob_ref));

             !blob_ref.will_init()
         })?;

         // Ok, 'offsets' now contains the offsets of all the entries we need to read
         let mut cursor = file.block_cursor();
-        for (entry_lsn, pos) in offsets {
-            let buf = cursor.read_blob(pos).with_context(|| {
+        for (entry_lsn, blob_ref) in blob_refs {
+            let buf = cursor.read_blob(blob_ref.pos()).with_context(|| {
                 format!(
                     "Failed to read blob from virtual file {}",
                     file.file.path.display()
                 )
             })?;
-            let val = Value::des(&buf).with_context(|| {
+            let uncompressed_bytes = if blob_ref.compressed() {
+                if let Some(ref mut decompressor) = decompressor {
+                    let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
+                        .ok_or_else(|| anyhow!("could not get decompressed length"))?;
+                    decompress_buf.clear();
+                    decompress_buf.reserve(decompressed_max_len);
+                    let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf)?;
+                    &decompress_buf
+                } else {
+                    bail!("blob is compressed, but there was no dictionary");
+                }
+            } else {
+                &buf
+            };
+
+            let val = Value::des(uncompressed_bytes).with_context(|| {
                 format!(
                     "Failed to deserialize file blob from virtual file {}",
                     file.file.path.display()
@@ -347,7 +430,6 @@ impl Layer for DeltaLayer {
         }

         let inner = self.load()?;
-
         println!(
             "index_start_blk: {}, root {}",
             inner.index_start_blk, inner.index_root_blk
@@ -363,19 +445,49 @@
         tree_reader.dump()?;

         let mut cursor = file.block_cursor();
+        let mut decompressor = inner.create_decompressor()?;
+        let mut decompress_buf = Vec::new();

         // A subroutine to dump a single blob
         let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
-            let buf = cursor.read_blob(blob_ref.pos())?;
-            let val = Value::des(&buf)?;
+            let buf = cursor.read_blob(blob_ref.pos()).with_context(|| {
+                format!(
+                    "Failed to read blob from virtual file {}",
+                    file.file.path.display()
+                )
+            })?;
+
+            let uncompressed_bytes = if blob_ref.compressed() {
+                if let Some(ref mut decompressor) = decompressor {
+                    let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
+                        .ok_or_else(|| anyhow!("could not get decompressed length"))?;
+                    decompress_buf.clear();
+                    decompress_buf.reserve(decompressed_max_len);
+                    let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf)?;
+                    &decompress_buf
+                } else {
+                    bail!("blob is compressed, but there was no dictionary");
+                }
+            } else {
+                &buf
+            };
+
+            let val = Value::des(uncompressed_bytes).with_context(|| {
+                format!(
+                    "Failed to deserialize file blob from virtual file {}",
+                    file.file.path.display()
+                )
+            })?;
+
            let desc = match val {
                 Value::Image(img) => {
-                    format!(" img {} bytes", img.len())
+                    format!("img {} bytes, {} compressed", img.len(), buf.len())
                 }
                 Value::WalRecord(rec) => {
                     let wal_desc = walrecord::describe_wal_record(&rec)?;
                     format!(
-                        " rec {} bytes will_init: {} {}",
+                        "rec {} bytes, {} compressed, will_init {}: {}",
+                        uncompressed_bytes.len(),
                         buf.len(),
                         rec.will_init(),
                         wal_desc
@@ -494,6 +606,7 @@ impl DeltaLayer {
         let mut expected_summary = Summary::from(self);
         expected_summary.index_start_blk = actual_summary.index_start_blk;
         expected_summary.index_root_blk = actual_summary.index_root_blk;
+        expected_summary.dictionary_offset = actual_summary.dictionary_offset;
         if actual_summary != expected_summary {
             bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
         }
@@ -512,6 +625,13 @@ impl DeltaLayer {
             }
         }

+        // Load and prepare the dictionary, if any
+        if actual_summary.dictionary_offset != 0 {
+            let mut cursor = file.block_cursor();
+            let dict = cursor.read_blob(actual_summary.dictionary_offset)?;
+            let prepared_dict = zstd::dict::DecoderDictionary::copy(&dict);
+            inner.dictionary = Some((dict, prepared_dict));
+        }
         inner.index_start_blk = actual_summary.index_start_blk;
         inner.index_root_blk = actual_summary.index_root_blk;

@@ -537,6 +657,7 @@ impl DeltaLayer {
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
+                dictionary: None,
                 index_start_blk: 0,
                 index_root_blk: 0,
             }),
@@ -564,6 +685,7 @@
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
+                dictionary: None,
                 index_start_blk: 0,
                 index_root_blk: 0,
             }),
@@ -599,6 +721,16 @@ impl DeltaLayer {
 ///
 /// 3. Call `finish`.
 ///
+///
+/// To train the dictionary for compression, the first ZSTD_MAX_SAMPLES values
+/// (or up ZSTD_MAX_SAMPLE_BYTES) are buffered in memory, before writing them
+/// to disk. When the "sample buffer" fills up, the buffered values are used
+/// to train a zstandard dictionary, which is then used to compress all the
+/// buffered values, and all subsequent values. So the dictionary is built
+/// based on just the first values, but in practice that usually gives pretty
+/// good compression for all subsequent data as well. Things like page and
+/// tuple headers are similar across all pages of the same relation.
+///
 pub struct DeltaLayerWriter {
     conf: &'static PageServerConf,
     path: PathBuf,
@@ -611,6 +743,13 @@ pub struct DeltaLayerWriter {
     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,

     blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
+    compressor: Option<zstd::bulk::Compressor<'static>>,
+    dictionary_offset: u64,
+
+    training: bool,
+    sample_key_lsn_willinit: Vec<(Key, Lsn, bool)>,
+    sample_sizes: Vec<usize>,
+    sample_data: Vec<u8>,
 }

 impl DeltaLayerWriter {
@@ -641,7 +780,6 @@ impl DeltaLayerWriter {
         // Initialize the b-tree index builder
         let block_buf = BlockBuf::new();
         let tree_builder = DiskBtreeBuilder::new(block_buf);
-
         Ok(DeltaLayerWriter {
             conf,
             path,
@@ -651,6 +789,13 @@ impl DeltaLayerWriter {
             lsn_range,
             tree: tree_builder,
             blob_writer,
+            compressor: None,
+            dictionary_offset: 0,
+
+            training: true,
+            sample_key_lsn_willinit: Vec::new(),
+            sample_sizes: Vec::new(),
+            sample_data: Vec::new(),
         })
     }

@@ -660,18 +805,122 @@ impl DeltaLayerWriter {
     /// The values must be appended in key, lsn order.
     ///
     pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
+        let blob_content = &Value::ser(&val)?;
+
+        // Are we still accumulating values for training the compression dictionary?
+        if self.training {
+            self.put_value_train(key, lsn, val.will_init(), blob_content)?;
+
+            if self.sample_sizes.len() >= config::ZSTD_MAX_SAMPLES
+                || self.sample_data.len() >= config::ZSTD_MAX_SAMPLE_BYTES
+            {
+                self.finish_training()?;
+            }
+        } else {
+            self.put_value_flush(key, lsn, val.will_init(), blob_content)?;
+        }
+        Ok(())
+    }
+
+    /// Accumulate one key-value pair in the samples buffer
+    fn put_value_train(&mut self, key: Key, lsn: Lsn, will_init: bool, bytes: &[u8]) -> Result<()> {
+        assert!(self.training);
+        self.sample_key_lsn_willinit.push((key, lsn, will_init));
+        self.sample_sizes.push(bytes.len());
+        self.sample_data.extend_from_slice(bytes);
+        Ok(())
+    }
+
+    /// Train the compression dictionary, and flush out all the accumulated
+    /// key-value pairs to disk.
+    fn finish_training(&mut self) -> Result<()> {
+        assert!(self.training);
+        assert!(self.sample_sizes.len() == self.sample_key_lsn_willinit.len());
+
+        // Create the dictionary, if we had enough samples for it.
+        //
+        // If there weren't enough samples, we don't do any compression at
+        // all. Possibly we could still benefit from compression; for example
+        // if you have only one gigantic value in a single layer, it would
+        // still be good to compress that, without a dictionary. But we don't
+        // do that currently.
+        if self.sample_sizes.len() >= config::ZSTD_MIN_SAMPLES {
+            let dictionary = zstd::dict::from_continuous(
+                &self.sample_data,
+                &self.sample_sizes,
+                config::ZSTD_MAX_DICTIONARY_SIZE,
+            )?;
+
+            let off = self.blob_writer.write_blob(&dictionary)?;
+            self.dictionary_offset = off;
+
+            let compressor = zstd::bulk::Compressor::with_dictionary(
+                config::ZSTD_COMPRESSION_LEVEL,
+                &dictionary,
+            )?;
+            self.compressor = Some(compressor);
+        };
+        self.training = false;
+
+        // release the memory used by the sample buffers
+        let sample_key_lsn_willinit = std::mem::take(&mut self.sample_key_lsn_willinit);
+        let sample_sizes = std::mem::take(&mut self.sample_sizes);
+        let sample_data = std::mem::take(&mut self.sample_data);
+
+        // Compress and write out all the buffered key-value pairs
+        let mut buf_idx: usize = 0;
+        for ((key, lsn, will_init), len) in
+            itertools::izip!(sample_key_lsn_willinit.iter(), sample_sizes.iter())
+        {
+            let end = buf_idx + len;
+            self.put_value_flush(*key, *lsn, *will_init, &sample_data[buf_idx..end])?;
+            buf_idx = end;
+        }
+        assert!(buf_idx == sample_data.len());
+
+        Ok(())
+    }
+
+    /// Write a key-value pair to the file, compressing it if applicable.
+    pub fn put_value_flush(
+        &mut self,
+        key: Key,
+        lsn: Lsn,
+        will_init: bool,
+        bytes: &[u8],
+    ) -> Result<()> {
+        assert!(!self.training);
         assert!(self.lsn_range.start <= lsn);

-        let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
+        let mut blob_content = bytes;
+        let mut compressed = false;

-        let blob_ref = BlobRef::new(off, val.will_init());
+        // Try to compress the blob
+        let compressed_bytes;
+        if let Some(ref mut compressor) = self.compressor {
+            compressed_bytes = compressor.compress(blob_content)?;
+            // If compressed version is not any smaller than the original,
+            // store it uncompressed.
+            if compressed_bytes.len() < blob_content.len() {
+                blob_content = &compressed_bytes;
+                compressed = true;
+            }
+        }
+
+        // Write it to the file
+        let off = self.blob_writer.write_blob(blob_content)?;
+        let blob_ref = BlobRef::new(off, compressed, will_init);
+
+        // And store the reference in the B-tree
         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
         self.tree.append(&delta_key.0, blob_ref.0)?;

         Ok(())
     }

     ///
     /// Return an estimate of the file, if it was finished now.
     ///
     pub fn size(&self) -> u64 {
         self.blob_writer.size() + self.tree.borrow_writer().size()
     }
@@ -679,7 +928,11 @@ impl DeltaLayerWriter {
     ///
     /// Finish writing the delta layer.
     ///
-    pub fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
+    pub fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
+        if self.training {
+            self.finish_training()?;
+        }
+
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;

@@ -703,6 +956,7 @@ impl DeltaLayerWriter {
             lsn_range: self.lsn_range.clone(),
             index_start_blk,
             index_root_blk,
+            dictionary_offset: self.dictionary_offset,
         };
         file.seek(SeekFrom::Start(0))?;
         Summary::ser_into(&summary, &mut file)?;
@@ -719,6 +973,7 @@ impl DeltaLayerWriter {
             inner: RwLock::new(DeltaLayerInner {
                 loaded: false,
                 file: None,
+                dictionary: None,
                 index_start_blk,
                 index_root_blk,
             }),
@@ -758,6 +1013,9 @@ struct DeltaValueIter<'a> {
     all_offsets: Vec<(DeltaKey, BlobRef)>,
     next_idx: usize,
     reader: BlockCursor<Adapter<'a>>,
+    decompressor: Option<zstd::bulk::Decompressor<'a>>,
+
+    decompress_buf: Vec<u8>,
 }

 struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -797,10 +1055,20 @@ impl<'a> DeltaValueIter<'a> {
             },
         )?;

+        // We cannot use inner.create_decompressor() here, because it returns
+        // a Decompressor with lifetime that depends on 'inner', and that
+        // doesn't live long enough here. Cannot use the prepared dictionary
+        // for that reason either. Doesn't matter too much in practice because
+        // this Iterator is used for bulk operations, and loading the dictionary
+        // isn't that expensive in comparison.
+        let decompressor = inner.create_decompressor_not_prepared()?;
+
         let iter = DeltaValueIter {
             all_offsets,
             next_idx: 0,
             reader: BlockCursor::new(Adapter(inner)),
+            decompressor,
+            decompress_buf: Vec::new(),
         };

         Ok(iter)
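The comment in this hunk is about decompressor lifetimes. As a hedged standalone sketch (with `dict_bytes` standing in for the dictionary blob read from the layer file), the two constructors used by `create_decompressor` and `create_decompressor_not_prepared` differ like this:

```rust
use anyhow::Result;

fn decompressors(dict_bytes: &[u8]) -> Result<()> {
    // 1. Prepare the dictionary once; the resulting decompressor borrows it,
    //    so its lifetime is tied to the DecoderDictionary (and whatever owns it).
    let prepared = zstd::dict::DecoderDictionary::copy(dict_bytes);
    let _fast = zstd::bulk::Decompressor::with_prepared_dictionary(&prepared)?;

    // 2. Let the decompressor load the raw bytes itself; this yields a
    //    Decompressor<'static> that can outlive the dictionary buffer,
    //    at the cost of re-loading the dictionary each time.
    let _standalone: zstd::bulk::Decompressor<'static> =
        zstd::bulk::Decompressor::with_dictionary(dict_bytes)?;
    Ok(())
}
```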
@@ -814,7 +1082,31 @@ impl<'a> DeltaValueIter<'a> {
             let lsn = delta_key.lsn();

             let buf = self.reader.read_blob(blob_ref.pos())?;
-            let val = Value::des(&buf)?;
+            let uncompressed_bytes = if blob_ref.compressed() {
+                if let Some(decompressor) = &mut self.decompressor {
+                    let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
+                        .ok_or_else(|| {
+                            anyhow!(
+                                "could not get decompressed length at offset {}",
+                                blob_ref.pos()
+                            )
+                        })?;
+                    self.decompress_buf.clear();
+                    self.decompress_buf.reserve(decompressed_max_len);
+                    let _ = decompressor.decompress_to_buffer(&buf, &mut self.decompress_buf)?;
+                    &self.decompress_buf
+                } else {
+                    bail!("blob is compressed, but there was no dictionary");
+                }
+            } else {
+                &buf
+            };
+            let val = Value::des(uncompressed_bytes).with_context(|| {
+                format!(
+                    "Failed to deserialize file blob at offset {}",
+                    blob_ref.pos()
+                )
+            })?;
             self.next_idx += 1;
             Ok(Some((key, lsn, val)))
         } else {
@@ -19,6 +19,11 @@
 //! layer, and offsets to the other parts. The "index" is a B-tree,
 //! mapping from Key to an offset in the "values" part. The
 //! actual page images are stored in the "values" part.
 //!
+//! Each page image is compressed with ZStandard. See Compression section
+//! in the delta_layer.rs for more discussion. Difference from a delta
+//! layer is that we don't currently use a dictionary for image layers.
+use crate::config;
 use crate::config::PageServerConf;
 use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
 use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -90,6 +95,35 @@ impl From<&ImageLayer> for Summary {
     }
 }

+///
+/// Struct representing reference to BLOB in the file. In an image layer,
+/// each blob is an image of the page. It can be compressed or not, and
+/// that is stored in low bit of the BlobRef.
+///
+#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
+struct BlobRef(u64);
+
+/// Flag indicating that this blob is compressed
+const BLOB_COMPRESSED: u64 = 1;
+
+impl BlobRef {
+    pub fn compressed(&self) -> bool {
+        (self.0 & BLOB_COMPRESSED) != 0
+    }
+
+    pub fn pos(&self) -> u64 {
+        self.0 >> 1
+    }
+
+    pub fn new(pos: u64, compressed: bool) -> BlobRef {
+        let mut blob_ref = pos << 1;
+        if compressed {
+            blob_ref |= BLOB_COMPRESSED;
+        }
+        BlobRef(blob_ref)
+    }
+}
+
 ///
 /// ImageLayer is the in-memory data structure associated with an on-disk image
 /// file. We keep an ImageLayer in memory for each file, in the LayerMap. If a
@@ -121,6 +155,13 @@ pub struct ImageLayerInner {
     file: Option<FileBlockReader<VirtualFile>>,
 }

+impl ImageLayerInner {
+    fn create_decompressor(&self) -> Result<zstd::bulk::Decompressor<'_>> {
+        let decompressor = zstd::bulk::Decompressor::new()?;
+        Ok(decompressor)
+    }
+}
+
 impl Layer for ImageLayer {
     fn filename(&self) -> PathBuf {
         PathBuf::from(self.layer_name().to_string())
@@ -160,20 +201,33 @@ impl Layer for ImageLayer {

         let inner = self.load()?;

+        let mut decompressor = inner.create_decompressor()?;
+
         let file = inner.file.as_ref().unwrap();
         let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);

         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        if let Some(offset) = tree_reader.get(&keybuf)? {
-            let blob = file.block_cursor().read_blob(offset).with_context(|| {
-                format!(
-                    "failed to read value from data file {} at offset {}",
-                    self.filename().display(),
-                    offset
-                )
-            })?;
-            let value = Bytes::from(blob);
+        if let Some(value) = tree_reader.get(&keybuf)? {
+            let blob_ref = BlobRef(value);
+            let blob_content =
+                file.block_cursor()
+                    .read_blob(blob_ref.pos())
+                    .with_context(|| {
+                        format!(
+                            "failed to read value from data file {} at offset {}",
+                            self.filename().display(),
+                            blob_ref.pos()
+                        )
+                    })?;
+
+            let uncompressed_bytes = if blob_ref.compressed() {
+                decompressor.decompress(&blob_content, PAGE_SZ)?
+            } else {
+                blob_content
+            };
+
+            let value = Bytes::from(uncompressed_bytes);

             reconstruct_state.img = Some((self.lsn, value));
             Ok(ValueReconstructResult::Complete)
@@ -219,7 +273,17 @@ impl Layer for ImageLayer {
         tree_reader.dump()?;

         tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
-            println!("key: {} offset {}", hex::encode(key), value);
+            let blob_ref = BlobRef(value);
+            println!(
+                "key: {} offset {}{}",
+                hex::encode(key),
+                blob_ref.pos(),
+                if blob_ref.compressed() {
+                    " (compressed)"
+                } else {
+                    ""
+                }
+            );
             true
         })?;

@@ -423,6 +487,8 @@ pub struct ImageLayerWriter {

     blob_writer: WriteBlobWriter<VirtualFile>,
     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
+
+    compressor: Option<zstd::bulk::Compressor<'static>>,
 }

 impl ImageLayerWriter {
@@ -454,6 +520,12 @@ impl ImageLayerWriter {
         let block_buf = BlockBuf::new();
         let tree_builder = DiskBtreeBuilder::new(block_buf);

+        // TODO: use a dictionary
+        let compressor = {
+            let compressor = zstd::bulk::Compressor::new(config::ZSTD_COMPRESSION_LEVEL)?;
+            Some(compressor)
+        };
+
         let writer = ImageLayerWriter {
             conf,
             path,
@@ -463,6 +535,7 @@ impl ImageLayerWriter {
             lsn,
             tree: tree_builder,
             blob_writer,
+            compressor,
         };

         Ok(writer)
@@ -475,11 +548,37 @@ impl ImageLayerWriter {
     ///
     pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
         ensure!(self.key_range.contains(&key));
-        let off = self.blob_writer.write_blob(img)?;

+        let mut blob_content = img;
+        let mut compressed = false;
+
+        // Try to compress the blob
+        let compressed_bytes;
+        if blob_content.len() <= PAGE_SZ {
+            if let Some(ref mut compressor) = self.compressor {
+                compressed_bytes = compressor.compress(blob_content)?;
+
+                // If compressed version is not any smaller than the original,
+                // store it uncompressed. This not just an optimization, the
+                // the decompression assumes that too. That simplifies the
+                // decompression, because you don't need to jump through any
+                // hoops to determine how large a buffer you need to hold the
+                // decompression result.
+                if compressed_bytes.len() < blob_content.len() {
+                    blob_content = &compressed_bytes;
+                    compressed = true;
+                }
+            }
+        }
+
+        // Write it to the file
+        let off = self.blob_writer.write_blob(blob_content)?;
+        let blob_ref = BlobRef::new(off, compressed);
+
+        // And store the reference in the B-tree
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        self.tree.append(&keybuf, off)?;
+        self.tree.append(&keybuf, blob_ref.0)?;

         Ok(())
     }
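Because an image blob is only stored compressed when the original fits in PAGE_SZ and the compressed form is strictly smaller, the reader can decompress without a dictionary and without `upper_bound`, using PAGE_SZ as the buffer capacity. A standalone sketch of that read path (PAGE_SZ here assumes the pageserver's 8 KiB page size; `read_image` is an illustrative helper, not pageserver code):

```rust
use anyhow::Result;

const PAGE_SZ: usize = 8192; // assumed Postgres block size

// Read-path counterpart of put_image: if the blob was stored compressed, its
// uncompressed size is known to be at most PAGE_SZ, so that bound is enough.
fn read_image(blob: &[u8], compressed: bool) -> Result<Vec<u8>> {
    if compressed {
        let mut decompressor = zstd::bulk::Decompressor::new()?;
        Ok(decompressor.decompress(blob, PAGE_SZ)?)
    } else {
        Ok(blob.to_vec())
    }
}
```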
@@ -37,7 +37,7 @@ use pgdatadir_mapping::DatadirTimeline;
 /// This is embedded in the metadata file, and also in the header of all the
 /// layer files. If you make any backwards-incompatible changes to the storage
 /// format, bump this!
-pub const STORAGE_FORMAT_VERSION: u16 = 3;
+pub const STORAGE_FORMAT_VERSION: u16 = 4;

 // Magic constants used to identify different kinds of files
 pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
test_runner/performance/test_compression.py (new file, 24 lines)
@@ -0,0 +1,24 @@
+# Test sequential scan speed
+#
+from contextlib import closing
+from dataclasses import dataclass
+from fixtures.zenith_fixtures import ZenithEnv
+from fixtures.log_helper import log
+from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+from fixtures.compare_fixtures import PgCompare
+import pytest
+
+
+@pytest.mark.parametrize('rows', [pytest.param(10000000)])
+def test_compression(zenith_with_baseline: PgCompare, rows: int):
+    env = zenith_with_baseline
+
+    with closing(env.pg.connect()) as conn:
+        with conn.cursor() as cur:
+            with env.record_duration('insert'):
+                cur.execute(
+                    f'create table t as select generate_series(1,{rows}) as pk,(random()*10)::bigint as r10,(random()*100)::bigint as r100,(random()*1000)::bigint as r1000,(random()*10000)::bigint as r10000'
+                )
+                cur.execute("vacuum t")
+            with env.record_duration('select'):
+                cur.execute('select sum(r100) from t')