Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-21 12:22:56 +00:00)

Compare commits: sergey/fix...layer_comp (9 commits)

Commits:
- 0a049ae17a
- 32557b16b4
- 076b8e3d04
- 39eadf6236
- 4472d49c1e
- dc057ace2f
- 0e49d748b8
- fc7d1ba043
- e28b3dee37

Cargo.lock (generated): 42 lines changed

@@ -249,6 +249,9 @@ name = "cc"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
dependencies = [
 "jobserver",
]

[[package]]
name = "cexpr"

@@ -1226,6 +1229,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"

[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
 "libc",
]

[[package]]
name = "js-sys"
version = "0.3.56"

@@ -1669,6 +1681,7 @@ dependencies = [
 "url",
 "utils",
 "workspace_hack",
 "zstd",
]

[[package]]

@@ -3656,3 +3669,32 @@ name = "zeroize"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"

[[package]]
name = "zstd"
version = "0.11.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a"
dependencies = [
 "zstd-safe",
]

[[package]]
name = "zstd-safe"
version = "5.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
dependencies = [
 "libc",
 "zstd-sys",
]

[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
 "cc",
 "libc",
]

pageserver/Cargo.toml
@@ -58,6 +58,8 @@ rusoto_core = "0.47"
rusoto_s3 = "0.47"
async-trait = "0.1"

zstd = "0.11.1"

postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

pageserver/src/config.rs
@@ -20,6 +20,12 @@ use utils::{
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};

pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records?

pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
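
These constants feed straight into the zstd crate's dictionary API: a dictionary is only trained once at least ZSTD_MIN_SAMPLES serialized values have been collected (capped at ZSTD_MAX_SAMPLES samples and ZSTD_MAX_DICTIONARY_SIZE bytes), and a ZSTD_COMPRESSION_LEVEL of 0 asks zstd for its default level. A minimal sketch of how the rest of the patch uses them; `maybe_train_dictionary` is an invented helper name, not code from the diff:

```rust
// Illustrative sketch only -- not part of the patch.
use anyhow::Result;

const ZSTD_MIN_SAMPLES: usize = 8; // zstd's trainer needs several samples
const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;

/// Train a dictionary from serialized values, or return an empty Vec when there
/// are too few samples. An empty dictionary is treated as "write blobs uncompressed".
fn maybe_train_dictionary(samples: &[Vec<u8>]) -> Result<Vec<u8>> {
    if samples.len() >= ZSTD_MIN_SAMPLES {
        Ok(zstd::dict::from_samples(samples, ZSTD_MAX_DICTIONARY_SIZE)?)
    } else {
        Ok(Vec::new())
    }
}

fn main() -> Result<()> {
    // With fewer than ZSTD_MIN_SAMPLES samples we fall back to "no dictionary".
    let few_samples = vec![b"short value".to_vec(); 4];
    assert!(maybe_train_dictionary(&few_samples)?.is_empty());
    Ok(())
}
```
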

pageserver/src/layered_repository.rs
@@ -17,6 +17,7 @@ use fail::fail_point;
use itertools::Itertools;
use lazy_static::lazy_static;
use tracing::*;
use utils::bin_ser::BeSer;

use std::cmp::{max, min, Ordering};
use std::collections::hash_map::Entry;

@@ -32,6 +33,7 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::{Duration, Instant, SystemTime};

use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config;
use crate::config::PageServerConf;
use crate::keyspace::KeySpace;
use crate::tenant_config::{TenantConf, TenantConfOpt};

@@ -1938,78 +1940,109 @@ impl LayeredTimeline {
.map(|l| l.get_lsn_range())
.reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
.unwrap();

let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
let mut new_layers = Vec::new();
{
let mut all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
false
true
}
} else {
true
}
});
});

// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
for x in all_values_iter {
let (key, lsn, value) = x?;
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;

if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
while let Some(x) = all_values_iter.next() {
let (key, lsn, value) = x?;

if let Some(prev_key) = prev_key {
if key != prev_key && writer.is_some() {
let size = writer.as_mut().unwrap().size();
if size > target_file_size {
new_layers.push(writer.take().unwrap().finish(prev_key.next())?);
writer = None;
}
}
}

if writer.is_none() {
let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut prefetched = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));

while let Some(y) = all_values_iter.next() {
if let Ok((key, lsn, value)) = y {
samples.push(Value::ser(&value)?);
prefetched.push((key, lsn, value));
if samples.len() == config::ZSTD_MAX_SAMPLES {
break;
}
} else {
break;
}
}

let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
} else {
Vec::new()
};

writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
dictionary,
)?);

// Number of sample is relatively small, so we should not exceed target file limit
for (key, lsn, value) in prefetched {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
} else {
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}

if writer.is_none() {
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
key,
lsn_range.clone(),
)?);
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();

// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));

// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;

layer_paths.pop().unwrap();
}

writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}

// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();

// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));

// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;

layer_paths.pop().unwrap();
}

let mut layers = self.layers.write().unwrap();
for l in new_layers {
layers.insert_historic(Arc::new(l));
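
The restructured compaction loop above interleaves dictionary training with the k-merged value stream: whenever a new output delta layer is about to be opened, it first buffers up to ZSTD_MAX_SAMPLES values pulled from the merged iterator, trains a dictionary on their serialized form (or falls back to an empty one when there are too few), constructs the DeltaLayerWriter with that dictionary, and only then replays the buffered values into it. A condensed sketch of just that buffering step, with simplified stand-in types instead of the pageserver's Key/Lsn/Value and with the size-capping and error-handling details elided:

```rust
// Sketch under simplifying assumptions; `start_layer_samples` is an invented name.
use anyhow::Result;

const ZSTD_MAX_SAMPLES: usize = 1024;
const ZSTD_MIN_SAMPLES: usize = 8;
const ZSTD_MAX_DICTIONARY_SIZE: usize = 64 * 1024;

/// (key, lsn, serialized value) -- stand-ins for the pageserver's types.
type Record = (u64, u64, Vec<u8>);

/// Buffer enough records to train a dictionary for the next output layer,
/// then hand both back so the caller can create the writer and replay them.
fn start_layer_samples(
    values: &mut dyn Iterator<Item = Record>,
    first: Record,
) -> Result<(Vec<u8>, Vec<Record>)> {
    let mut prefetched = vec![first];
    prefetched.extend(values.take(ZSTD_MAX_SAMPLES - 1));

    let samples: Vec<&[u8]> = prefetched.iter().map(|(_, _, v)| v.as_slice()).collect();
    let dictionary = if samples.len() >= ZSTD_MIN_SAMPLES {
        zstd::dict::from_samples(&samples, ZSTD_MAX_DICTIONARY_SIZE)?
    } else {
        Vec::new()
    };

    // The caller would now call DeltaLayerWriter::new(.., dictionary) and
    // put_value() every prefetched record before draining the rest of the iterator.
    Ok((dictionary, prefetched))
}
```
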

pageserver/src/layered_repository/block_io.rs
@@ -73,7 +73,7 @@ pub struct BlockCursor<R>
where
R: BlockReader,
{
reader: R,
pub reader: R,
/// last accessed page
cache: Option<(u32, R::BlockLease)>,
}

pageserver/src/layered_repository/delta_layer.rs
@@ -23,6 +23,7 @@
//! "values" part. The actual page images and WAL records are stored in the
//! "values" part.
//!
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};

@@ -39,7 +40,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use serde::{Deserialize, Serialize};
use tracing::*;
// avoid binding to Write (conflicts with std::io::Write)
// avoid binding to Write (conflicts with std::sio::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::fs;

@@ -56,6 +57,8 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};

const DICTIONARY_OFFSET: u64 = PAGE_SZ as u64;

///
/// Header stored in the beginning of the file
///

@@ -195,6 +198,10 @@ pub struct DeltaLayerInner {

/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,

/// Compression dictionary.
dictionary: Vec<u8>, // empty if not loaded
prepared_dictionary: Option<zstd::dict::DecoderDictionary<'static>>, // None if not loaded
}

impl Layer for DeltaLayer {

@@ -232,6 +239,12 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};

// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();

@@ -264,7 +277,14 @@ impl Layer for DeltaLayer {
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
let val = if let Some(decompressor) = &mut decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}
.with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()

@@ -342,7 +362,6 @@ impl Layer for DeltaLayer {
}

let inner = self.load()?;

println!(
"index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk

@@ -358,6 +377,13 @@ impl Layer for DeltaLayer {
tree_reader.dump()?;

let mut cursor = file.block_cursor();
let mut decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};

tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,

@@ -369,7 +395,14 @@ impl Layer for DeltaLayer {
let mut desc = String::new();
match cursor.read_blob(blob_ref.pos()) {
Ok(buf) => {
let val = Value::des(&buf);
let val = if let Some(decompressor) = &mut decompressor {
let decompressed = decompressor
.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)
.unwrap();
Value::des(&decompressed)
} else {
Value::des(&buf)
};
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len()).unwrap();
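
On the read side, the hunks above build the decompressor from the dictionary that was persisted in the layer file: the DecoderDictionary is prepared once when the layer is loaded, each read then constructs a bulk Decompressor from it, and an empty (absent) dictionary means the blob was written uncompressed and is deserialized as-is. The second argument to decompress() is an output-capacity bound, which is what ZSTD_DECOMPRESS_BUFFER_LIMIT and its TODO about larger WAL records refer to. A standalone sketch of that lookup path; `read_value` is an invented name:

```rust
// Sketch only -- simplified from the DeltaLayer read path.
use anyhow::Result;
use zstd::dict::DecoderDictionary;

const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024;

fn read_value(raw_blob: &[u8], prepared: Option<&DecoderDictionary<'static>>) -> Result<Vec<u8>> {
    match prepared {
        // Dictionary present: the blob was compressed, decompress it first.
        Some(dict) => {
            let mut decompressor = zstd::bulk::Decompressor::with_prepared_dictionary(dict)?;
            Ok(decompressor.decompress(raw_blob, ZSTD_DECOMPRESS_BUFFER_LIMIT)?)
        }
        // No dictionary: the blob was written uncompressed.
        None => Ok(raw_blob.to_vec()),
    }
}
```
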

pageserver/src/layered_repository/delta_layer.rs (continued)
@@ -487,6 +520,13 @@ impl DeltaLayer {
}
}

let mut cursor = file.block_cursor();
inner.dictionary = cursor.read_blob(DICTIONARY_OFFSET)?;
inner.prepared_dictionary = if inner.dictionary.is_empty() {
None
} else {
Some(zstd::dict::DecoderDictionary::copy(&inner.dictionary))
};
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;

@@ -512,6 +552,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),

@@ -539,6 +581,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk: 0,
index_root_blk: 0,
}),

@@ -586,6 +630,7 @@ pub struct DeltaLayerWriter {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,

blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
compressor: Option<zstd::bulk::Compressor<'static>>,
}

impl DeltaLayerWriter {

@@ -598,6 +643,7 @@ impl DeltaLayerWriter {
tenantid: ZTenantId,
key_start: Key,
lsn_range: Range<Lsn>,
dictionary: Vec<u8>,
) -> Result<DeltaLayerWriter> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will

@@ -613,14 +659,24 @@ impl DeltaLayerWriter {
));
let mut file = VirtualFile::create(&path)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
file.seek(SeekFrom::Start(DICTIONARY_OFFSET))?;
let buf_writer = BufWriter::new(file);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
let mut blob_writer = WriteBlobWriter::new(buf_writer, DICTIONARY_OFFSET);

let off = blob_writer.write_blob(&dictionary)?;
assert!(off == DICTIONARY_OFFSET);
let compressor = if dictionary.is_empty() {
None
} else {
Some(zstd::bulk::Compressor::with_dictionary(
config::ZSTD_COMPRESSION_LEVEL,
&dictionary,
)?)
};

// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);

Ok(DeltaLayerWriter {
conf,
path,

@@ -630,6 +686,7 @@ impl DeltaLayerWriter {
lsn_range,
tree: tree_builder,
blob_writer,
compressor,
})
}
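
On the write side, the hunks above reserve the blob right after the header page for the dictionary (DICTIONARY_OFFSET) and keep a bulk Compressor bound to that dictionary for the writer's lifetime; an empty dictionary disables compression. A reduced sketch of that piece; `BlobCompressor` is an invented wrapper, while the real code keeps the Option<Compressor> directly in DeltaLayerWriter:

```rust
// Sketch only -- mirrors the compressor setup in DeltaLayerWriter::new/put_value.
use anyhow::Result;

const ZSTD_COMPRESSION_LEVEL: i32 = 0; // 0 selects zstd's default level

struct BlobCompressor {
    compressor: Option<zstd::bulk::Compressor<'static>>,
}

impl BlobCompressor {
    fn new(dictionary: &[u8]) -> Result<Self> {
        let compressor = if dictionary.is_empty() {
            None // empty dictionary: compression is disabled for this layer
        } else {
            Some(zstd::bulk::Compressor::with_dictionary(
                ZSTD_COMPRESSION_LEVEL,
                dictionary,
            )?)
        };
        Ok(BlobCompressor { compressor })
    }

    /// Returns the bytes that would actually be handed to write_blob().
    fn encode(&mut self, serialized_value: &[u8]) -> Result<Vec<u8>> {
        match self.compressor.as_mut() {
            Some(c) => Ok(c.compress(serialized_value)?),
            None => Ok(serialized_value.to_vec()),
        }
    }
}
```
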

pageserver/src/layered_repository/delta_layer.rs (continued)
@@ -641,7 +698,13 @@
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
assert!(self.lsn_range.start <= lsn);

let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let body = &Value::ser(&val)?;
let off = if let Some(ref mut compressor) = self.compressor {
let compressed = compressor.compress(body)?;
self.blob_writer.write_blob(&compressed)?
} else {
self.blob_writer.write_blob(body)?
};

let blob_ref = BlobRef::new(off, val.will_init());

@@ -698,6 +761,8 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
dictionary: Vec::new(),
prepared_dictionary: None,
index_start_blk,
index_root_blk,
}),

@@ -735,6 +800,7 @@ struct DeltaValueIter<'a> {
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'a>>,
}

struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);

@@ -773,11 +839,26 @@ impl<'a> DeltaValueIter<'a> {
true
},
)?;

let decompressor = match &inner.prepared_dictionary {
Some(dictionary) => Some(zstd::bulk::Decompressor::with_prepared_dictionary(
dictionary,
)?),
None => None,
};
/*
let decompressor = if !inner.dictionary.is_empty() {
Some(zstd::bulk::Decompressor::with_dictionary(
&inner.dictionary,
)?)
} else {
None
};
*/
let iter = DeltaValueIter {
all_offsets,
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decompressor,
};

Ok(iter)

@@ -791,7 +872,13 @@ impl<'a> DeltaValueIter<'a> {
let lsn = delta_key.lsn();

let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
let val = if let Some(decompressor) = &mut self.decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
} else {
Value::des(&buf)
}?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

pageserver/src/layered_repository/image_layer.rs
@@ -19,6 +19,7 @@
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};

@@ -168,7 +169,8 @@ impl Layer for ImageLayer {
offset
)
})?;
let value = Bytes::from(blob);
let decompressed = zstd::bulk::decompress(&blob, PAGE_SZ)?;
let value = Bytes::from(decompressed);

reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)

@@ -456,7 +458,8 @@ impl ImageLayerWriter {
///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?;
let compressed = zstd::bulk::compress(img, config::ZSTD_COMPRESSION_LEVEL)?;
let off = self.blob_writer.write_blob(&compressed)?;

let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
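
Image layers take the simpler route: each page image is compressed on its own with plain, dictionary-less bulk zstd when written, and decompressed with the page size as the output bound when read. Roughly, with PAGE_SZ standing in for the pageserver's 8 KiB page size:

```rust
// Round-trip sketch of the ImageLayer write/read path (not code from the diff).
use anyhow::Result;

const PAGE_SZ: usize = 8192;
const ZSTD_COMPRESSION_LEVEL: i32 = 0;

fn main() -> Result<()> {
    let page = vec![0u8; PAGE_SZ]; // stand-in page image

    // Write path: compress the image before handing it to write_blob().
    let compressed = zstd::bulk::compress(&page, ZSTD_COMPRESSION_LEVEL)?;

    // Read path: decompress with the page size as the output-capacity bound.
    let roundtrip = zstd::bulk::decompress(&compressed, PAGE_SZ)?;
    assert_eq!(roundtrip, page);
    println!("{} -> {} bytes", page.len(), compressed.len());
    Ok(())
}
```
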

pageserver/src/layered_repository/inmemory_layer.rs
@@ -4,6 +4,7 @@
//! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
//! its position in the file, is kept in memory, though.
//!
use crate::config;
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockReader;

@@ -318,21 +319,40 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();

let mut samples: Vec<Vec<u8>> = Vec::with_capacity(config::ZSTD_MAX_SAMPLES);
let mut buf = Vec::new();

let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);

let mut cursor = inner.file.block_cursor();

// First learn dictionary
'train: for (_key, vec_map) in keys.iter() {
// Write all page versions
for (_lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
samples.push(buf.clone());
if samples.len() == config::ZSTD_MAX_SAMPLES {
break 'train;
}
}
}

let dictionary = if samples.len() >= config::ZSTD_MIN_SAMPLES {
zstd::dict::from_samples(&samples, config::ZSTD_MAX_DICTIONARY_SIZE)?
} else {
Vec::new()
};
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timelineid,
self.tenantid,
Key::MIN,
self.start_lsn..inner.end_lsn.unwrap(),
dictionary,
)?;

let mut buf = Vec::new();

let mut cursor = inner.file.block_cursor();

let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);

for (key, vec_map) in keys.iter() {
let key = **key;
// Write all page versions
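
One property the whole patch leans on: the exact dictionary bytes used while writing a layer must be available again when reading it, which is why the delta layer persists the dictionary at DICTIONARY_OFFSET and reloads it before serving reads, and why the in-memory layer trains the dictionary before the DeltaLayerWriter is created. A small round-trip sketch of that contract; it uses an arbitrary byte string as a raw-content dictionary instead of a trained one, which zstd also accepts:

```rust
// Sketch of the write/read dictionary contract (not code from the diff).
use anyhow::Result;

fn main() -> Result<()> {
    // Stand-in for a dictionary that would normally come from zstd::dict::from_samples
    // and be stored as the first blob of the layer file.
    let dictionary = b"neon page image / wal record common prefix material".to_vec();
    let value = b"wal record payload: put page 42 at some lsn".to_vec();

    // Write path: compress with the dictionary (level 0 = zstd default).
    let mut compressor = zstd::bulk::Compressor::with_dictionary(0, &dictionary)?;
    let compressed = compressor.compress(&value)?;

    // Read path: decompression needs the very same dictionary bytes.
    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dictionary)?;
    let roundtrip = decompressor.decompress(&compressed, value.len())?;
    assert_eq!(roundtrip, value);
    Ok(())
}
```
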

vendor/postgres (vendored): 2 lines changed
Submodule vendor/postgres updated: d7c8426e49...a13fe64a3e