mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
Do not compress delta layers if there are too few elements
This commit is contained in:
@@ -21,7 +21,8 @@ use crate::layered_repository::TIMELINES_SEGMENT_NAME;
|
||||
use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
|
||||
pub const ZSTD_MAX_SAMPLES: usize = 1024;
|
||||
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;
|
||||
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
|
||||
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 128 * 1024;
|
||||
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
|
||||
|
||||
pub mod defaults {
|
||||
|
||||
@@ -1996,8 +1996,11 @@ impl LayeredTimeline {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let dictionary =
|
||||
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?;
|
||||
let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
|
||||
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
writer = Some(DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
|
||||
@@ -40,7 +40,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
// avoid binding to Write (conflicts with std::sio::Write)
|
||||
// while being able to use std::fmt::Write's methods
|
||||
use std::fmt::Write as _;
|
||||
use std::fs;
|
||||
@@ -271,11 +271,14 @@ impl Layer for DeltaLayer {
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
let buf = cursor.read_blob(pos)?;
|
||||
let mut decoder = zstd::Decoder::with_dictionary(&*buf, dictionary)?;
|
||||
let mut decompressed: Vec<u8> = Vec::new();
|
||||
decoder.read_to_end(&mut decompressed)?;
|
||||
let val = Value::des(&decompressed).with_context(|| {
|
||||
let val = if dictionary.is_empty() {
|
||||
Value::des(&buf)
|
||||
} else {
|
||||
let mut decoder = zstd::Decoder::with_dictionary(&*buf, dictionary)?;
|
||||
let mut decompressed: Vec<u8> = Vec::new();
|
||||
decoder.read_to_end(&mut decompressed)?;
|
||||
Value::des(&decompressed)
|
||||
}.with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
@@ -353,7 +356,6 @@ impl Layer for DeltaLayer {
|
||||
}
|
||||
|
||||
let inner = self.load()?;
|
||||
|
||||
println!(
|
||||
"index_start_blk: {}, root {}",
|
||||
inner.index_start_blk, inner.index_root_blk
|
||||
@@ -369,6 +371,8 @@ impl Layer for DeltaLayer {
|
||||
tree_reader.dump()?;
|
||||
|
||||
let mut cursor = file.block_cursor();
|
||||
let dictionary = inner.dictionary.as_ref().unwrap();
|
||||
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
@@ -380,7 +384,15 @@ impl Layer for DeltaLayer {
|
||||
let mut desc = String::new();
|
||||
match cursor.read_blob(blob_ref.pos()) {
|
||||
Ok(buf) => {
|
||||
let val = Value::des(&buf);
|
||||
let val = if dictionary.is_empty() {
|
||||
Value::des(&buf)
|
||||
} else {
|
||||
let mut decoder =
|
||||
zstd::Decoder::with_dictionary(&*buf, dictionary).unwrap();
|
||||
let mut decompressed: Vec<u8> = Vec::new();
|
||||
decoder.read_to_end(&mut decompressed).unwrap();
|
||||
Value::des(&decompressed)
|
||||
};
|
||||
match val {
|
||||
Ok(Value::Image(img)) => {
|
||||
write!(&mut desc, " img {} bytes", img.len()).unwrap();
|
||||
@@ -664,16 +676,19 @@ impl DeltaLayerWriter {
|
||||
assert!(self.lsn_range.start <= lsn);
|
||||
|
||||
let body = &Value::ser(&val)?;
|
||||
let mut compressed: Vec<u8> = Vec::new();
|
||||
let mut encoder = zstd::Encoder::with_dictionary(
|
||||
&mut compressed,
|
||||
config::ZSTD_COMPRESSION_LEVEL,
|
||||
&self.dictionary,
|
||||
)?;
|
||||
encoder.write_all(&body)?;
|
||||
encoder.finish()?;
|
||||
|
||||
let off = self.blob_writer.write_blob(&compressed)?;
|
||||
let off = if self.dictionary.is_empty() {
|
||||
self.blob_writer.write_blob(body)
|
||||
} else {
|
||||
let mut compressed: Vec<u8> = Vec::new();
|
||||
let mut encoder = zstd::Encoder::with_dictionary(
|
||||
&mut compressed,
|
||||
config::ZSTD_COMPRESSION_LEVEL,
|
||||
&self.dictionary,
|
||||
)?;
|
||||
encoder.write_all(body)?;
|
||||
encoder.finish()?;
|
||||
self.blob_writer.write_blob(&compressed)
|
||||
}?;
|
||||
|
||||
let blob_ref = BlobRef::new(off, val.will_init());
|
||||
|
||||
@@ -823,13 +838,18 @@ impl<'a> DeltaValueIter<'a> {
|
||||
let lsn = delta_key.lsn();
|
||||
|
||||
let buf = self.reader.read_blob(blob_ref.pos())?;
|
||||
let mut decoder = zstd::Decoder::with_dictionary(
|
||||
&*buf,
|
||||
self.reader.reader.0.dictionary.as_ref().unwrap(),
|
||||
)?;
|
||||
let mut decompressed: Vec<u8> = Vec::new();
|
||||
decoder.read_to_end(&mut decompressed)?;
|
||||
let val = Value::des(&decompressed)?;
|
||||
let dictionary = self.reader.reader.0.dictionary.as_ref().unwrap();
|
||||
let val = if dictionary.is_empty() {
|
||||
Value::des(&buf)
|
||||
} else {
|
||||
let mut decoder = zstd::Decoder::with_dictionary(
|
||||
&*buf,
|
||||
self.reader.reader.0.dictionary.as_ref().unwrap(),
|
||||
)?;
|
||||
let mut decompressed: Vec<u8> = Vec::new();
|
||||
decoder.read_to_end(&mut decompressed)?;
|
||||
Value::des(&decompressed)
|
||||
}?;
|
||||
self.next_idx += 1;
|
||||
Ok(Some((key, lsn, val)))
|
||||
} else {
|
||||
|
||||
@@ -338,8 +338,11 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
}
|
||||
let dictionary = zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?;
|
||||
|
||||
let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
|
||||
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timelineid,
|
||||
|
||||
Reference in New Issue
Block a user