safekeeper: decode and interpret for multiple shards in one go (#10201)

## Problem

Currently, we call `InterpretedWalRecord::from_bytes_filtered`
from each shard. To serve multiple shards at the same time,
the API needs to allow for enquiring about multiple shards.

## Summary of changes

This commit tweaks it a pretty brute force way. Naively, we could
just generate the shard for a key, but pre and post split shards
may be subscribed at the same time, so doing it efficiently is more
complex.
This commit is contained in:
Vlad Lazar
2025-01-15 11:10:24 +00:00
committed by GitHub
parent 05d17a10ae
commit 1577430408
13 changed files with 574 additions and 189 deletions

9
Cargo.lock generated
View File

@@ -7560,12 +7560,21 @@ dependencies = [
"anyhow",
"async-compression",
"bytes",
"camino",
"camino-tempfile",
"criterion",
"futures",
"pageserver_api",
"postgres_ffi",
"pprof",
"prost",
"remote_storage",
"serde",
"serde_json",
"thiserror",
"tikv-jemallocator",
"tokio",
"tokio-util",
"tonic",
"tonic-build",
"tracing",

View File

@@ -31,6 +31,8 @@
//! - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
//! and their slugs are 0004, 0104, 0204, and 0304.
use std::hash::{Hash, Hasher};
use crate::{key::Key, models::ShardParameters};
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
@@ -48,6 +50,23 @@ pub struct ShardIdentity {
layout: ShardLayout,
}
/// Hash implementation
///
/// The stripe size cannot change dynamically, so it can be ignored for efficiency reasons.
impl Hash for ShardIdentity {
fn hash<H: Hasher>(&self, state: &mut H) {
let ShardIdentity {
number,
count,
stripe_size: _,
layout: _,
} = self;
number.0.hash(state);
count.0.hash(state);
}
}
/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
@@ -59,7 +78,7 @@ impl Default for ShardStripeSize {
}
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);
const LAYOUT_V1: ShardLayout = ShardLayout(1);

View File

@@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
pub mid: MultiXactId,
/* new MultiXact's ID */
@@ -46,7 +46,7 @@ impl XlMultiXactCreate {
}
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactTruncate {
pub oldest_multi_db: Oid,
/* to-be-truncated range of multixact offsets */
@@ -72,7 +72,7 @@ impl XlMultiXactTruncate {
}
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlRelmapUpdate {
pub dbid: Oid, /* database ID, or 0 for shared map */
pub tsid: Oid, /* database's tablespace, or pg_global */
@@ -90,7 +90,7 @@ impl XlRelmapUpdate {
}
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlReploriginDrop {
pub node_id: RepOriginId,
}
@@ -104,7 +104,7 @@ impl XlReploriginDrop {
}
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlReploriginSet {
pub remote_lsn: Lsn,
pub node_id: RepOriginId,
@@ -911,7 +911,7 @@ impl XlSmgrCreate {
}
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlSmgrTruncate {
pub blkno: BlockNumber,
pub rnode: RelFileNode,
@@ -984,7 +984,7 @@ impl XlDropDatabase {
/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
/// struct for commits and aborts.
///
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlXactParsedRecord {
pub xid: TransactionId,
pub info: u8,

View File

@@ -24,3 +24,18 @@ workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[build-dependencies]
tonic-build.workspace = true
[dev-dependencies]
criterion.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
remote_storage.workspace = true
tokio-util.workspace = true
serde_json.workspace = true
futures.workspace = true
tikv-jemallocator.workspace = true
pprof.workspace = true
[[bench]]
name = "bench_interpret_wal"
harness = false

View File

@@ -0,0 +1,34 @@
## WAL Decoding and Interpretation Benchmarks
Note that these benchmarks pull WAL from a public bucket in S3
as a preparation step. Hence, you need a way to auth with AWS.
You can achieve this by copying the `~/.aws/config` file from
the AWS SSO notion page and exporting `AWS_PROFILE=dev` when invoking
the benchmarks.
To run benchmarks:
```sh
aws sso login --profile dev
# All benchmarks.
AWS_PROFILE=dev cargo bench --package wal_decoder
# Specific file.
AWS_PROFILE=dev cargo bench --package wal_decoder --bench bench_interpret_wal
# Specific benchmark.
AWS_PROFILE=dev cargo bench --package wal_decoder --bench bench_interpret_wal unsharded
# List available benchmarks.
cargo bench --package wal_decoder --benches -- --list
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
AWS_PROFILE=dev cargo bench --package wal_decoder --bench bench_interpret_wal unsharded -- --profile-time 10
```
Additional charts and statistics are available in `target/criterion/report/index.html`.
Benchmarks are automatically compared against the previous run. To compare against other runs, see
`--baseline` and `--save-baseline`.

View File

@@ -0,0 +1,250 @@
use anyhow::Context;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{stream::FuturesUnordered, StreamExt};
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::{waldecoder::WalStreamDecoder, MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use pprof::criterion::{Output, PProfProfiler};
use serde::Deserialize;
use std::{env, num::NonZeroUsize, sync::Arc};
use camino::{Utf8Path, Utf8PathBuf};
use camino_tempfile::Utf8TempDir;
use remote_storage::{
DownloadOpts, GenericRemoteStorage, ListingMode, RemoteStorageConfig, RemoteStorageKind,
S3Config,
};
use tokio_util::sync::CancellationToken;
use utils::{
lsn::Lsn,
shard::{ShardCount, ShardNumber},
};
use wal_decoder::models::InterpretedWalRecord;
const S3_BUCKET: &str = "neon-github-public-dev";
const S3_REGION: &str = "eu-central-1";
const BUCKET_PREFIX: &str = "wal-snapshots/bulk-insert/";
const METADATA_FILENAME: &str = "metadata.json";
/// Use jemalloc, and configure it to sample allocations for profiles every 1 MB.
/// This mirrors the configuration in bin/safekeeper.rs.
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
async fn create_s3_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: S3_BUCKET.to_string(),
bucket_region: S3_REGION.to_string(),
prefix_in_bucket: Some(BUCKET_PREFIX.to_string()),
endpoint: None,
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: None,
upload_storage_class: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}
async fn download_bench_data(
client: Arc<GenericRemoteStorage>,
cancel: &CancellationToken,
) -> anyhow::Result<Utf8TempDir> {
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into()?;
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent)?;
eprintln!("Downloading benchmark data to {:?}", temp_dir);
let listing = client
.list(None, ListingMode::NoDelimiter, None, cancel)
.await?;
let mut downloads = listing
.keys
.into_iter()
.map(|obj| {
let client = client.clone();
let temp_dir_path = temp_dir.path().to_owned();
async move {
let remote_path = obj.key;
let download = client
.download(&remote_path, &DownloadOpts::default(), cancel)
.await?;
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let file_name = remote_path.object_name().unwrap();
let file_path = temp_dir_path.join(file_name);
let file = tokio::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&file_path)
.await?;
let mut writer = tokio::io::BufWriter::new(file);
tokio::io::copy_buf(&mut body, &mut writer).await?;
Ok::<(), anyhow::Error>(())
}
})
.collect::<FuturesUnordered<_>>();
while let Some(download) = downloads.next().await {
download?;
}
Ok(temp_dir)
}
struct BenchmarkData {
wal: Vec<u8>,
meta: BenchmarkMetadata,
}
#[derive(Deserialize)]
struct BenchmarkMetadata {
pg_version: u32,
start_lsn: Lsn,
}
async fn load_bench_data(path: &Utf8Path, input_size: usize) -> anyhow::Result<BenchmarkData> {
eprintln!("Loading benchmark data from {:?}", path);
let mut entries = tokio::fs::read_dir(path).await?;
let mut ordered_segment_paths = Vec::new();
let mut metadata = None;
while let Some(entry) = entries.next_entry().await? {
if entry.file_name() == METADATA_FILENAME {
let bytes = tokio::fs::read(entry.path()).await?;
metadata = Some(
serde_json::from_slice::<BenchmarkMetadata>(&bytes)
.context("failed to deserialize metadata.json")?,
);
} else {
ordered_segment_paths.push(entry.path());
}
}
ordered_segment_paths.sort();
let mut buffer = Vec::new();
for path in ordered_segment_paths {
if buffer.len() >= input_size {
break;
}
use async_compression::tokio::bufread::ZstdDecoder;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let decoder = ZstdDecoder::new(reader);
let mut reader = tokio::io::BufReader::new(decoder);
tokio::io::copy_buf(&mut reader, &mut buffer).await?;
}
buffer.truncate(input_size);
Ok(BenchmarkData {
wal: buffer,
meta: metadata.unwrap(),
})
}
fn criterion_benchmark(c: &mut Criterion) {
const INPUT_SIZE: usize = 128 * 1024 * 1024;
let setup_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let (_temp_dir, bench_data) = setup_runtime.block_on(async move {
let cancel = CancellationToken::new();
let client = create_s3_client().await.unwrap();
let temp_dir = download_bench_data(client, &cancel).await.unwrap();
let bench_data = load_bench_data(temp_dir.path(), INPUT_SIZE).await.unwrap();
(temp_dir, bench_data)
});
eprintln!(
"Benchmarking against {} MiB of WAL",
INPUT_SIZE / 1024 / 1024
);
let mut group = c.benchmark_group("decode-interpret-wal");
group.throughput(criterion::Throughput::Bytes(bench_data.wal.len() as u64));
group.sample_size(10);
group.bench_function("unsharded", |b| {
b.iter(|| decode_interpret_main(&bench_data, &[ShardIdentity::unsharded()]))
});
let eight_shards = (0..8)
.map(|i| ShardIdentity::new(ShardNumber(i), ShardCount(8), ShardStripeSize(8)).unwrap())
.collect::<Vec<_>>();
group.bench_function("8/8-shards", |b| {
b.iter(|| decode_interpret_main(&bench_data, &eight_shards))
});
let four_shards = eight_shards
.into_iter()
.filter(|s| s.number.0 % 2 == 0)
.collect::<Vec<_>>();
group.bench_function("4/8-shards", |b| {
b.iter(|| decode_interpret_main(&bench_data, &four_shards))
});
let two_shards = four_shards
.into_iter()
.filter(|s| s.number.0 % 4 == 0)
.collect::<Vec<_>>();
group.bench_function("2/8-shards", |b| {
b.iter(|| decode_interpret_main(&bench_data, &two_shards))
});
}
fn decode_interpret_main(bench: &BenchmarkData, shards: &[ShardIdentity]) {
let r = decode_interpret(bench, shards);
if let Err(e) = r {
panic!("{e:?}");
}
}
fn decode_interpret(bench: &BenchmarkData, shard: &[ShardIdentity]) -> anyhow::Result<()> {
let mut decoder = WalStreamDecoder::new(bench.meta.start_lsn, bench.meta.pg_version);
let xlogoff: usize = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE);
for chunk in bench.wal[xlogoff..].chunks(MAX_SEND_SIZE) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
assert!(lsn.is_aligned());
let _ = InterpretedWalRecord::from_bytes_filtered(
recdata,
shard,
lsn,
bench.meta.pg_version,
)
.unwrap();
}
}
Ok(())
}
criterion_group!(
name=benches;
config=Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets=criterion_benchmark
);
criterion_main!(benches);

View File

@@ -1,6 +1,8 @@
//! This module contains logic for decoding and interpreting
//! raw bytes which represent a raw Postgres WAL record.
use std::collections::HashMap;
use crate::models::*;
use crate::serialized_batch::SerializedValueBatch;
use bytes::{Buf, Bytes};
@@ -14,15 +16,15 @@ use utils::lsn::Lsn;
impl InterpretedWalRecord {
/// Decode and interpreted raw bytes which represent one Postgres WAL record.
/// Data blocks which do not match the provided shard identity are filtered out.
/// Data blocks which do not match any of the provided shard identities are filtered out.
/// Shard 0 is a special case since it tracks all relation sizes. We only give it
/// the keys that are being written as that is enough for updating relation sizes.
pub fn from_bytes_filtered(
buf: Bytes,
shard: &ShardIdentity,
shards: &[ShardIdentity],
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<InterpretedWalRecord> {
) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
let xid = decoded.xl_xid;
@@ -33,43 +35,57 @@ impl InterpretedWalRecord {
FlushUncommittedRecords::No
};
let metadata_record =
MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
let batch = SerializedValueBatch::from_decoded_filtered(
let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
HashMap::with_capacity(shards.len());
for shard in shards {
shard_records.insert(
*shard,
InterpretedWalRecord {
metadata_record: None,
batch: SerializedValueBatch::default(),
next_record_lsn,
flush_uncommitted,
xid,
},
);
}
MetadataRecord::from_decoded_filtered(
&decoded,
&mut shard_records,
next_record_lsn,
pg_version,
)?;
SerializedValueBatch::from_decoded_filtered(
decoded,
shard,
&mut shard_records,
next_record_lsn,
pg_version,
)?;
Ok(InterpretedWalRecord {
metadata_record,
batch,
next_record_lsn,
flush_uncommitted,
xid,
})
Ok(shard_records)
}
}
impl MetadataRecord {
/// Builds a metadata record for this WAL record, if any.
/// Populates the given `shard_records` with metadata records from this WAL record, if any,
/// discarding those belonging to other shards.
///
/// Only metadata records relevant for the given shard are emitted. Currently, most metadata
/// Only metadata records relevant for the given shards is emitted. Currently, most metadata
/// records are broadcast to all shards for simplicity, but this should be improved.
fn from_decoded_filtered(
decoded: &DecodedWALRecord,
shard: &ShardIdentity,
shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<Option<MetadataRecord>> {
) -> anyhow::Result<()> {
// Note: this doesn't actually copy the bytes since
// the [`Bytes`] type implements it via a level of indirection.
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
// First, generate metadata records from the decoded WAL record.
let mut metadata_record = match decoded.xl_rmid {
let metadata_record = match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
Self::decode_heapam_record(&mut buf, decoded, pg_version)?
}
@@ -112,41 +128,65 @@ impl MetadataRecord {
};
// Next, filter the metadata record by shard.
match metadata_record {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
{
metadata_record = None
for (shard, record) in shard_records.iter_mut() {
match metadata_record {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
let updated_old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
let updated_new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if updated_old_heap_blkno.is_some() || updated_new_heap_blkno.is_some() {
// Clone the record and update it for the current shard.
let mut for_shard = metadata_record.clone();
match for_shard {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
ref mut clear_vm_bits,
))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
ref mut clear_vm_bits,
)),
) => {
clear_vm_bits.old_heap_blkno = updated_old_heap_blkno;
clear_vm_bits.new_heap_blkno = updated_new_heap_blkno;
record.metadata_record = for_shard;
}
_ => {
unreachable!("for_shard is a clone of what we checked above")
}
}
}
}
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if shard.is_shard_zero() {
record.metadata_record = metadata_record;
// No other shards should receive this record, so we stop traversing shards early.
break;
}
}
_ => {
// All other metadata records are sent to all shards.
record.metadata_record = metadata_record.clone();
}
}
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if !shard.is_shard_zero() {
metadata_record = None;
}
}
_ => {}
}
Ok(metadata_record)
Ok(())
}
fn decode_heapam_record(

View File

@@ -48,7 +48,7 @@ pub mod proto {
tonic::include_proto!("interpreted_wal");
}
#[derive(Serialize, Deserialize)]
#[derive(Copy, Clone, Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
Yes,
No,
@@ -107,7 +107,7 @@ impl InterpretedWalRecord {
/// The interpreted part of the Postgres WAL record which requires metadata
/// writes to the underlying storage engine.
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum MetadataRecord {
Heapam(HeapamRecord),
Neonrmgr(NeonrmgrRecord),
@@ -123,12 +123,12 @@ pub enum MetadataRecord {
Replorigin(ReploriginRecord),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum HeapamRecord {
ClearVmBits(ClearVmBits),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct ClearVmBits {
pub new_heap_blkno: Option<u32>,
pub old_heap_blkno: Option<u32>,
@@ -136,29 +136,29 @@ pub struct ClearVmBits {
pub flags: u8,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum NeonrmgrRecord {
ClearVmBits(ClearVmBits),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum SmgrRecord {
Create(SmgrCreate),
Truncate(XlSmgrTruncate),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct SmgrCreate {
pub rel: RelTag,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum DbaseRecord {
Create(DbaseCreate),
Drop(DbaseDrop),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct DbaseCreate {
pub db_id: Oid,
pub tablespace_id: Oid,
@@ -166,32 +166,32 @@ pub struct DbaseCreate {
pub src_tablespace_id: Oid,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct DbaseDrop {
pub db_id: Oid,
pub tablespace_ids: Vec<Oid>,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum ClogRecord {
ZeroPage(ClogZeroPage),
Truncate(ClogTruncate),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct ClogZeroPage {
pub segno: u32,
pub rpageno: u32,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct ClogTruncate {
pub pageno: u32,
pub oldest_xid: TransactionId,
pub oldest_xid_db: Oid,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum XactRecord {
Commit(XactCommon),
Abort(XactCommon),
@@ -200,7 +200,7 @@ pub enum XactRecord {
Prepare(XactPrepare),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct XactCommon {
pub parsed: XlXactParsedRecord,
pub origin_id: u16,
@@ -209,73 +209,73 @@ pub struct XactCommon {
pub lsn: Lsn,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct XactPrepare {
pub xl_xid: TransactionId,
pub data: Bytes,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum MultiXactRecord {
ZeroPage(MultiXactZeroPage),
Create(XlMultiXactCreate),
Truncate(XlMultiXactTruncate),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct MultiXactZeroPage {
pub slru_kind: SlruKind,
pub segno: u32,
pub rpageno: u32,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum RelmapRecord {
Update(RelmapUpdate),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct RelmapUpdate {
pub update: XlRelmapUpdate,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum XlogRecord {
Raw(RawXlogRecord),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct RawXlogRecord {
pub info: u8,
pub lsn: Lsn,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum LogicalMessageRecord {
Put(PutLogicalMessage),
#[cfg(feature = "testing")]
Failpoint,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct PutLogicalMessage {
pub path: String,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum StandbyRecord {
RunningXacts(StandbyRunningXacts),
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct StandbyRunningXacts {
pub oldest_running_xid: TransactionId,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum ReploriginRecord {
Set(XlReploriginSet),
Drop(XlReploriginDrop),

View File

@@ -5,7 +5,7 @@
//! Such batches are created from decoded PG wal records and ingested
//! by the pageserver by writing directly to the ephemeral file.
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use bytes::{Bytes, BytesMut};
use pageserver_api::key::rel_block_to_key;
@@ -22,6 +22,8 @@ use utils::lsn::Lsn;
use pageserver_api::key::Key;
use crate::models::InterpretedWalRecord;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
/// Accompanying metadata for the batch
@@ -128,7 +130,8 @@ impl Default for SerializedValueBatch {
}
impl SerializedValueBatch {
/// Build a batch of serialized values from a decoded PG WAL record
/// Populates the given `shard_records` with value batches from this WAL record, if any,
/// discarding those belonging to other shards.
///
/// The batch will only contain values for keys targeting the specifiec
/// shard. Shard 0 is a special case, where any keys that don't belong to
@@ -136,21 +139,20 @@ impl SerializedValueBatch {
/// but absent from the raw buffer [`SerializedValueBatch::raw`]).
pub(crate) fn from_decoded_filtered(
decoded: DecodedWALRecord,
shard: &ShardIdentity,
shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<SerializedValueBatch> {
// First determine how big the buffer needs to be and allocate it up-front.
) -> anyhow::Result<()> {
// First determine how big the buffers need to be and allocate it up-front.
// This duplicates some of the work below, but it's empirically much faster.
let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
for (shard, record) in shard_records.iter_mut() {
assert!(record.batch.is_empty());
let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
record.batch.raw = Vec::with_capacity(estimate);
}
let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
let mut max_lsn: Lsn = Lsn(0);
let mut len: usize = 0;
for blk in decoded.blocks.iter() {
let relative_off = buf.len() as u64;
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
@@ -168,99 +170,98 @@ impl SerializedValueBatch {
);
}
let key_is_local = shard.is_key_local(&key);
for (shard, record) in shard_records.iter_mut() {
let key_is_local = shard.is_key_local(&key);
tracing::debug!(
lsn=%next_record_lsn,
key=%key,
"ingest: shard decision {}",
if !key_is_local { "drop" } else { "keep" },
);
tracing::debug!(
lsn=%next_record_lsn,
key=%key,
"ingest: shard decision {}",
if !key_is_local { "drop" } else { "keep" },
);
if !key_is_local {
if shard.is_shard_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
metadata.push(ValueMeta::Observed(ObservedValueMeta {
if !key_is_local {
if shard.is_shard_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
record
.batch
.metadata
.push(ValueMeta::Observed(ObservedValueMeta {
key: key.to_compact(),
lsn: next_record_lsn,
}))
}
continue;
}
// Instead of storing full-page-image WAL record,
// it is better to store extracted image: we can skip wal-redo
// in this case. Also some FPI records may contain multiple (up to 32) pages,
// so them have to be copied multiple times.
//
let val = if Self::block_is_image(&decoded, blk, pg_version) {
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
// TODO(vlad): skip the copy
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
//
// Match the logic of XLogReadBufferForRedoExtended:
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !page_is_new(&image) {
page_set_lsn(&mut image, next_record_lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);
Value::Image(image.freeze())
} else {
Value::WalRecord(NeonWalRecord::Postgres {
will_init: blk.will_init || blk.apply_image,
rec: decoded.record.clone(),
})
};
let relative_off = record.batch.raw.len() as u64;
val.ser_into(&mut record.batch.raw)
.expect("Writing into in-memory buffer is infallible");
let val_ser_size = record.batch.raw.len() - relative_off as usize;
record
.batch
.metadata
.push(ValueMeta::Serialized(SerializedValueMeta {
key: key.to_compact(),
lsn: next_record_lsn,
}))
}
continue;
batch_offset: relative_off,
len: val_ser_size,
will_init: val.will_init(),
}));
record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
record.batch.len += 1;
}
// Instead of storing full-page-image WAL record,
// it is better to store extracted image: we can skip wal-redo
// in this case. Also some FPI records may contain multiple (up to 32) pages,
// so them have to be copied multiple times.
//
let val = if Self::block_is_image(&decoded, blk, pg_version) {
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
// TODO(vlad): skip the copy
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
//
// Match the logic of XLogReadBufferForRedoExtended:
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !page_is_new(&image) {
page_set_lsn(&mut image, next_record_lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);
Value::Image(image.freeze())
} else {
Value::WalRecord(NeonWalRecord::Postgres {
will_init: blk.will_init || blk.apply_image,
rec: decoded.record.clone(),
})
};
val.ser_into(&mut buf)
.expect("Writing into in-memory buffer is infallible");
let val_ser_size = buf.len() - relative_off as usize;
metadata.push(ValueMeta::Serialized(SerializedValueMeta {
key: key.to_compact(),
lsn: next_record_lsn,
batch_offset: relative_off,
len: val_ser_size,
will_init: val.will_init(),
}));
max_lsn = std::cmp::max(max_lsn, next_record_lsn);
len += 1;
}
if cfg!(any(debug_assertions, test)) {
let batch = Self {
raw: buf,
metadata,
max_lsn,
len,
};
batch.validate_lsn_order();
return Ok(batch);
// Validate that the batches are correct
for record in shard_records.values() {
record.batch.validate_lsn_order();
}
}
Ok(Self {
raw: buf,
metadata,
max_lsn,
len,
})
Ok(())
}
/// Look into the decoded PG WAL record and determine

View File

@@ -278,6 +278,8 @@ async fn import_wal(
let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
let shard = vec![*tline.get_shard_identity()];
while last_lsn <= endpoint {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
@@ -314,10 +316,12 @@ async fn import_wal(
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
tline.get_shard_identity(),
&shard,
lsn,
tline.pg_version,
)?;
)?
.remove(tline.get_shard_identity())
.unwrap();
walingest
.ingest_record(interpreted, &mut modification, ctx)
@@ -411,6 +415,7 @@ pub async fn import_wal_from_tar(
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
let shard = vec![*tline.get_shard_identity()];
// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
@@ -459,10 +464,12 @@ pub async fn import_wal_from_tar(
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
tline.get_shard_identity(),
&shard,
lsn,
tline.pg_version,
)?;
)?
.remove(tline.get_shard_identity())
.unwrap();
walingest
.ingest_record(interpreted, &mut modification, ctx)

View File

@@ -264,6 +264,8 @@ pub(super) async fn handle_walreceiver_connection(
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
let shard = vec![*timeline.get_shard_identity()];
let interpreted_proto_config = match protocol {
PostgresClientProtocol::Vanilla => None,
PostgresClientProtocol::Interpreted {
@@ -476,10 +478,12 @@ pub(super) async fn handle_walreceiver_connection(
// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
modification.tline.get_shard_identity(),
&shard,
next_record_lsn,
modification.tline.pg_version,
)?;
)?
.remove(timeline.get_shard_identity())
.unwrap();
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0

View File

@@ -2163,10 +2163,12 @@ mod tests {
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
modification.tline.get_shard_identity(),
&[*modification.tline.get_shard_identity()],
lsn,
modification.tline.pg_version,
)
.unwrap()
.remove(modification.tline.get_shard_identity())
.unwrap();
walingest

View File

@@ -57,6 +57,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
keepalive_ticker.reset();
let (tx, mut rx) = tokio::sync::mpsc::channel::<Batch>(2);
let shard = vec![self.shard];
loop {
tokio::select! {
@@ -80,14 +81,17 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
assert!(next_record_lsn.is_aligned());
max_next_record_lsn = Some(next_record_lsn);
// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
&self.shard,
&shard,
next_record_lsn,
self.pg_version,
)
.with_context(|| "Failed to interpret WAL")?;
.with_context(|| "Failed to interpret WAL")?
.remove(&self.shard)
.unwrap();
if !interpreted.is_empty() {
records.push(interpreted);