mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
pageserver: ingest pre-serialized batches of values (#9579)
## Problem https://github.com/neondatabase/neon/pull/9524 split the decoding and interpretation step from ingestion. The output of the first phase is a `wal_decoder::models::InterpretedWalRecord`. Before this patch set that struct contained a list of `Value` instances. We wish to lift the decoding and interpretation step to the safekeeper, but it would be nice if the safekeeper gave us a batch containing the raw data instead of actual values. ## Summary of changes Main goal here is to make `InterpretedWalRecord` hold a raw buffer which contains pre-serialized Values. For this we do: 1. Add a `SerializedValueBatch` type. This is `inmemory_layer::SerializedBatch` with some extra functionality for extension, observing values for shard 0 and tests. 2. Replace `inmemory_layer::SerializedBatch` with `SerializedValueBatch` 3. Make `DatadirModification` maintain a `SerializedValueBatch`. ### `DatadirModification` changes `DatadirModification` now maintains a `SerializedValueBatch` and extends it as new WAL records come in (to avoid flushing to disk on every record). In turn, this cascaded into a number of modifications to `DatadirModification`: 1. Replace `pending_data_pages` and `pending_zero_data_pages` with `pending_data_batch`. 2. Removal of `pending_zero_data_pages` and its cousin `on_wal_record_end` 3. Rename `pending_bytes` to `pending_metadata_bytes` since this is what it tracks now. 4. Adapting of various utility methods like `len`, `approx_pending_bytes` and `has_dirty_data_pages`. Removal of `pending_zero_data_pages` and the optimisation associated with it ((1) and (2)) deserves more detail. Previously all zero data pages went through `pending_zero_data_pages`. We wrote zero data pages when filling gaps caused by relation extension (case A) and when handling special wal records (case B). If it happened that the same WAL record contained a non zero write for an entry in `pending_zero_data_pages` we skipped the zero write. Case A: We handle this differently now. When ingesting the `SerialiezdValueBatch` associated with one PG WAL record, we identify the gaps and fill the them in one go. Essentially, we move from a per key process (gaps were filled after each new key), and replace it with a per record process. Hence, the optimisation is not required anymore. Case B: When the handling of a special record needs to zero out a key, it just adds that to the current batch. I inspected the code, and I don't think the optimisation kicked in here.
This commit is contained in:
@@ -2,15 +2,13 @@
|
||||
//! raw bytes which represent a raw Postgres WAL record.
|
||||
|
||||
use crate::models::*;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use crate::serialized_batch::SerializedValueBatch;
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
@@ -21,11 +19,12 @@ impl InterpretedWalRecord {
|
||||
pub fn from_bytes_filtered(
|
||||
buf: Bytes,
|
||||
shard: &ShardIdentity,
|
||||
lsn: Lsn,
|
||||
record_end_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<InterpretedWalRecord> {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(buf, &mut decoded, pg_version)?;
|
||||
let xid = decoded.xl_xid;
|
||||
|
||||
let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
|
||||
FlushUncommittedRecords::Yes
|
||||
@@ -33,96 +32,20 @@ impl InterpretedWalRecord {
|
||||
FlushUncommittedRecords::No
|
||||
};
|
||||
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?;
|
||||
|
||||
let mut blocks = Vec::default();
|
||||
for blk in decoded.blocks.iter() {
|
||||
let rel = RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(rel, blk.blkno);
|
||||
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key);
|
||||
}
|
||||
|
||||
let key_is_local = shard.is_key_local(&key);
|
||||
|
||||
tracing::debug!(
|
||||
lsn=%lsn,
|
||||
key=%key,
|
||||
"ingest: shard decision {}",
|
||||
if !key_is_local { "drop" } else { "keep" },
|
||||
);
|
||||
|
||||
if !key_is_local {
|
||||
if shard.is_shard_zero() {
|
||||
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
|
||||
// its blkno in case it implicitly extends a relation.
|
||||
blocks.push((key.to_compact(), None));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Instead of storing full-page-image WAL record,
|
||||
// it is better to store extracted image: we can skip wal-redo
|
||||
// in this case. Also some FPI records may contain multiple (up to 32) pages,
|
||||
// so them have to be copied multiple times.
|
||||
//
|
||||
let value = if blk.apply_image
|
||||
&& blk.has_image
|
||||
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
|
||||
// do not materialize null pages because them most likely be soon replaced with real data
|
||||
&& blk.bimg_len != 0
|
||||
{
|
||||
// Extract page image from FPI record
|
||||
let img_len = blk.bimg_len as usize;
|
||||
let img_offs = blk.bimg_offset as usize;
|
||||
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
|
||||
// TODO(vlad): skip the copy
|
||||
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
||||
|
||||
if blk.hole_length != 0 {
|
||||
let tail = image.split_off(blk.hole_offset as usize);
|
||||
image.resize(image.len() + blk.hole_length as usize, 0u8);
|
||||
image.unsplit(tail);
|
||||
}
|
||||
//
|
||||
// Match the logic of XLogReadBufferForRedoExtended:
|
||||
// The page may be uninitialized. If so, we can't set the LSN because
|
||||
// that would corrupt the page.
|
||||
//
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
assert_eq!(image.len(), BLCKSZ as usize);
|
||||
|
||||
Value::Image(image.freeze())
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: decoded.record.clone(),
|
||||
})
|
||||
};
|
||||
|
||||
blocks.push((key.to_compact(), Some(value)));
|
||||
}
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?;
|
||||
let batch = SerializedValueBatch::from_decoded_filtered(
|
||||
decoded,
|
||||
shard,
|
||||
record_end_lsn,
|
||||
pg_version,
|
||||
)?;
|
||||
|
||||
Ok(InterpretedWalRecord {
|
||||
metadata_record,
|
||||
blocks,
|
||||
lsn,
|
||||
batch,
|
||||
end_lsn: record_end_lsn,
|
||||
flush_uncommitted,
|
||||
xid: decoded.xl_xid,
|
||||
xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -130,7 +53,7 @@ impl InterpretedWalRecord {
|
||||
impl MetadataRecord {
|
||||
fn from_decoded(
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
record_end_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Note: this doesn't actually copy the bytes since
|
||||
@@ -151,7 +74,7 @@ impl MetadataRecord {
|
||||
Ok(None)
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, lsn),
|
||||
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn),
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
Self::decode_multixact_record(&mut buf, decoded, pg_version)
|
||||
}
|
||||
@@ -163,7 +86,7 @@ impl MetadataRecord {
|
||||
//
|
||||
// Alternatively, one can make the checkpoint part of the subscription protocol
|
||||
// to the pageserver. This should work fine, but can be done at a later point.
|
||||
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, lsn),
|
||||
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn),
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
Self::decode_logical_message_record(&mut buf, decoded)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user