Use zstd::bulk::Decompressor to decode WAL records to minimize number of context initalization

This commit is contained in:
Konstantin Knizhnik
2022-05-03 09:59:33 +03:00
parent 4472d49c1e
commit 39eadf6236

View File

@@ -44,7 +44,7 @@ use tracing::*;
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::fs;
use std::io::{BufWriter, Read, Write};
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
@@ -238,6 +238,12 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let dictionary = inner.dictionary.as_ref().unwrap();
let mut decompressor = if !dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?)
} else {
None
};
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -263,7 +269,6 @@ impl Layer for DeltaLayer {
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let dictionary = inner.dictionary.as_ref().unwrap();
for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos).with_context(|| {
format!(
@@ -271,13 +276,12 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = if dictionary.is_empty() {
Value::des(&buf)
} else {
let mut decoder = zstd::Decoder::with_dictionary(&*buf, dictionary)?;
let val = if let Some(decompressor) = &mut decompressor {
let mut decompressed: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed)?;
decompressor.decompress_to_buffer(&buf, &mut decompressed)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}
.with_context(|| {
format!(
@@ -373,6 +377,11 @@ impl Layer for DeltaLayer {
let mut cursor = file.block_cursor();
let dictionary = inner.dictionary.as_ref().unwrap();
let mut decompressor = if !dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?)
} else {
None
};
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
@@ -385,14 +394,14 @@ impl Layer for DeltaLayer {
let mut desc = String::new();
match cursor.read_blob(blob_ref.pos()) {
Ok(buf) => {
let val = if dictionary.is_empty() {
Value::des(&buf)
} else {
let mut decoder =
zstd::Decoder::with_dictionary(&*buf, dictionary).unwrap();
let val = if let Some(decompressor) = &mut decompressor {
let mut decompressed: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed).unwrap();
decompressor
.decompress_to_buffer(&buf, &mut decompressed)
.unwrap();
Value::des(&decompressed)
} else {
Value::des(&buf)
};
match val {
Ok(Value::Image(img)) => {
@@ -654,7 +663,10 @@ impl DeltaLayerWriter {
let compressor = if dictionary.is_empty() {
None
} else {
Some(zstd::bulk::Compressor::with_dictionary(config::ZSTD_COMPRESSION_LEVEL, &dictionary)?)
Some(zstd::bulk::Compressor::with_dictionary(
config::ZSTD_COMPRESSION_LEVEL,
&dictionary,
)?)
};
// Initialize the b-tree index builder
@@ -782,6 +794,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'static>>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -820,10 +833,17 @@ impl<'a> DeltaValueIter<'a> {
true
},
)?;
let dictionary = inner.dictionary.as_ref().unwrap();
let decompressor = if !dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?)
} else {
None
};
let iter = DeltaValueIter {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decompressor,
};
Ok(iter)
@@ -837,17 +857,12 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let dictionary = self.reader.reader.0.dictionary.as_ref().unwrap();
let val = if dictionary.is_empty() {
Value::des(&buf)
} else {
let mut decoder = zstd::Decoder::with_dictionary(
&*buf,
self.reader.reader.0.dictionary.as_ref().unwrap(),
)?;
let val = if let Some(decompressor) = &mut self.decompressor {
let mut decompressed: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed)?;
decompressor.decompress_to_buffer(&buf, &mut decompressed)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))