From 42d0b040f8885f62c420023c6158d1913c719929 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 11 Mar 2024 17:34:05 +0200 Subject: [PATCH] Fix merge conflicts --- libs/pageserver_api/src/models.rs | 1 + pageserver/client/src/page_service.rs | 1 + pageserver/src/pgdatadir_mapping.rs | 2 +- pageserver/src/tenant/blob_io.rs | 70 ++++++++++++------- pageserver/src/tenant/storage_layer.rs | 25 ++++++- .../src/tenant/storage_layer/delta_layer.rs | 5 ++ .../src/tenant/storage_layer/image_layer.rs | 22 +++--- pageserver/src/walingest.rs | 9 ++- 8 files changed, 89 insertions(+), 46 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 275c20d19b..4aa5c6d0f5 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1085,6 +1085,7 @@ impl PagestreamBeMessage { Self::Error(_) => "Error", Self::DbSize(_) => "DbSize", Self::GetSlruSegment(_) => "GetSlruSegment", + Self::GetCompressedPage(_) => "GetCompressedPage", } } } diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 49175b3b90..1b5f1d9df0 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -157,6 +157,7 @@ impl PagestreamClient { PagestreamBeMessage::Exists(_) | PagestreamBeMessage::Nblocks(_) | PagestreamBeMessage::DbSize(_) + | PagestreamBeMessage::GetCompressedPage(_) | PagestreamBeMessage::GetSlruSegment(_) => { anyhow::bail!( "unexpected be message kind in response to getpage request: {}", diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index aaedfebd68..2bb9ae19c1 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -14,9 +14,9 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i use crate::walrecord::NeonWalRecord; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; -use lz4_flex; use enum_map::Enum; use itertools::Itertools; +use lz4_flex; use pageserver_api::key::{ dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 970eea511a..4ba68bac1f 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -11,7 +11,7 @@ //! len < 128: 0XXXXXXX //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; use crate::context::RequestContext; @@ -237,39 +237,56 @@ impl BlobWriter { (src_buf, Ok(())) } - pub async fn write_compressed_blob(&mut self, srcbuf: &[u8]) -> Result { + pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result { let offset = self.offset; - if srcbuf.len() < 128 { - self.write_all(&[NO_COMPRESSION]).await?; + + let len = srcbuf.len(); + + let mut io_buf = self.io_buf.take().expect("we always put it back below"); + io_buf.clear(); + let mut is_compressed = false; + if len < 128 { // Short blob. Write a 1-byte length header - let len_buf = srcbuf.len() as u8; - self.write_all(&[len_buf]).await?; + io_buf.put_u8(NO_COMPRESSION); + io_buf.put_u8(len as u8); } else { // Write a 4-byte length header - if srcbuf.len() == BLCKSZ as usize { - let compressed = lz4_flex::block::compress(srcbuf); - if compressed.len() < srcbuf.len() { - self.write_all(&[LZ4_COMPRESSION]).await?; - let mut len_buf = (compressed.len() as u32).to_be_bytes(); - len_buf[0] |= 0x80; - self.write_all(&len_buf).await?; - self.write_all(&compressed).await?; - return Ok(offset); - } - } - if srcbuf.len() > 0x7fff_ffff { + if len > 0x7fff_ffff { return Err(Error::new( ErrorKind::Other, - format!("blob too large ({} bytes)", srcbuf.len()), + format!("blob too large ({} bytes)", len), )); } - self.write_all(&[NO_COMPRESSION]).await?; - let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes(); - len_buf[0] |= 0x80; - self.write_all(&len_buf).await?; + if len == BLCKSZ as usize { + let compressed = lz4_flex::block::compress(&srcbuf); + if compressed.len() < len { + io_buf.put_u8(LZ4_COMPRESSION); + let mut len_buf = (compressed.len() as u32).to_be_bytes(); + len_buf[0] |= 0x80; + io_buf.extend_from_slice(&len_buf[..]); + io_buf.extend_from_slice(&compressed[..]); + is_compressed = true; + } + if is_compressed { + io_buf.put_u8(NO_COMPRESSION); + let mut len_buf = (len as u32).to_be_bytes(); + len_buf[0] |= 0x80; + io_buf.extend_from_slice(&len_buf[..]); + } + } + } + let (io_buf, hdr_res) = self.write_all(io_buf).await; + match hdr_res { + Ok(_) => (), + Err(e) => return Err(e), + } + self.io_buf = Some(io_buf); + if is_compressed { + hdr_res.map(|_| offset) + } else { + let (_buf, res) = self.write_all(srcbuf).await; + res.map(|_| offset) } - self.write_all(srcbuf).await?; - Ok(offset) } /// Write a blob of data. Returns the offset that it was written to, @@ -288,7 +305,6 @@ impl BlobWriter { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - self.write_all(io_buf).await } else { // Write a 4-byte length header if len > 0x7fff_ffff { @@ -303,8 +319,8 @@ impl BlobWriter { let mut len_buf = (len as u32).to_be_bytes(); len_buf[0] |= 0x80; io_buf.extend_from_slice(&len_buf[..]); - self.write_all(io_buf).await } + self.write_all(io_buf).await } .await; self.io_buf = Some(io_buf); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 299950cc21..c944fe5cfd 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::models::{ LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, }; +use postgres_ffi::BLCKSZ; use std::cmp::{Ordering, Reverse}; use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashMap}; @@ -147,12 +148,13 @@ impl ValuesReconstructState { lsn: Lsn, value: Value, ) -> ValueReconstructSituation { - let state = self + let mut error: Option = None; + let key_state = self .keys .entry(*key) .or_insert(Ok(VectoredValueReconstructState::default())); - if let Ok(state) = state { + let situation = if let Ok(state) = key_state { let key_done = match state.situation { ValueReconstructSituation::Complete => unreachable!(), ValueReconstructSituation::Continue => match value { @@ -160,6 +162,21 @@ impl ValuesReconstructState { state.img = Some((lsn, img)); true } + Value::CompressedImage(img) => { + match lz4_flex::block::decompress(&img, BLCKSZ as usize) { + Ok(decompressed) => { + state.img = Some((lsn, Bytes::from(decompressed))); + true + } + Err(e) => { + error = Some(PageReconstructError::from(anyhow::anyhow!( + "Failed to decompress blobrom virtual file: {}", + e + ))); + true + } + } + } Value::WalRecord(rec) => { let reached_cache = state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn); @@ -178,7 +195,11 @@ impl ValuesReconstructState { state.situation } else { ValueReconstructSituation::Complete + }; + if let Some(err) = error { + *key_state = Err(err); } + situation } /// Returns the Lsn at which this key is cached if one exists. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index cc77f01219..bd0d92e553 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1148,6 +1148,11 @@ impl DeltaLayerInner { let checkpoint = CheckPoint::decode(&img)?; println!(" CHECKPOINT: {:?}", checkpoint); } + Value::CompressedImage(img) => { + let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?; + let checkpoint = CheckPoint::decode(&decompressed)?; + println!(" CHECKPOINT: {:?}", checkpoint); + } Value::WalRecord(_rec) => { println!(" unexpected walrecord value for checkpoint key"); } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 883caac47e..0ca356646a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -26,7 +26,6 @@ use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::page_cache::{self, FileId, PAGE_SZ}; -use crate::pgdatadir_mapping::is_rel_data_key; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; @@ -37,21 +36,17 @@ use crate::tenant::storage_layer::{ use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner, -use crate::tenant::Timeline; -use crate::virtual_file::VirtualFile; -use crate::{ - COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX, }; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; -use crate::{IMAGE_FILE_MAGIC, LZ4_COMPRESSION, NO_COMPRESSION, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; +use crate::{ + COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX, +}; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use hex; use pageserver_api::keyspace::KeySpace; -use lz4_flex; -use pageserver_api::key::is_rel_data_key; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; @@ -450,9 +445,12 @@ impl ImageLayerInner { .page_content_kind(PageContentKind::ImageLayerValue) .build(); let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION { - file.block_cursor().read_compressed_blob(offset, &ctx).await + block_reader + .block_cursor() + .read_compressed_blob(offset, &ctx) + .await } else { - file.block_cursor().read_blob(offset, &ctx).await + block_reader.block_cursor().read_blob(offset, &ctx).await }) .with_context(|| format!("failed to read value from offset {}", offset))?; @@ -667,9 +665,7 @@ impl ImageLayerWriterInner { /// async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let (_img, res) = self.blob_writer.write_compressed_blob(img).await?; - // TODO: re-use the buffer for `img` further upstack - let off = res?; + let off = self.blob_writer.write_compressed_blob(img).await?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); self.tree.append(&keybuf, off)?; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 2e5c6e72ec..69d9522b1c 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -472,8 +472,8 @@ impl WalIngest { && (decoded.xl_info == pg_constants::XLOG_FPI || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) // only lz4 compression of WAL is now supported, for other compression algorithms fall back to storing the original WAL record - && (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.timeline.pg_version)? || - postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.timeline.pg_version)?) + && (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? || + postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?) // do not materialize null pages because them most likely be soon replaced with real data && blk.bimg_len != 0 { @@ -481,7 +481,10 @@ impl WalIngest { let img_len = blk.bimg_len as usize; let img_offs = blk.bimg_offset as usize; let mut image = BytesMut::with_capacity(BLCKSZ as usize); - if postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, self.timeline.pg_version)? { + if postgres_ffi::bkpimage_is_compressed_lz4( + blk.bimg_info, + modification.tline.pg_version, + )? { let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize; let decompressed = lz4_flex::block::decompress( &decoded.record[img_offs..img_offs + img_len],