Eliminate limit on buffer size.

This relies on the zstd 'experimental' feature, because the
zstd::bulk::Decompressor::upper_bound() function, which uses the
function ZSTD_decompressBound() function, is still experimental in the
zstd library. I'm OK with that, it's unlikely that the function would
change, and if it does, I'm sure there will be a replacement. There's
also the zstd_safe::get_decompressed_size() function that we could
use, but we only need an upper-bound, not the exact size, so
upper_bound() seems more appropriate.
This commit is contained in:
Heikki Linnakangas
2022-05-10 14:14:03 +03:00
parent e90b83646c
commit 0ed0433e82
4 changed files with 28 additions and 14 deletions

View File

@@ -58,7 +58,8 @@ rusoto_core = "0.47"
rusoto_s3 = "0.47"
async-trait = "0.1"
zstd = "0.11.1"
# 'experimental' is needed for the `zstd::bulk::Decompressor::upper_bound` function.
zstd = { version = "0.11.1", features = ["experimental"] }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }

View File

@@ -24,7 +24,6 @@ pub const ZSTD_MAX_SAMPLES: usize = 1024;
pub const ZSTD_MIN_SAMPLES: usize = 8; // magic requirement of zstd
pub const ZSTD_MAX_DICTIONARY_SIZE: usize = 8 * 1024;
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0; // default compression level
pub const ZSTD_DECOMPRESS_BUFFER_LIMIT: usize = 64 * 1024; // TODO: handle larger WAL records?
pub mod defaults {
use crate::tenant_config::defaults::*;

View File

@@ -73,7 +73,7 @@ pub struct BlockCursor<R>
where
R: BlockReader,
{
pub reader: R,
reader: R,
/// last accessed page
cache: Option<(u32, R::BlockLease)>,
}

View File

@@ -37,9 +37,8 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::virtual_file::VirtualFile;
use crate::walrecord;
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use serde::{Deserialize, Serialize};
use tracing::*;
use std::fs;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
@@ -47,6 +46,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;
use utils::{
bin_ser::BeSer,
@@ -242,6 +242,7 @@ impl Layer for DeltaLayer {
)?),
None => None,
};
let mut decompress_buf = Vec::new();
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
@@ -275,9 +276,12 @@ impl Layer for DeltaLayer {
)
})?;
let val = if let Some(decompressor) = &mut decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
.ok_or_else(|| anyhow!("could not get decompressed length"))?;
decompress_buf.clear();
decompress_buf.reserve(decompressed_max_len);
let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf)?;
Value::des(&decompress_buf)
} else {
Value::des(&buf)
}
@@ -380,14 +384,18 @@ impl Layer for DeltaLayer {
)?),
None => None,
};
let mut decompress_buf = Vec::new();
// A subroutine to dump a single blob
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
let val = if let Some(decompressor) = &mut decompressor {
let decompressed = decompressor
.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
.ok_or_else(|| anyhow!("could not get decompressed length"))?;
decompress_buf.clear();
decompress_buf.reserve(decompressed_max_len);
let _ = decompressor.decompress_to_buffer(&buf, &mut decompress_buf);
Value::des(&decompress_buf)
} else {
Value::des(&buf)
}?;
@@ -794,6 +802,8 @@ struct DeltaValueIter<'a> {
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
decompressor: Option<zstd::bulk::Decompressor<'a>>,
decompress_buf: Vec<u8>,
}
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
@@ -844,6 +854,7 @@ impl<'a> DeltaValueIter<'a> {
next_idx: 0,
reader: BlockCursor::new(Adapter(inner)),
decompressor,
decompress_buf: Vec::new(),
};
Ok(iter)
@@ -858,9 +869,12 @@ impl<'a> DeltaValueIter<'a> {
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = if let Some(decompressor) = &mut self.decompressor {
let decompressed =
decompressor.decompress(&buf, config::ZSTD_DECOMPRESS_BUFFER_LIMIT)?;
Value::des(&decompressed)
let decompressed_max_len = zstd::bulk::Decompressor::upper_bound(&buf)
.ok_or_else(|| anyhow!("could not get decompressed length"))?;
self.decompress_buf.clear();
self.decompress_buf.reserve(decompressed_max_len);
let _ = decompressor.decompress_to_buffer(&buf, &mut self.decompress_buf)?;
Value::des(&self.decompress_buf)
} else {
Value::des(&buf)
}?;