From b7e7aeed4d7b69409ef20632748f26527886e437 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 6 Dec 2023 21:57:29 +0200 Subject: [PATCH] Perform compression of page images in storage --- libs/pageserver_api/src/key.rs | 4 ++++ pageserver/src/pgdatadir_mapping.rs | 15 +++++++++++- pageserver/src/repository.rs | 9 +++++++- .../src/tenant/storage_layer/delta_layer.rs | 12 +++++++++- .../src/tenant/storage_layer/image_layer.rs | 23 +++++++++++++++---- .../tenant/storage_layer/inmemory_layer.rs | 11 +++++++++ 6 files changed, 67 insertions(+), 7 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 852670af2c..5b65f97c92 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -577,3 +577,7 @@ mod tests { assert_eq!(key, Key::from_str(&format!("{key}")).unwrap()); } } + +pub fn is_rel_data_key(key: Key) -> bool { + key.field1 == 0x00 && key.field4 != 0 && key.field6 != 0xffffffff +} diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 727650a5a5..aaedfebd68 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -14,6 +14,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i use crate::walrecord::NeonWalRecord; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; +use lz4_flex; use enum_map::Enum; use itertools::Itertools; use pageserver_api::key::{ @@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> { img: Bytes, ) -> anyhow::Result<()> { anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); - self.put(rel_block_to_key(rel, blknum), Value::Image(img)); + let compressed = lz4_flex::block::compress(&img); + if compressed.len() < img.len() { + self.put( + rel_block_to_key(rel, blknum), + Value::CompressedImage(Bytes::from(compressed)), + ); + } else { + self.put(rel_block_to_key(rel, blknum), Value::Image(img)); + } Ok(()) } @@ -1597,6 +1606,10 @@ impl<'a> 
DatadirModification<'a> { if let Some((_, value)) = values.last() { return if let Value::Image(img) = value { Ok(img.clone()) + } else if let Value::CompressedImage(img) = value { + let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize) + .map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?; + Ok(Bytes::from(decompressed)) } else { // Currently, we never need to read back a WAL record that we // inserted in the same "transaction". All the metadata updates diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 9959d105eb..839e0d101e 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE}; pub enum Value { /// An Image value contains a full copy of the value Image(Bytes), + /// A CompressedImage value contains an LZ4-compressed full copy of the page + CompressedImage(Bytes), /// A WalRecord value contains a WAL record that needs to be /// replayed get the full value. Replaying the WAL record /// might need a previous version of the value (if will_init() @@ -22,12 +24,17 @@ pub enum Value { impl Value { pub fn is_image(&self) -> bool { - matches!(self, Value::Image(_)) + match self { + Value::Image(_) => true, + Value::CompressedImage(_) => true, + Value::WalRecord(_) => false, + } } pub fn will_init(&self) -> bool { match self { Value::Image(_) => true, + Value::CompressedImage(_) => true, Value::WalRecord(rec) => rec.will_init(), } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b7132ee3bf..cc77f01219 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::BytesMut; +use 
bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; +use postgres_ffi::BLCKSZ; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::File; @@ -813,6 +814,12 @@ impl DeltaLayerInner { need_image = false; break; } + Value::CompressedImage(img) => { + let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?; + reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed))); + need_image = false; + break; + } Value::WalRecord(rec) => { let will_init = rec.will_init(); reconstruct_state.records.push((entry_lsn, rec)); @@ -1102,6 +1109,9 @@ impl DeltaLayerInner { Value::Image(img) => { format!(" img {} bytes", img.len()) } + Value::CompressedImage(img) => { + format!(" compressed img {} bytes", img.len()) + } Value::WalRecord(rec) => { let wal_desc = walrecord::describe_wal_record(&rec)?; format!( diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 14c79e413c..67dcbef798 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -26,6 +26,7 @@ use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::page_cache::{self, FileId, PAGE_SZ}; +use crate::pgdatadir_mapping::is_rel_data_key; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; @@ -45,8 +46,10 @@ use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use hex; use pageserver_api::keyspace::KeySpace; +use lz4_flex; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; +use postgres_ffi::BLCKSZ; use rand::{distributions::Alphanumeric, Rng}; use 
serde::{Deserialize, Serialize}; use std::fs::File; @@ -446,8 +449,12 @@ impl ImageLayerInner { ) .await .with_context(|| format!("failed to read value from offset {}", offset))?; - let value = Bytes::from(blob); - + let value = if is_rel_data_key(key) && blob.len() < BLCKSZ as usize { + let decompressed = lz4_flex::block::decompress(&blob, BLCKSZ as usize)?; + Bytes::from(decompressed) + } else { + Bytes::from(blob) + }; reconstruct_state.img = Some((self.lsn, value)); Ok(ValueReconstructResult::Complete) } else { @@ -658,10 +665,18 @@ impl ImageLayerWriterInner { /// async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let (_img, res) = self.blob_writer.write_blob(img).await; + let (_img, res) = if is_rel_data_key(key) { + let compressed = lz4_flex::block::compress(img); + if compressed.len() < img.len() { + self.blob_writer.write_blob(&compressed).await; + } else { + self.blob_writer.write_blob(img).await; + } + } else { + self.blob_writer.write_blob(img).await; + }; // TODO: re-use the buffer for `img` further upstack let off = res?; - let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); self.tree.append(&keybuf, off)?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 5f1db21d49..d1cea61bb4 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError; use crate::tenant::{PageReconstructError, Timeline}; use crate::walrecord; use anyhow::{anyhow, ensure, Result}; +use bytes::Bytes; +use lz4_flex; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; +use postgres_ffi::BLCKSZ; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::{Arc, OnceLock}; use tracing::*; @@ 
-133,6 +136,9 @@ impl InMemoryLayer { Ok(Value::Image(img)) => { write!(&mut desc, " img {} bytes", img.len())?; } + Ok(Value::CompressedImage(img)) => { + write!(&mut desc, " compressed img {} bytes", img.len())?; + } Ok(Value::WalRecord(rec)) => { let wal_desc = walrecord::describe_wal_record(&rec).unwrap(); write!( @@ -184,6 +190,11 @@ impl InMemoryLayer { reconstruct_state.img = Some((*entry_lsn, img)); return Ok(ValueReconstructResult::Complete); } + Value::CompressedImage(img) => { + let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?; + reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed))); + return Ok(ValueReconstructResult::Complete); + } Value::WalRecord(rec) => { let will_init = rec.will_init(); reconstruct_state.records.push((*entry_lsn, rec));