diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 679e31a4af..52892e70c6 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -22,7 +22,7 @@ use crate::tenant_config::{TenantConf, TenantConfOpt}; pub const ZSTD_MAX_SAMPLES: usize = 1024; pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd -pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 128 * 1024; +pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024; pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records? diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index ec38e52900..678dacefc5 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -199,8 +199,9 @@ pub struct DeltaLayerInner { /// Reader object for reading blocks from the file. (None if not loaded yet) file: Option>, - /// Compression dictionary. (None if not loaded yet) - dictionary: Option>, + /// Compression dictionary. + dictionary: Vec, // empty if not loaded + prepared_dictionary: Option>, // None if not loaded } impl Layer for DeltaLayer { @@ -238,11 +239,11 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory let inner = self.load()?; - let dictionary = inner.dictionary.as_ref().unwrap(); - let mut decompressor = if !dictionary.is_empty() { - Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?) - } else { - None + let mut decompressor = match &inner.prepared_dictionary { + Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary( + dictionary, + )?), + None => None, }; // Scan the page versions backwards, starting from `lsn`. @@ -376,11 +377,11 @@ impl Layer for DeltaLayer { tree_reader.dump()?; let mut cursor = file.block_cursor(); - let dictionary = inner.dictionary.as_ref().unwrap(); - let mut decompressor = if !dictionary.is_empty() { - Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?) - } else { - None + let mut decompressor = match &inner.prepared_dictionary { + Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary( + dictionary, + )?), + None => None, }; tree_reader.visit( @@ -520,9 +521,12 @@ impl DeltaLayer { } let mut cursor = file.block_cursor(); - let dictionary = cursor.read_blob(DICTIONARY_OFFSET)?; - inner.dictionary = Some(dictionary); - + inner.dictionary = cursor.read_blob(DICTIONARY_OFFSET)?; + inner.prepared_dictionary = if inner.dictionary.is_empty() { + None + } else { + Some(zstd::dict::DecoderDictionary::copy(&inner.dictionary)) + }; inner.index_start_blk = actual_summary.index_start_blk; inner.index_root_blk = actual_summary.index_root_blk; @@ -548,7 +552,8 @@ impl DeltaLayer { inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, - dictionary: None, + dictionary: Vec::new(), + prepared_dictionary: None, index_start_blk: 0, index_root_blk: 0, }), @@ -576,7 +581,8 @@ impl DeltaLayer { inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, - dictionary: None, + dictionary: Vec::new(), + prepared_dictionary: None, index_start_blk: 0, index_root_blk: 0, }), @@ -755,7 +761,8 @@ impl DeltaLayerWriter { inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, - dictionary: None, + dictionary: Vec::new(), + prepared_dictionary: None, index_start_blk, index_root_blk, }), @@ -793,7 +800,7 @@ struct DeltaValueIter<'a> { all_offsets: Vec<(DeltaKey, BlobRef)>, next_idx: usize, reader: BlockCursor>, - decompressor: Option>, + decompressor: Option>, } struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>); @@ -832,9 +839,10 @@ impl<'a> DeltaValueIter<'a> { true }, )?; - let dictionary = inner.dictionary.as_ref().unwrap(); - let decompressor = if !dictionary.is_empty() { - Some(zstd::bulk::Decompressor::with_dictionary(dictionary)?) + let decompressor = if !inner.dictionary.is_empty() { + Some(zstd::bulk::Decompressor::with_dictionary( + &inner.dictionary, + )?) } else { None }; diff --git a/test_runner/performance/test_compression.py b/test_runner/performance/test_compression.py new file mode 100644 index 0000000000..085a5da879 --- /dev/null +++ b/test_runner/performance/test_compression.py @@ -0,0 +1,23 @@ +# Test sequential scan speed +# +from contextlib import closing +from dataclasses import dataclass +from fixtures.zenith_fixtures import ZenithEnv +from fixtures.log_helper import log +from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker +from fixtures.compare_fixtures import PgCompare +import pytest + + +@pytest.mark.parametrize('rows', [pytest.param(10000000)]) +def test_compression(zenith_with_baseline: PgCompare, rows: int): + env = zenith_with_baseline + + with closing(env.pg.connect()) as conn: + with conn.cursor() as cur: + with env.record_duration('insert'): + cur.execute( + f'create table t as select generate_series(1,{rows}) as pk,(random()*10)::bigint as r10,(random()*100)::bigint as r100,(random()*1000)::bigint as r1000,(random()*10000)::bigint as r10000' + ) + with env.record_duration('select'): + cur.execute('select sum(r100) from t')