Support vectored comp[ressed blobs read

This commit is contained in:
Konstantin Knizhnik
2024-03-12 10:32:49 +02:00
parent 8c60359ae5
commit 98375b3896
4 changed files with 63 additions and 22 deletions

View File

@@ -237,7 +237,11 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(src_buf, Ok(()))
}
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes, compress: bool) -> Result<u64, Error> {
pub async fn write_compressed_blob(
&mut self,
srcbuf: Bytes,
compress: bool,
) -> Result<u64, Error> {
let offset = self.offset;
let len = srcbuf.len();

View File

@@ -966,7 +966,7 @@ impl DeltaLayerInner {
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.read_blobs(&read, buf.take().expect("Should have a buffer"), false)
.await;
let blobs_buf = match res {

View File

@@ -40,7 +40,8 @@ use crate::tenant::vectored_blob_io::{
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, LZ4_COMPRESSION, STORAGE_FORMAT_VERSION,
TEMP_FILE_SUFFIX,
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
@@ -49,6 +50,7 @@ use hex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -546,9 +548,12 @@ impl ImageLayerInner {
.into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let compressed = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
for read in reads.into_iter() {
let buf = BytesMut::with_capacity(max_vectored_read_bytes);
let res = vectored_blob_reader.read_blobs(&read, buf).await;
let res = vectored_blob_reader
.read_blobs(&read, buf, compressed)
.await;
match res {
Ok(blobs_buf) => {
@@ -556,11 +561,31 @@ impl ImageLayerInner {
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
if meta.compression_alg == LZ4_COMPRESSION {
match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) {
Ok(decompressed) => {
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(Bytes::from(decompressed)),
);
}
Err(err) => reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::from(anyhow!(
"Failed to decompress blob from file {}: {}",
self.file.path,
err
)),
),
}
} else {
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
}
}
}
Err(err) => {
@@ -668,7 +693,10 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_compressed_blob(img, self.compression).await?;
let off = self
.blob_writer
.write_compressed_blob(img, self.compression)
.await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;

View File

@@ -15,11 +15,11 @@
//!
//! Note that the vectored blob api does *not* go through the page cache.
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use crate::NO_COMPRESSION;
use bytes::BytesMut;
use pageserver_api::key::Key;
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
@@ -40,6 +40,7 @@ pub struct VectoredBlob {
pub start: usize,
pub end: usize,
pub meta: BlobMeta,
pub compression_alg: u8,
}
/// Return type of [`VectoredBlobReader::read_blobs`]
@@ -274,6 +275,7 @@ impl<'a> VectoredBlobReader<'a> {
&self,
read: &VectoredRead,
buf: BytesMut,
new_storage_format: bool,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
assert!(
@@ -304,35 +306,42 @@ impl<'a> VectoredBlobReader<'a> {
);
for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
let mut offset_in_buf = (offset - start_offset) as usize;
let compression_alg = if new_storage_format {
offset_in_buf += 1;
buf[offset_in_buf - 1]
} else {
NO_COMPRESSION
};
let first_len_byte = buf[offset_in_buf];
// Each blob is prefixed by a header containing it's size.
// Extract the size and skip that header to find the start of the data.
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
// 1 byte case and 1 in the 4 byte case.
let (size_length, blob_size) = if first_len_byte < 0x80 {
(1, first_len_byte as u64)
(1usize, first_len_byte as usize)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = offset_in_buf as usize;
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= 0x7f;
(4, u32::from_be_bytes(blob_size_buf) as u64)
(4usize, u32::from_be_bytes(blob_size_buf) as usize)
};
let start = offset_in_buf + size_length;
let end = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
Some((next_blob_start_offset, _)) => {
(next_blob_start_offset - start_offset) as usize
}
None => start + blob_size,
};
assert_eq!(end - start, blob_size);
metas.push(VectoredBlob {
start: start as usize,
end: end as usize,
start,
end,
compression_alg,
meta: *meta,
})
}