Fix merge conflicts

This commit is contained in:
Konstantin Knizhnik
2024-03-11 17:34:05 +02:00
parent 9832638c09
commit 42d0b040f8
8 changed files with 89 additions and 46 deletions

View File

@@ -1085,6 +1085,7 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetCompressedPage(_) => "GetCompressedPage",
}
}
}

View File

@@ -157,6 +157,7 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetCompressedPage(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",

View File

@@ -14,9 +14,9 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use lz4_flex;
use enum_map::Enum;
use itertools::Itertools;
use lz4_flex;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,

View File

@@ -11,7 +11,7 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
@@ -237,39 +237,56 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(src_buf, Ok(()))
}
pub async fn write_compressed_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
let offset = self.offset;
if srcbuf.len() < 128 {
self.write_all(&[NO_COMPRESSION]).await?;
let len = srcbuf.len();
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let mut is_compressed = false;
if len < 128 {
// Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8;
self.write_all(&[len_buf]).await?;
io_buf.put_u8(NO_COMPRESSION);
io_buf.put_u8(len as u8);
} else {
// Write a 4-byte length header
if srcbuf.len() == BLCKSZ as usize {
let compressed = lz4_flex::block::compress(srcbuf);
if compressed.len() < srcbuf.len() {
self.write_all(&[LZ4_COMPRESSION]).await?;
let mut len_buf = (compressed.len() as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.write_all(&len_buf).await?;
self.write_all(&compressed).await?;
return Ok(offset);
}
}
if srcbuf.len() > 0x7fff_ffff {
if len > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", srcbuf.len()),
format!("blob too large ({} bytes)", len),
));
}
self.write_all(&[NO_COMPRESSION]).await?;
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.write_all(&len_buf).await?;
if len == BLCKSZ as usize {
let compressed = lz4_flex::block::compress(&srcbuf);
if compressed.len() < len {
io_buf.put_u8(LZ4_COMPRESSION);
let mut len_buf = (compressed.len() as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
io_buf.extend_from_slice(&compressed[..]);
is_compressed = true;
}
if is_compressed {
io_buf.put_u8(NO_COMPRESSION);
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
}
}
}
let (io_buf, hdr_res) = self.write_all(io_buf).await;
match hdr_res {
Ok(_) => (),
Err(e) => return Err(e),
}
self.io_buf = Some(io_buf);
if is_compressed {
hdr_res.map(|_| offset)
} else {
let (_buf, res) = self.write_all(srcbuf).await;
res.map(|_| offset)
}
self.write_all(srcbuf).await?;
Ok(offset)
}
/// Write a blob of data. Returns the offset that it was written to,
@@ -288,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -303,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
}
self.write_all(io_buf).await
}
.await;
self.io_buf = Some(io_buf);

View File

@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use postgres_ffi::BLCKSZ;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
let mut error: Option<PageReconstructError> = None;
let key_state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let situation = if let Ok(state) = key_state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
@@ -160,6 +162,21 @@ impl ValuesReconstructState {
state.img = Some((lsn, img));
true
}
Value::CompressedImage(img) => {
match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
Ok(decompressed) => {
state.img = Some((lsn, Bytes::from(decompressed)));
true
}
Err(e) => {
error = Some(PageReconstructError::from(anyhow::anyhow!(
"Failed to decompress blobrom virtual file: {}",
e
)));
true
}
}
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
state.situation
} else {
ValueReconstructSituation::Complete
};
if let Some(err) = error {
*key_state = Err(err);
}
situation
}
/// Returns the Lsn at which this key is cached if one exists.

View File

@@ -1148,6 +1148,11 @@ impl DeltaLayerInner {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
let checkpoint = CheckPoint::decode(&decompressed)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::WalRecord(_rec) => {
println!(" unexpected walrecord value for checkpoint key");
}

View File

@@ -26,7 +26,6 @@
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::{self, FileId, PAGE_SZ};
use crate::pgdatadir_mapping::is_rel_data_key;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -37,21 +36,17 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, LZ4_COMPRESSION, NO_COMPRESSION, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::keyspace::KeySpace;
use lz4_flex;
use pageserver_api::key::is_rel_data_key;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
@@ -450,9 +445,12 @@ impl ImageLayerInner {
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
file.block_cursor().read_compressed_blob(offset, &ctx).await
block_reader
.block_cursor()
.read_compressed_blob(offset, &ctx)
.await
} else {
file.block_cursor().read_blob(offset, &ctx).await
block_reader.block_cursor().read_blob(offset, &ctx).await
})
.with_context(|| format!("failed to read value from offset {}", offset))?;
@@ -667,9 +665,7 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_compressed_blob(img).await?;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
let off = self.blob_writer.write_compressed_blob(img).await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;

View File

@@ -472,8 +472,8 @@ impl WalIngest {
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// only lz4 compression of WAL is now supported, for other compression algorithms fall back to storing the original WAL record
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.timeline.pg_version)? ||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.timeline.pg_version)?)
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
// do not materialize null pages because them most likely be soon replaced with real data
&& blk.bimg_len != 0
{
@@ -481,7 +481,10 @@ impl WalIngest {
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
if postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, self.timeline.pg_version)? {
if postgres_ffi::bkpimage_is_compressed_lz4(
blk.bimg_info,
modification.tline.pg_version,
)? {
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
let decompressed = lz4_flex::block::decompress(
&decoded.record[img_offs..img_offs + img_len],