Implement compression of image and delta layers

This commit is contained in:
Konstantin Knizhnik
2022-04-29 12:22:17 +03:00
parent 992874c916
commit e28b3dee37
9 changed files with 211 additions and 80 deletions

42
Cargo.lock generated
View File

@@ -249,6 +249,9 @@ name = "cc"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -1226,6 +1229,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.56"
@@ -1669,6 +1681,7 @@ dependencies = [
"url",
"utils",
"workspace_hack",
"zstd",
]
[[package]]
@@ -3656,3 +3669,32 @@ name = "zeroize"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"
[[package]]
name = "zstd"
version = "0.11.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

View File

@@ -58,6 +58,8 @@ rusoto_core = "0.47"
rusoto_s3 = "0.47"
async-trait = "0.1"
zstd = "0.11.1"
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

View File

@@ -20,6 +20,10 @@ use utils::{
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};
pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;

View File

@@ -17,6 +17,7 @@ use fail::fail_point;
use itertools::Itertools;
use lazy_static::lazy_static;
use tracing::*;
use utils::bin_ser::BeSer;
use std::cmp::{max, min, Ordering};
use std::collections::hash_map::Entry;
@@ -32,6 +33,7 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::{Duration, Instant, SystemTime};
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config;
use crate::config::PageServerConf;
use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfOpt};
@@ -1938,78 +1940,102 @@ impl LayeredTimeline {
.map(|l| l.get_lsn_range())
.reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
.unwrap();
let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
let mut new_layers = Vec::new();
{
let mut all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
false
true
}
} else {
true
}
});
});
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
for x in all_values_iter {
let (key, lsn, value) = x?;
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
while let Some(x) = all_values_iter.next() {
let (key, lsn, value) = x?;
if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
}
}
}
if writer.is_none() {
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut prefetched = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
while let Some(y) = all_values_iter.next() {
let (key, lsn, value) = y?;
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
if samples.len() == config::ZSTD_MAX_SAMPLES {
break;
}
}
let dictionary =
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?;
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
dictionary,
)?);
// The number of samples is relatively small, so we should not exceed the target file size limit
for (key, lsn, value) in prefetched {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
} else {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
if writer.is_none() {
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
)?);
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
let mut layers = self.layers.write().unwrap();
for l in new_layers {
layers.insert_historic(Arc::new(l));

View File

@@ -73,7 +73,7 @@ pub struct BlockCursor<R>
where
R: BlockReader,
{
reader: R,
pub reader: R,
/// last accessed page
cache: Option<(u32, R::BlockLease)>,
}

View File

@@ -23,6 +23,7 @@
//! "values" part. The actual page images and WAL records are stored in the
//! "values" part.
//!
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
@@ -43,7 +44,7 @@ use tracing::*;
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::fs;
use std::io::{BufWriter, Write};
use std::io::{BufWriter, Read, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
@@ -56,6 +57,8 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
const DICTIONARY_OFFSET: u64 = PAGE_SZ as u64;
///
/// Header stored in the beginning of the file
///
@@ -195,6 +198,9 @@ pub struct DeltaLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
/// Compression dictionary. (None if not loaded yet)
dictionary: Option<Vec<u8>>,
}
impl Layer for DeltaLayer {
@@ -257,6 +263,7 @@ impl Layer for DeltaLayer {
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let dictionary = inner.dictionary.as_ref().unwrap();
for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos).with_context(|| {
format!(
@@ -264,7 +271,11 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let buf = cursor.read_blob(pos)?;
let mut decoder = zstd::Decoder::with_dictionary(&*buf, dictionary)?;
let mut decompressed: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed)?;
let val = Value::des(&decompressed).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
@@ -487,6 +498,10 @@ impl DeltaLayer {
}
}
let mut cursor = file.block_cursor();
let dictionary = cursor.read_blob(DICTIONARY_OFFSET)?;
inner.dictionary = Some(dictionary);
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;
@@ -512,6 +527,7 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -539,6 +555,7 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -586,6 +603,7 @@ pub struct DeltaLayerWriter {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
dictionary: Vec<u8>,
}
impl DeltaLayerWriter {
@@ -598,6 +616,7 @@ impl DeltaLayerWriter {
tenantid: ZTenantId,
key_start: Key,
lsn_range: Range<Lsn>,
dictionary: Vec<u8>,
) -> Result<DeltaLayerWriter> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
@@ -613,14 +632,16 @@ impl DeltaLayerWriter {
));
let mut file = VirtualFile::create(&path)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
file.seek(SeekFrom::Start(DICTIONARY_OFFSET))?;
let buf_writer = BufWriter::new(file);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
let mut blob_writer = WriteBlobWriter::new(buf_writer, DICTIONARY_OFFSET);
let off = blob_writer.write_blob(&dictionary)?;
assert!(off == DICTIONARY_OFFSET);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(DeltaLayerWriter {
conf,
path,
@@ -630,6 +651,7 @@ impl DeltaLayerWriter {
lsn_range,
tree: tree_builder,
blob_writer,
dictionary,
})
}
@@ -641,7 +663,17 @@ impl DeltaLayerWriter {
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let body = &Value::ser(&val)?;
let mut compressed: Vec<u8> = Vec::new();
let mut encoder = zstd::Encoder::with_dictionary(
&mut compressed,
config::ZSTD_COMPRESSION_LEVEL,
&self.dictionary,
)?;
encoder.write_all(&body)?;
encoder.finish()?;
let off = self.blob_writer.write_blob(&compressed)?;
let blob_ref = BlobRef::new(off, val.will_init());
@@ -698,6 +730,7 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
index_start_blk,
index_root_blk,
}),
@@ -773,7 +806,6 @@ impl<'a> DeltaValueIter<'a> {
true
},
)?;
let iter = DeltaValueIter {
all_offsets,
next_idx: 0,
@@ -791,7 +823,13 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let mut decoder = zstd::Decoder::with_dictionary(
&*buf,
self.reader.reader.0.dictionary.as_ref().unwrap(),
)?;
let mut decompressed: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed)?;
let val = Value::des(&decompressed)?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

View File

@@ -19,6 +19,7 @@
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -168,7 +169,8 @@ impl Layer for ImageLayer {
offset
)
})?;
let value = Bytes::from(blob);
let decompressed = zstd::bulk::decompress(&blob, PAGE_SZ)?;
let value = Bytes::from(decompressed);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
@@ -456,7 +458,8 @@ impl ImageLayerWriter {
///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?;
let compressed = zstd::bulk::compress(img, config::ZSTD_COMPRESSION_LEVEL)?;
let off = self.blob_writer.write_blob(&compressed)?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);

View File

@@ -4,6 +4,7 @@
//! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
//! its position in the file, is kept in memory, though.
//!
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockReader;
@@ -318,21 +319,36 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut buf = Vec::new();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
let mut cursor = inner.file.block_cursor();
// First, learn the dictionary
for (_key, vec_map) in keys.iter() {
// Collect page versions as dictionary training samples
for (_lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
samples.push(buf.clone());
if samples.len() == config::ZSTD_MAX_SAMPLES {
break;
}
}
}
let dictionary = zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?;
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
Key::MIN,
self.start_lsn..inner.end_lsn.unwrap(),
dictionary,
)?;
let mut buf = Vec::new();
let mut cursor = inner.file.block_cursor();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
for (key, vec_map) in keys.iter() {
let key = **key;
// Write all page versions