diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 869a094963..50030881ca 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -21,7 +21,8 @@ use crate::layered_repository::TIMELINES_SEGMENT_NAME; use crate::tenant_config::{TenantConf, TenantConfOpt}; pub const ZSTD_MAX_SAMPLES: usize = 1024; -pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024; +pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd +pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 128 * 1024; pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level pub mod defaults { diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 141a1b0780..f7731afd8d 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1996,8 +1996,11 @@ impl LayeredTimeline { break; } } - let dictionary = - zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?; + let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES { + zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)? + } else { + Vec::new() + }; writer = Some(DeltaLayerWriter::new( self.conf, diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 9101f4fff4..e07928c7fa 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -40,7 +40,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; use serde::{Deserialize, Serialize}; use tracing::*; -// avoid binding to Write (conflicts with std::io::Write) +// avoid binding to Write (conflicts with std::sio::Write) // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::fs; @@ -271,11 +271,14 @@ impl Layer for DeltaLayer { file.file.path.display() ) })?; - let buf = cursor.read_blob(pos)?; - let mut decoder = zstd::Decoder::with_dictionary(&*buf, dictionary)?; - let mut decompressed: Vec = Vec::new(); - decoder.read_to_end(&mut decompressed)?; - let val = Value::des(&decompressed).with_context(|| { + let val = if dictionary.is_empty() { + Value::des(&buf) + } else { + let mut decoder = zstd::Decoder::with_dictionary(&*buf, dictionary)?; + let mut decompressed: Vec = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + Value::des(&decompressed) + }.with_context(|| { format!( "Failed to deserialize file blob from virtual file {}", file.file.path.display() @@ -353,7 +356,6 @@ impl Layer for DeltaLayer { } let inner = self.load()?; - println!( "index_start_blk: {}, root {}", inner.index_start_blk, inner.index_root_blk @@ -369,6 +371,8 @@ impl Layer for DeltaLayer { tree_reader.dump()?; let mut cursor = file.block_cursor(); + let dictionary = inner.dictionary.as_ref().unwrap(); + tree_reader.visit( &[0u8; DELTA_KEY_SIZE], VisitDirection::Forwards, @@ -380,7 +384,15 @@ impl Layer for DeltaLayer { let mut desc = String::new(); match cursor.read_blob(blob_ref.pos()) { Ok(buf) => { - let val = Value::des(&buf); + let val = if dictionary.is_empty() { + Value::des(&buf) + } else { + let mut decoder = + zstd::Decoder::with_dictionary(&*buf, dictionary).unwrap(); + let mut decompressed: Vec = Vec::new(); + decoder.read_to_end(&mut decompressed).unwrap(); + Value::des(&decompressed) + }; match val { Ok(Value::Image(img)) => { write!(&mut desc, " img {} bytes", img.len()).unwrap(); @@ -664,16 +676,19 @@ impl DeltaLayerWriter { assert!(self.lsn_range.start <= lsn); let body = &Value::ser(&val)?; - let mut compressed: Vec = Vec::new(); - let mut encoder = zstd::Encoder::with_dictionary( - &mut compressed, - config::ZSTD_COMPRESSION_LEVEL, - &self.dictionary, - )?; - encoder.write_all(&body)?; - encoder.finish()?; - - let off = self.blob_writer.write_blob(&compressed)?; + let off = if self.dictionary.is_empty() { + self.blob_writer.write_blob(body) + } else { + let mut compressed: Vec = Vec::new(); + let mut encoder = zstd::Encoder::with_dictionary( + &mut compressed, + config::ZSTD_COMPRESSION_LEVEL, + &self.dictionary, + )?; + encoder.write_all(body)?; + encoder.finish()?; + self.blob_writer.write_blob(&compressed) + }?; let blob_ref = BlobRef::new(off, val.will_init()); @@ -823,13 +838,18 @@ impl<'a> DeltaValueIter<'a> { let lsn = delta_key.lsn(); let buf = self.reader.read_blob(blob_ref.pos())?; - let mut decoder = zstd::Decoder::with_dictionary( - &*buf, - self.reader.reader.0.dictionary.as_ref().unwrap(), - )?; - let mut decompressed: Vec = Vec::new(); - decoder.read_to_end(&mut decompressed)?; - let val = Value::des(&decompressed)?; + let dictionary = self.reader.reader.0.dictionary.as_ref().unwrap(); + let val = if dictionary.is_empty() { + Value::des(&buf) + } else { + let mut decoder = zstd::Decoder::with_dictionary( + &*buf, + self.reader.reader.0.dictionary.as_ref().unwrap(), + )?; + let mut decompressed: Vec = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + Value::des(&decompressed) + }?; self.next_idx += 1; Ok(Some((key, lsn, val))) } else { diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 312e655bfc..5026f89c1b 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -338,8 +338,11 @@ impl InMemoryLayer { } } } - let dictionary = zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?; - + let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES { + zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)? + } else { + Vec::new() + }; let mut delta_layer_writer = DeltaLayerWriter::new( self.conf, self.timelineid,