Compare commits

...

9 Commits

Author SHA1 Message Date
Konstantin Knizhnik
0a049ae17a Not working version 2022-05-05 08:53:21 +03:00
Konstantin Knizhnik
32557b16b4 Use prepared dictionary for layer reconstruction 2022-05-04 18:17:33 +03:00
Konstantin Knizhnik
076b8e3d04 Use zstd::bulk::Decompressor::decompress instead of decompress_to_buffer 2022-05-03 11:28:32 +03:00
Konstantin Knizhnik
39eadf6236 Use zstd::bulk::Decompressor to decode WAL records to minimize the number of context initializations 2022-05-03 09:59:33 +03:00
Heikki Linnakangas
4472d49c1e Reuse the zstd Compressor context when building delta layer. 2022-05-03 01:47:39 +03:00
Konstantin Knizhnik
dc057ace2f Fix formatting 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
0e49d748b8 Fix bug in dictionary creation 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
fc7d1ba043 Do not compress delta layers if there are too few elements 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
e28b3dee37 Implement compression of image and delta layers 2022-05-02 07:58:07 +03:00
9 changed files with 275 additions and 82 deletions

42
Cargo.lock generated
View File

@@ -249,6 +249,9 @@ name = "cc"
version = "1.0.72" version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
dependencies = [
"jobserver",
]
[[package]] [[package]]
name = "cexpr" name = "cexpr"
@@ -1226,6 +1229,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.56" version = "0.3.56"
@@ -1669,6 +1681,7 @@ dependencies = [
"url", "url",
"utils", "utils",
"workspace_hack", "workspace_hack",
"zstd",
] ]
[[package]] [[package]]
@@ -3656,3 +3669,32 @@ name = "zeroize"
version = "1.5.2" version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006" checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"
[[package]]
name = "zstd"
version = "0.11.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

View File

@@ -58,6 +58,8 @@ rusoto_core = "0.47"
rusoto_s3 = "0.47" rusoto_s3 = "0.47"
async-trait = "0.1" async-trait = "0.1"
zstd = "0.11.1"
postgres_ffi = { path = "../libs/postgres_ffi" } postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" } metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" } utils = { path = "../libs/utils" }

View File

@@ -20,6 +20,12 @@ use utils::{
use crate::layered_repository::TIMELINES_SEGMENT_NAME; use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt}; use crate::tenant_config::{TenantConf, TenantConfOpt};
pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records?
pub mod defaults { pub mod defaults {
use crate::tenant_config::defaults::*; use crate::tenant_config::defaults::*;
use const_format::formatcp; use const_format::formatcp;

View File

@@ -17,6 +17,7 @@ use fail::fail_point;
use itertools::Itertools; use itertools::Itertools;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tracing::*; use tracing::*;
use utils::bin_ser::BeSer;
use std::cmp::{max, min, Ordering}; use std::cmp::{max, min, Ordering};
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
@@ -32,6 +33,7 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config;
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::keyspace::KeySpace; use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfOpt}; use crate::tenant_config::{TenantConf, TenantConfOpt};
@@ -1938,78 +1940,109 @@ impl LayeredTimeline {
.map(|l| l.get_lsn_range()) .map(|l| l.get_lsn_range())
.reduce(|a, b| min(a.start, b.start)..max(a.end, b.end)) .reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
.unwrap(); .unwrap();
let mut new_layers = Vec::new();
let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| { {
if let Ok((a_key, a_lsn, _)) = a { let mut all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((b_key, b_lsn, _)) = b { if let Ok((a_key, a_lsn, _)) = a {
match a_key.cmp(b_key) { if let Ok((b_key, b_lsn, _)) = b {
Ordering::Less => true, match a_key.cmp(b_key) {
Ordering::Equal => a_lsn <= b_lsn, Ordering::Less => true,
Ordering::Greater => false, Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
} }
} else { } else {
false true
} }
} else { });
true
}
});
// Merge the contents of all the input delta layers into a new set // Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning. // of delta layers, based on the current partitioning.
// //
// TODO: this actually divides the layers into fixed-size chunks, not // TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning. // based on the partitioning.
// //
// TODO: we should also opportunistically materialize and // TODO: we should also opportunistically materialize and
// garbage collect what we can. // garbage collect what we can.
let mut new_layers = Vec::new(); let mut prev_key: Option<Key> = None;
let mut prev_key: Option<Key> = None; let mut writer: Option<DeltaLayerWriter> = None;
let mut writer: Option<DeltaLayerWriter> = None;
for x in all_values_iter {
let (key, lsn, value) = x?;
if let Some(prev_key) = prev_key { while let Some(x) = all_values_iter.next() {
if key != prev_key && writer.is_some() { let (key, lsn, value) = x?;
let size = writer.as_mut().unwrap().size();
if size > target_file_size { if let Some(prev_key) = prev_key {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?); if key != prev_key && writer.is_some() {
writer = None; let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
}
} }
} }
if writer.is_none() {
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut prefetched = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
while let Some(y) = all_values_iter.next() {
if let Ok((key, lsn, value)) = y {
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
if samples.len() == config::ZSTD_MAX_SAMPLES {
break;
}
} else {
break;
}
}
let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
} else {
Vec::new()
};
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
dictionary,
)?);
// Number of sample is relatively small, so we should not exceed target file limit
for (key, lsn, value) in prefetched {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
} else {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
} }
if writer.is_none() { // Sync layers
writer = Some(DeltaLayerWriter::new( if !new_layers.is_empty() {
self.conf, let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
self.timelineid,
self.tenantid, // also sync the directory
key, layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
lsn_range.clone(),
)?); // Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
} }
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
} }
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
let mut layers = self.layers.write().unwrap(); let mut layers = self.layers.write().unwrap();
for l in new_layers { for l in new_layers {
layers.insert_historic(Arc::new(l)); layers.insert_historic(Arc::new(l));

View File

@@ -73,7 +73,7 @@ pub struct BlockCursor<R>
where where
R: BlockReader, R: BlockReader,
{ {
reader: R, pub reader: R,
/// last accessed page /// last accessed page
cache: Option<(u32, R::BlockLease)>, cache: Option<(u32, R::BlockLease)>,
} }

View File

@@ -23,6 +23,7 @@
//! "values" part. The actual page images and WAL records are stored in the //! "values" part. The actual page images and WAL records are stored in the
//! "values" part. //! "values" part.
//! //!
use crate::config;
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
@@ -39,7 +40,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result}; use anyhow::{bail, ensure, Context, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::*; use tracing::*;
// avoid binding to Write (conflicts with std::io::Write) // avoid binding to Write (conflicts with std::sio::Write)
// while being able to use std::fmt::Write's methods // while being able to use std::fmt::Write's methods
use std::fmt::Write as _; use std::fmt::Write as _;
use std::fs; use std::fs;
@@ -56,6 +57,8 @@ use utils::{
zid::{ZTenantId, ZTimelineId}, zid::{ZTenantId, ZTimelineId},
}; };
const DICTIONARY_OFFSET: u64 = PAGE_SZ as u64;
/// ///
/// Header stored in the beginning of the file /// Header stored in the beginning of the file
/// ///
@@ -195,6 +198,10 @@ pub struct DeltaLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet) /// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>, file: Option<FileBlockReader<VirtualFile>>,
/// Compression dictionary.
dictionary: Vec<u8>, // empty if not loaded
prepared_dictionary: Option<zstd::dict::DecoderDictionary<'static>>, // None if not loaded
} }
impl Layer for DeltaLayer { impl Layer for DeltaLayer {
@@ -232,6 +239,12 @@ impl Layer for DeltaLayer {
{ {
// Open the file and lock the metadata in memory // Open the file and lock the metadata in memory
let inner = self.load()?; let inner = self.load()?;
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
// Scan the page versions backwards, starting from `lsn`. // Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap(); let file = inner.file.as_ref().unwrap();
@@ -264,7 +277,14 @@ impl Layer for DeltaLayer {
file.file.path.display() file.file.path.display()
) )
})?; })?;
let val = Value::des(&buf).with_context(|| { let val = if let Some(decompressor) = &mut decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}
.with_context(|| {
format!( format!(
"Failed to deserialize file blob from virtual file {}", "Failed to deserialize file blob from virtual file {}",
file.file.path.display() file.file.path.display()
@@ -342,7 +362,6 @@ impl Layer for DeltaLayer {
} }
let inner = self.load()?; let inner = self.load()?;
println!( println!(
"index_start_blk: {}, root {}", "index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk inner.index_start_blk, inner.index_root_blk
@@ -358,6 +377,13 @@ impl Layer for DeltaLayer {
tree_reader.dump()?; tree_reader.dump()?;
let mut cursor = file.block_cursor(); let mut cursor = file.block_cursor();
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
tree_reader.visit( tree_reader.visit(
&[0u8; DELTA_KEY_SIZE], &[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards, VisitDirection::Forwards,
@@ -369,7 +395,14 @@ impl Layer for DeltaLayer {
let mut desc = String::new(); let mut desc = String::new();
match cursor.read_blob(blob_ref.pos()) { match cursor.read_blob(blob_ref.pos()) {
Ok(buf) => { Ok(buf) => {
let val = Value::des(&buf); let val = if let Some(decompressor) = &mut decompressor {
let decompressed = decompressor
.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)
.unwrap();
Value::des(&decompressed)
} else {
Value::des(&buf)
};
match val { match val {
Ok(Value::Image(img)) => { Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len()).unwrap(); write!(&mut desc, " img {} bytes", img.len()).unwrap();
@@ -487,6 +520,13 @@ impl DeltaLayer {
} }
} }
let mut cursor = file.block_cursor();
inner.dictionary = cursor.read_blob(DICTIONARY_OFFSET)?;
inner.prepared_dictionary = if inner.dictionary.is_empty() {
None
} else {
Some(zstd::dict::DecoderDictionary::copy(&inner.dictionary))
};
inner.index_start_blk = actual_summary.index_start_blk; inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk; inner.index_root_blk = actual_summary.index_root_blk;
@@ -512,6 +552,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner { inner: RwLock::new(DeltaLayerInner {
loaded: false, loaded: false,
file: None, file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0, index_start_blk: 0,
index_root_blk: 0, index_root_blk: 0,
}), }),
@@ -539,6 +581,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner { inner: RwLock::new(DeltaLayerInner {
loaded: false, loaded: false,
file: None, file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0, index_start_blk: 0,
index_root_blk: 0, index_root_blk: 0,
}), }),
@@ -586,6 +630,7 @@ pub struct DeltaLayerWriter {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>, tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>, blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
compressor: Option<zstd::bulk::Compressor<'static>>,
} }
impl DeltaLayerWriter { impl DeltaLayerWriter {
@@ -598,6 +643,7 @@ impl DeltaLayerWriter {
tenantid: ZTenantId, tenantid: ZTenantId,
key_start: Key, key_start: Key,
lsn_range: Range<Lsn>, lsn_range: Range<Lsn>,
dictionary: Vec<u8>,
) -> Result<DeltaLayerWriter> { ) -> Result<DeltaLayerWriter> {
// Create the file initially with a temporary filename. We don't know // Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will // the end key yet, so we cannot form the final filename yet. We will
@@ -613,14 +659,24 @@ impl DeltaLayerWriter {
)); ));
let mut file = VirtualFile::create(&path)?; let mut file = VirtualFile::create(&path)?;
// make room for the header block // make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?; file.seek(SeekFrom::Start(DICTIONARY_OFFSET))?;
let buf_writer = BufWriter::new(file); let buf_writer = BufWriter::new(file);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64); let mut blob_writer = WriteBlobWriter::new(buf_writer, DICTIONARY_OFFSET);
let off = blob_writer.write_blob(&dictionary)?;
assert!(off == DICTIONARY_OFFSET);
let compressor = if dictionary.is_empty() {
None
} else {
Some(zstd::bulk::Compressor::with_dictionary(
config::ZSTD_COMPRESSION_LEVEL,
&dictionary,
)?)
};
// Initialize the b-tree index builder // Initialize the b-tree index builder
let block_buf = BlockBuf::new(); let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf); let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(DeltaLayerWriter { Ok(DeltaLayerWriter {
conf, conf,
path, path,
@@ -630,6 +686,7 @@ impl DeltaLayerWriter {
lsn_range, lsn_range,
tree: tree_builder, tree: tree_builder,
blob_writer, blob_writer,
compressor,
}) })
} }
@@ -641,7 +698,13 @@ impl DeltaLayerWriter {
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> { pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
assert!(self.lsn_range.start <= lsn); assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?; let body = &Value::ser(&val)?;
let off = if let Some(ref mut compressor) = self.compressor {
let compressed = compressor.compress(body)?;
self.blob_writer.write_blob(&compressed)?
} else {
self.blob_writer.write_blob(body)?
};
let blob_ref = BlobRef::new(off, val.will_init()); let blob_ref = BlobRef::new(off, val.will_init());
@@ -698,6 +761,8 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner { inner: RwLock::new(DeltaLayerInner {
loaded: false, loaded: false,
file: None, file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk, index_start_blk,
index_root_blk, index_root_blk,
}), }),
@@ -735,6 +800,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>, all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize, next_idx: usize,
reader: BlockCursor<Adapter<'a>>, reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'a>>,
} }
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>); struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -773,11 +839,26 @@ impl<'a> DeltaValueIter<'a> {
true true
}, },
)?; )?;
let decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
/*
let decompressor = if !inner.dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(
&inner.dictionary,
)?)
} else {
None
};
*/
let iter = DeltaValueIter { let iter = DeltaValueIter {
all_offsets, all_offsets,
next_idx: 0, next_idx: 0,
reader: BlockCursor::new(Adapter(inner)), reader: BlockCursor::new(Adapter(inner)),
decompressor,
}; };
Ok(iter) Ok(iter)
@@ -791,7 +872,13 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn(); let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?; let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?; let val = if let Some(decompressor) = &mut self.decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}?;
self.next_idx += 1; self.next_idx += 1;
Ok(Some((key, lsn, val))) Ok(Some((key, lsn, val)))
} else { } else {

View File

@@ -19,6 +19,7 @@
//! layer, and offsets to the other parts. The "index" is a B-tree, //! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The //! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part. //! actual page images are stored in the "values" part.
use crate::config;
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -168,7 +169,8 @@ impl Layer for ImageLayer {
offset offset
) )
})?; })?;
let value = Bytes::from(blob); let decompressed = zstd::bulk::decompress(&blob, PAGE_SZ)?;
let value = Bytes::from(decompressed);
reconstruct_state.img = Some((self.lsn, value)); reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete) Ok(ValueReconstructResult::Complete)
@@ -456,7 +458,8 @@ impl ImageLayerWriter {
/// ///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> { pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
ensure!(self.key_range.contains(&key)); ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?; let compressed = zstd::bulk::compress(img, config::ZSTD_COMPRESSION_LEVEL)?;
let off = self.blob_writer.write_blob(&compressed)?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf); key.write_to_byte_slice(&mut keybuf);

View File

@@ -4,6 +4,7 @@
//! held in an ephemeral file, not in memory. The metadata for each page version, i.e. //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
//! its position in the file, is kept in memory, though. //! its position in the file, is kept in memory, though.
//! //!
use crate::config;
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter}; use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockReader; use crate::layered_repository::block_io::BlockReader;
@@ -318,21 +319,40 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now. // rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap(); let inner = self.inner.read().unwrap();
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut buf = Vec::new();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
let mut cursor = inner.file.block_cursor();
// First learn dictionary
'train: for (_key, vec_map) in keys.iter() {
// Write all page versions
for (_lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
samples.push(buf.clone());
if samples.len() == config::ZSTD_MAX_SAMPLES {
break 'train;
}
}
}
let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
} else {
Vec::new()
};
let mut delta_layer_writer = DeltaLayerWriter::new( let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf, self.conf,
self.timelineid, self.timelineid,
self.tenantid, self.tenantid,
Key::MIN, Key::MIN,
self.start_lsn..inner.end_lsn.unwrap(), self.start_lsn..inner.end_lsn.unwrap(),
dictionary,
)?; )?;
let mut buf = Vec::new();
let mut cursor = inner.file.block_cursor();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
for (key, vec_map) in keys.iter() { for (key, vec_map) in keys.iter() {
let key = **key; let key = **key;
// Write all page versions // Write all page versions