Compare commits

...

9 Commits

Author SHA1 Message Date
Konstantin Knizhnik
0a049ae17a Not working version 2022-05-05 08:53:21 +03:00
Konstantin Knizhnik
32557b16b4 Use prepared dictionary for layer reconstruction 2022-05-04 18:17:33 +03:00
Konstantin Knizhnik
076b8e3d04 Use zstd::bulk::Decompressor::decompress instead of decompress_to_buffer 2022-05-03 11:28:32 +03:00
Konstantin Knizhnik
39eadf6236 Use zstd::bulk::Decompressor to decode WAL records to minimize the number of context initializations 2022-05-03 09:59:33 +03:00
Heikki Linnakangas
4472d49c1e Reuse the zstd Compressor context when building delta layer. 2022-05-03 01:47:39 +03:00
Konstantin Knizhnik
dc057ace2f Fix formatting 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
0e49d748b8 Fix bug in dictionary creation 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
fc7d1ba043 Do not compress delta layers if there are too few elements 2022-05-02 07:58:07 +03:00
Konstantin Knizhnik
e28b3dee37 Implement compression of image and delta layers 2022-05-02 07:58:07 +03:00
9 changed files with 275 additions and 82 deletions
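
At a high level, these commits train a zstd dictionary from sampled values, compress each blob written to a layer file with a Compressor that holds the dictionary, and decompress on read with a matching Decompressor. Below is a minimal round-trip sketch of the zstd 0.11 bulk API the commits rely on; the sample data is illustrative, and the 64 KiB sizes mirror the constants added in config.rs.

use zstd::bulk::{Compressor, Decompressor};

fn main() -> std::io::Result<()> {
    // Illustrative training corpus: many small, similarly shaped records.
    // Dictionary training can fail on tiny or incompressible inputs, so use
    // a reasonably sized sample set.
    let samples: Vec<Vec<u8>> = (0..1024)
        .map(|i| format!("wal record payload {:08} xmin=1 xmax=2 flags=0x0000", i).into_bytes())
        .collect();

    // Train a shared dictionary, capped at 64 KiB (ZSTD_MAX_DICTIONARY_SIZE below).
    let dictionary = zstd::dict::from_samples(&samples, 64 * 1024)?;

    // One compressor context is reused for every blob written to a layer file;
    // level 0 selects zstd's default compression level.
    let mut compressor = Compressor::with_dictionary(0, &dictionary)?;
    let compressed = compressor.compress(&samples[0])?;

    // Reads need the same dictionary plus an upper bound on the decompressed
    // size (ZSTD_DECOMPRESS_BUFFER_LIMIT below).
    let mut decompressor = Decompressor::with_dictionary(&dictionary)?;
    let restored = decompressor.decompress(&compressed, 64 * 1024)?;
    assert_eq!(restored, samples[0]);
    Ok(())
}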

Cargo.lock (generated), 42 changed lines
View File

@@ -249,6 +249,9 @@ name = "cc"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -1226,6 +1229,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.56"
@@ -1669,6 +1681,7 @@ dependencies = [
"url",
"utils",
"workspace_hack",
"zstd",
]
[[package]]
@@ -3656,3 +3669,32 @@ name = "zeroize"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"
[[package]]
name = "zstd"
version = "0.11.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

View File

@@ -58,6 +58,8 @@ rusoto_core = "0.47"
rusoto_s3 = "0.47"
async-trait = "0.1"
zstd = "0.11.1"
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

View File

@@ -20,6 +20,12 @@ use utils::{
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};
pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // minimum number of samples zstd requires for dictionary training
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records?
pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;

View File

@@ -17,6 +17,7 @@ use fail::fail_point;
use itertools::Itertools;
use lazy_static::lazy_static;
use tracing::*;
use utils::bin_ser::BeSer;
use std::cmp::{max, min, Ordering};
use std::collections::hash_map::Entry;
@@ -32,6 +33,7 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config;
use crate::config::PageServerConf;
use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfOpt};
@@ -1938,78 +1940,109 @@ impl LayeredTimeline {
.map(|l| l.get_lsn_range())
.reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
.unwrap();
let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
let mut new_layers = Vec::new();
{
let mut all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
false
true
}
} else {
true
}
});
});
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
for x in all_values_iter {
let (key, lsn, value) = x?;
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
while let Some(x) = all_values_iter.next() {
let (key, lsn, value) = x?;
if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
}
}
}
if writer.is_none() {
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut prefetched = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
while let Some(y) = all_values_iter.next() {
if let Ok((key, lsn, value)) = y {
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
if samples.len() == config::ZSTD_MAX_SAMPLES {
break;
}
} else {
break;
}
}
let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
} else {
Vec::new()
};
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
dictionary,
)?);
// The number of samples is relatively small, so this should not exceed the target file size
for (key, lsn, value) in prefetched {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
} else {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
if writer.is_none() {
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
)?);
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
let mut layers = self.layers.write().unwrap();
for l in new_layers {
layers.insert_historic(Arc::new(l));
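
The compaction path above buffers the first ZSTD_MAX_SAMPLES values from the merged iterator, trains a dictionary from them, and only then creates the DeltaLayerWriter and replays the buffered values into it. A condensed sketch of that prefetch-train-replay step follows; the helper name is illustrative and the item type is simplified to the serialized bytes. The caller keeps consuming the same iterator afterwards for the remaining values, as the compaction loop does.

fn train_from_prefix(
    values: &mut impl Iterator<Item = Vec<u8>>,
) -> std::io::Result<(Vec<u8>, Vec<Vec<u8>>)> {
    const MAX_SAMPLES: usize = 1024;        // config::ZSTD_MAX_SAMPLES
    const MIN_SAMPLES: usize = 8;           // config::ZSTD_MIN_SAMPLES
    const MAX_DICT_SIZE: usize = 64 * 1024; // config::ZSTD_MAX_DICTIONARY_SIZE

    // Buffer the first MAX_SAMPLES serialized values: they serve both as the
    // training corpus and as the first values replayed into the new writer.
    let prefetched: Vec<Vec<u8>> = values.take(MAX_SAMPLES).collect();

    // With too few samples zstd cannot train a useful dictionary; an empty
    // dictionary means "write this layer uncompressed".
    let dictionary = if prefetched.len() >= MIN_SAMPLES {
        zstd::dict::from_samples(&prefetched, MAX_DICT_SIZE)?
    } else {
        Vec::new()
    };
    Ok((dictionary, prefetched))
}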

View File

@@ -73,7 +73,7 @@ pub struct BlockCursor<R>
where
R: BlockReader,
{
reader: R,
pub reader: R,
/// last accessed page
cache: Option<(u32, R::BlockLease)>,
}

View File

@@ -23,6 +23,7 @@
//! "values" part. The actual page images and WAL records are stored in the
//! "values" part.
//!
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
@@ -39,7 +40,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use serde::{Deserialize, Serialize};
use tracing::*;
// avoid binding to Write (conflicts with std::io::Write)
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::fs;
@@ -56,6 +57,8 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
const DICTIONARY_OFFSET: u64 = PAGE_SZ as u64;
///
/// Header stored in the beginning of the file
///
@@ -195,6 +198,10 @@ pub struct DeltaLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
/// Compression dictionary.
dictionary: Vec<u8>, // empty if not loaded
prepared_dictionary: Option<zstd::dict::DecoderDictionary<'static>>, // None if not loaded
}
impl Layer for DeltaLayer {
@@ -232,6 +239,12 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -264,7 +277,14 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let val = if let Some(decompressor) = &mut decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}
.with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
@@ -342,7 +362,6 @@ impl Layer for DeltaLayer {
}
let inner = self.load()?;
println!(
"index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk
@@ -358,6 +377,13 @@ impl Layer for DeltaLayer {
tree_reader.dump()?;
let mut cursor = file.block_cursor();
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
@@ -369,7 +395,14 @@ impl Layer for DeltaLayer {
let mut desc = String::new();
match cursor.read_blob(blob_ref.pos()) {
Ok(buf) => {
let val = Value::des(&buf);
let val = if let Some(decompressor) = &mut decompressor {
let decompressed = decompressor
.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)
.unwrap();
Value::des(&decompressed)
} else {
Value::des(&buf)
};
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len()).unwrap();
@@ -487,6 +520,13 @@ impl DeltaLayer {
}
}
let mut cursor = file.block_cursor();
inner.dictionary = cursor.read_blob(DICTIONARY_OFFSET)?;
inner.prepared_dictionary = if inner.dictionary.is_empty() {
None
} else {
Some(zstd::dict::DecoderDictionary::copy(&inner.dictionary))
};
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;
@@ -512,6 +552,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -539,6 +581,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -586,6 +630,7 @@ pub struct DeltaLayerWriter {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
compressor: Option<zstd::bulk::Compressor<'static>>,
}
impl DeltaLayerWriter {
@@ -598,6 +643,7 @@ impl DeltaLayerWriter {
tenantid: ZTenantId,
key_start: Key,
lsn_range: Range<Lsn>,
dictionary: Vec<u8>,
) -> Result<DeltaLayerWriter> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
@@ -613,14 +659,24 @@ impl DeltaLayerWriter {
));
let mut file = VirtualFile::create(&path)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
file.seek(SeekFrom::Start(DICTIONARY_OFFSET))?;
let buf_writer = BufWriter::new(file);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
let mut blob_writer = WriteBlobWriter::new(buf_writer, DICTIONARY_OFFSET);
let off = blob_writer.write_blob(&dictionary)?;
assert!(off == DICTIONARY_OFFSET);
let compressor = if dictionary.is_empty() {
None
} else {
Some(zstd::bulk::Compressor::with_dictionary(
config::ZSTD_COMPRESSION_LEVEL,
&dictionary,
)?)
};
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(DeltaLayerWriter {
conf,
path,
@@ -630,6 +686,7 @@ impl DeltaLayerWriter {
lsn_range,
tree: tree_builder,
blob_writer,
compressor,
})
}
@@ -641,7 +698,13 @@ impl DeltaLayerWriter {
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let body = &Value::ser(&val)?;
let off = if let Some(ref mut compressor) = self.compressor {
let compressed = compressor.compress(body)?;
self.blob_writer.write_blob(&compressed)?
} else {
self.blob_writer.write_blob(body)?
};
let blob_ref = BlobRef::new(off, val.will_init());
@@ -698,6 +761,8 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk,
index_root_blk,
}),
@@ -735,6 +800,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'a>>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -773,11 +839,26 @@ impl<'a> DeltaValueIter<'a> {
true
},
)?;
let decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
/*
let decompressor = if !inner.dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(
&inner.dictionary,
)?)
} else {
None
};
*/
let iter = DeltaValueIter {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decompressor,
};
Ok(iter)
@@ -791,7 +872,13 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val = if let Some(decompressor) = &mut self.decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {
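
On the read side, the delta layer stores the dictionary blob at DICTIONARY_OFFSET and parses it into a DecoderDictionary once per load; each read path then builds a cheap Decompressor from that prepared dictionary. A sketch of the split, with the struct and method names purely illustrative:

use zstd::bulk::Decompressor;
use zstd::dict::DecoderDictionary;

// Parse the dictionary once per layer and share it across readers, so each
// reader only pays a small per-context setup cost.
struct LayerDict {
    prepared: Option<DecoderDictionary<'static>>,
}

impl LayerDict {
    fn new(dictionary_blob: &[u8]) -> Self {
        let prepared = if dictionary_blob.is_empty() {
            None // the layer was written uncompressed
        } else {
            Some(DecoderDictionary::copy(dictionary_blob))
        };
        LayerDict { prepared }
    }

    fn decode(&self, blob: &[u8], limit: usize) -> std::io::Result<Vec<u8>> {
        match &self.prepared {
            Some(dict) => Decompressor::with_prepared_dictionary(dict)?.decompress(blob, limit),
            None => Ok(blob.to_vec()),
        }
    }
}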

View File

@@ -19,6 +19,7 @@
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -168,7 +169,8 @@ impl Layer for ImageLayer {
offset
)
})?;
let value = Bytes::from(blob);
let decompressed = zstd::bulk::decompress(&blob, PAGE_SZ)?;
let value = Bytes::from(decompressed);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
@@ -456,7 +458,8 @@ impl ImageLayerWriter {
///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?;
let compressed = zstd::bulk::compress(img, config::ZSTD_COMPRESSION_LEVEL)?;
let off = self.blob_writer.write_blob(&compressed)?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
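
Image layers take the simpler route: every value is a fixed-size page image, so the one-shot bulk functions are used without any dictionary. A sketch of the round trip, assuming the usual 8 KiB page size:

const PAGE_SZ: usize = 8192; // assumed page size

fn roundtrip_page(img: &[u8]) -> std::io::Result<Vec<u8>> {
    // Level 0 selects zstd's default compression level.
    let compressed = zstd::bulk::compress(img, 0)?;
    // The caller knows the decompressed size is at most one page.
    zstd::bulk::decompress(&compressed, PAGE_SZ)
}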

View File

@@ -4,6 +4,7 @@
//! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
//! its position in the file, is kept in memory, though.
//!
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockReader;
@@ -318,21 +319,40 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut buf = Vec::new();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
let mut cursor = inner.file.block_cursor();
// First, train the compression dictionary on sampled page versions
'train: for (_key, vec_map) in keys.iter() {
// Read all page versions as dictionary-training samples
for (_lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
samples.push(buf.clone());
if samples.len() == config::ZSTD_MAX_SAMPLES {
break 'train;
}
}
}
let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
} else {
Vec::new()
};
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
Key::MIN,
self.start_lsn..inner.end_lsn.unwrap(),
dictionary,
)?;
let mut buf = Vec::new();
let mut cursor = inner.file.block_cursor();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
for (key, vec_map) in keys.iter() {
let key = **key;
// Write all page versions