mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
Perform compression of page images in storage
This commit is contained in:
@@ -577,3 +577,7 @@ mod tests {
|
||||
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_rel_data_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0 && key.field6 != 0xffffffff
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use lz4_flex;
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{
|
||||
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
let compressed = lz4_flex::block::compress(&img);
|
||||
if compressed.len() < img.len() {
|
||||
self.put(
|
||||
rel_block_to_key(rel, blknum),
|
||||
Value::CompressedImage(Bytes::from(compressed)),
|
||||
);
|
||||
} else {
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
|
||||
if let Some((_, value)) = values.last() {
|
||||
return if let Value::Image(img) = value {
|
||||
Ok(img.clone())
|
||||
} else if let Value::CompressedImage(img) = value {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
|
||||
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
|
||||
Ok(Bytes::from(decompressed))
|
||||
} else {
|
||||
// Currently, we never need to read back a WAL record that we
|
||||
// inserted in the same "transaction". All the metadata updates
|
||||
|
||||
@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
|
||||
pub enum Value {
|
||||
/// An Image value contains a full copy of the value
|
||||
Image(Bytes),
|
||||
/// A compressed page image contains a full copy of the page
|
||||
CompressedImage(Bytes),
|
||||
/// A WalRecord value contains a WAL record that needs to be
|
||||
/// replayed to get the full value. Replaying the WAL record
|
||||
/// might need a previous version of the value (if will_init()
|
||||
@@ -22,12 +24,17 @@ pub enum Value {
|
||||
|
||||
impl Value {
|
||||
pub fn is_image(&self) -> bool {
|
||||
matches!(self, Value::Image(_))
|
||||
match self {
|
||||
Value::Image(_) => true,
|
||||
Value::CompressedImage(_) => true,
|
||||
Value::WalRecord(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn will_init(&self) -> bool {
|
||||
match self {
|
||||
Value::Image(_) => true,
|
||||
Value::CompressedImage(_) => true,
|
||||
Value::WalRecord(rec) => rec.will_init(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||
reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
|
||||
Value::Image(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
format!(" compressed img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache::{self, FileId, PAGE_SZ};
|
||||
use crate::pgdatadir_mapping::is_rel_data_key;
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
@@ -45,8 +46,10 @@ use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use lz4_flex;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -446,8 +449,12 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
let value = if is_rel_data_key(key) && blob.len() < BLCKSZ as usize {
|
||||
let decompressed = lz4_flex::block::decompress(&blob, BLCKSZ as usize)?;
|
||||
Bytes::from(decompressed)
|
||||
} else {
|
||||
Bytes::from(blob)
|
||||
};
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
@@ -658,10 +665,18 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let (_img, res) = self.blob_writer.write_blob(img).await;
|
||||
let (_img, res) = if is_rel_data_key(key) {
|
||||
let compressed = lz4_flex::block::compress(img);
|
||||
if compressed.len() < img.len() {
|
||||
self.blob_writer.write_blob(&compressed).await;
|
||||
} else {
|
||||
self.blob_writer.write_blob(img).await;
|
||||
}
|
||||
} else {
|
||||
self.blob_writer.write_blob(img).await;
|
||||
};
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let off = res?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
|
||||
@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::walrecord;
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use bytes::Bytes;
|
||||
use lz4_flex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tracing::*;
|
||||
@@ -133,6 +136,9 @@ impl InMemoryLayer {
|
||||
Ok(Value::Image(img)) => {
|
||||
write!(&mut desc, " img {} bytes", img.len())?;
|
||||
}
|
||||
Ok(Value::CompressedImage(img)) => {
|
||||
write!(&mut desc, " compressed img {} bytes", img.len())?;
|
||||
}
|
||||
Ok(Value::WalRecord(rec)) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
|
||||
write!(
|
||||
@@ -184,6 +190,11 @@ impl InMemoryLayer {
|
||||
reconstruct_state.img = Some((*entry_lsn, img));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
}
|
||||
Value::CompressedImage(img) => {
|
||||
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
|
||||
reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((*entry_lsn, rec));
|
||||
|
||||
Reference in New Issue
Block a user