Use preapred decode dictionary

This commit is contained in:
Konstantin Knizhnik
2022-05-06 08:54:41 +03:00
parent 076b8e3d04
commit 4aac2aded4
3 changed files with 54 additions and 23 deletions

View File

@@ -22,7 +22,7 @@ use crate::tenant_config::{TenantConf, TenantConfOpt};
pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 128 * 1024;
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024;
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records?

View File

@@ -199,8 +199,9 @@ pub struct DeltaLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
/// Compression dictionary. (None if not loaded yet)
dictionary: Option<Vec<u8>>,
/// Compression dictionary.
dictionary: Vec<u8>, // empty if not loaded
prepared_dictionary: Option<zstd::dict::DecoderDictionary<'static>>, // None if not loaded
}
impl Layer for DeltaLayer {
@@ -238,11 +239,11 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let dictionary = inner.dictionary.as_ref().unwrap();
let mut decompressor = if !dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?)
} else {
None
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
// Scan the page versions backwards, starting from `lsn`.
@@ -376,11 +377,11 @@ impl Layer for DeltaLayer {
tree_reader.dump()?;
let mut cursor = file.block_cursor();
let dictionary = inner.dictionary.as_ref().unwrap();
let mut decompressor = if !dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?)
} else {
None
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
tree_reader.visit(
@@ -520,9 +521,12 @@ impl DeltaLayer {
}
let mut cursor = file.block_cursor();
let dictionary = cursor.read_blob(DICTIONARY_OFFSET)?;
inner.dictionary = Some(dictionary);
inner.dictionary = cursor.read_blob(DICTIONARY_OFFSET)?;
inner.prepared_dictionary = if inner.dictionary.is_empty() {
None
} else {
Some(zstd::dict::DecoderDictionary::copy(&inner.dictionary))
};
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;
@@ -548,7 +552,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -576,7 +581,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),
@@ -755,7 +761,8 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk,
index_root_blk,
}),
@@ -793,7 +800,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'static>>,
decompressor: Option<zstd::bulk::Decompressor<'a>>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -832,9 +839,10 @@ impl<'a> DeltaValueIter<'a> {
true
},
)?;
let dictionary = inner.dictionary.as_ref().unwrap();
let decompressor = if !dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?)
let decompressor = if !inner.dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(
&inner.dictionary,
)?)
} else {
None
};

View File

@@ -0,0 +1,23 @@
# Test sequential scan speed
#
from contextlib import closing
from dataclasses import dataclass
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
from fixtures.compare_fixtures import PgCompare
import pytest
@pytest.mark.parametrize('rows', [pytest.param(10000000)])
def test_compression(zenith_with_baseline: PgCompare, rows: int):
env = zenith_with_baseline
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
with env.record_duration('insert'):
cur.execute(
f'create table t as select generate_series(1,{rows}) as pk,(random()*10)::bigint as r10,(random()*100)::bigint as r100,(random()*1000)::bigint as r1000,(random()*10000)::bigint as r10000'
)
with env.record_duration('select'):
cur.execute('select sum(r100) from t')