From 9e0148de11feefae7402bdc655ff6bf4ace8bc1f Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 27 Nov 2024 12:12:21 +0000 Subject: [PATCH] safekeeper: use protobuf for sending compressed records to pageserver (#9821) ## Problem https://github.com/neondatabase/neon/pull/9746 lifted decoding and interpretation of WAL to the safekeeper. This reduced the ingested amount on the pageservers by around 10x for a tenant with 8 shards, but doubled the ingested amount for single sharded tenants. Also, https://github.com/neondatabase/neon/pull/9746 uses bincode which doesn't support schema evolution. Technically the schema can be evolved, but it's very cumbersome. ## Summary of changes This patch set addresses both problems by adding protobuf support for the interpreted wal records and adding compression support. Compressed protobuf reduced the ingested amount by 100x on the 32 shards `test_sharded_ingest` case (compared to non-interpreted proto). For the 1 shard case the reduction is 5x. Sister change to `rust-postgres` is [here](https://github.com/neondatabase/rust-postgres/pull/33). ## Links Related: https://github.com/neondatabase/neon/issues/9336 Epic: https://github.com/neondatabase/neon/issues/9329 --- Cargo.lock | 14 +- libs/pageserver_api/src/key.rs | 12 + libs/pq_proto/src/lib.rs | 4 - libs/utils/src/postgres_client.rs | 54 ++- libs/wal_decoder/Cargo.toml | 8 + libs/wal_decoder/build.rs | 11 + libs/wal_decoder/proto/interpreted_wal.proto | 43 +++ libs/wal_decoder/src/lib.rs | 1 + libs/wal_decoder/src/models.rs | 20 + libs/wal_decoder/src/wire_format.rs | 356 ++++++++++++++++++ .../walreceiver/connection_manager.rs | 4 +- .../walreceiver/walreceiver_connection.rs | 52 ++- safekeeper/src/handler.rs | 17 +- safekeeper/src/send_interpreted_wal.rs | 53 ++- safekeeper/src/send_wal.rs | 9 +- test_runner/fixtures/neon_fixtures.py | 36 ++ .../performance/test_sharded_ingest.py | 47 ++- test_runner/regress/test_compaction.py | 16 +- test_runner/regress/test_crafted_wal_end.py | 15 +- test_runner/regress/test_subxacts.py | 15 +- .../regress/test_wal_acceptor_async.py | 21 +- 21 files changed, 702 insertions(+), 106 deletions(-) create mode 100644 libs/wal_decoder/build.rs create mode 100644 libs/wal_decoder/proto/interpreted_wal.proto create mode 100644 libs/wal_decoder/src/wire_format.rs diff --git a/Cargo.lock b/Cargo.lock index c1a14210de..43a46fb1eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4133,7 +4133,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "bytes", "fallible-iterator", @@ -4146,7 +4146,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "base64 0.20.0", "byteorder", @@ -4165,7 +4165,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "bytes", "fallible-iterator", @@ -6468,7 +6468,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "async-trait", "byteorder", @@ -7120,10 +7120,16 @@ name = "wal_decoder" version = "0.1.0" dependencies = [ "anyhow", + "async-compression", "bytes", "pageserver_api", "postgres_ffi", + "prost", "serde", + "thiserror", + "tokio", + "tonic", + "tonic-build", "tracing", "utils", "workspace_hack", diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 4505101ea6..523d143381 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -229,6 +229,18 @@ impl Key { } } +impl CompactKey { + pub fn raw(&self) -> i128 { + self.0 + } +} + +impl From for CompactKey { + fn from(value: i128) -> Self { + Self(value) + } +} + impl fmt::Display for Key { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index b7871ab01f..4b0331999d 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -688,9 +688,6 @@ pub struct InterpretedWalRecordsBody<'a> { pub streaming_lsn: u64, /// Current end of WAL on the server pub commit_lsn: u64, - /// Start LSN of the next record in PG WAL. - /// Is 0 if the portion of PG WAL did not contain any records. - pub next_record_lsn: u64, pub data: &'a [u8], } @@ -1028,7 +1025,6 @@ impl BeMessage<'_> { // dependency buf.put_u64(rec.streaming_lsn); buf.put_u64(rec.commit_lsn); - buf.put_u64(rec.next_record_lsn); buf.put_slice(rec.data); }); } diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index 3073bbde4c..a62568202b 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -7,40 +7,31 @@ use postgres_connection::{parse_host_port, PgConnectionConfig}; use crate::id::TenantTimelineId; -/// Postgres client protocol types -#[derive( - Copy, - Clone, - PartialEq, - Eq, - strum_macros::EnumString, - strum_macros::Display, - serde_with::DeserializeFromStr, - serde_with::SerializeDisplay, - Debug, -)] -#[strum(serialize_all = "kebab-case")] -#[repr(u8)] +#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum InterpretedFormat { + Bincode, + Protobuf, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum Compression { + Zstd { level: i8 }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(tag = "type", content = "args")] +#[serde(rename_all = "kebab-case")] pub enum PostgresClientProtocol { /// Usual Postgres replication protocol Vanilla, /// Custom shard-aware protocol that replicates interpreted records. /// Used to send wal from safekeeper to pageserver. - Interpreted, -} - -impl TryFrom for PostgresClientProtocol { - type Error = u8; - - fn try_from(value: u8) -> Result { - Ok(match value { - v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla, - v if v == (PostgresClientProtocol::Interpreted as u8) => { - PostgresClientProtocol::Interpreted - } - x => return Err(x), - }) - } + Interpreted { + format: InterpretedFormat, + compression: Option, + }, } pub struct ConnectionConfigArgs<'a> { @@ -63,7 +54,10 @@ impl<'a> ConnectionConfigArgs<'a> { "-c".to_owned(), format!("timeline_id={}", self.ttid.timeline_id), format!("tenant_id={}", self.ttid.tenant_id), - format!("protocol={}", self.protocol as u8), + format!( + "protocol={}", + serde_json::to_string(&self.protocol).unwrap() + ), ]; if self.shard_number.is_some() { diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml index c8c0f4c990..8fac4e38ca 100644 --- a/libs/wal_decoder/Cargo.toml +++ b/libs/wal_decoder/Cargo.toml @@ -8,11 +8,19 @@ license.workspace = true testing = ["pageserver_api/testing"] [dependencies] +async-compression.workspace = true anyhow.workspace = true bytes.workspace = true pageserver_api.workspace = true +prost.workspace = true postgres_ffi.workspace = true serde.workspace = true +thiserror.workspace = true +tokio = { workspace = true, features = ["io-util"] } +tonic.workspace = true tracing.workspace = true utils.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } + +[build-dependencies] +tonic-build.workspace = true diff --git a/libs/wal_decoder/build.rs b/libs/wal_decoder/build.rs new file mode 100644 index 0000000000..d5b7ad02ad --- /dev/null +++ b/libs/wal_decoder/build.rs @@ -0,0 +1,11 @@ +fn main() -> Result<(), Box> { + // Generate rust code from .proto protobuf. + // + // Note: we previously tried to use deterministic location at proto/ for + // easy location, but apparently interference with cachepot sometimes fails + // the build then. Anyway, per cargo docs build script shouldn't output to + // anywhere but $OUT_DIR. + tonic_build::compile_protos("proto/interpreted_wal.proto") + .unwrap_or_else(|e| panic!("failed to compile protos {:?}", e)); + Ok(()) +} diff --git a/libs/wal_decoder/proto/interpreted_wal.proto b/libs/wal_decoder/proto/interpreted_wal.proto new file mode 100644 index 0000000000..0393392c1a --- /dev/null +++ b/libs/wal_decoder/proto/interpreted_wal.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package interpreted_wal; + +message InterpretedWalRecords { + repeated InterpretedWalRecord records = 1; + optional uint64 next_record_lsn = 2; +} + +message InterpretedWalRecord { + optional bytes metadata_record = 1; + SerializedValueBatch batch = 2; + uint64 next_record_lsn = 3; + bool flush_uncommitted = 4; + uint32 xid = 5; +} + +message SerializedValueBatch { + bytes raw = 1; + repeated ValueMeta metadata = 2; + uint64 max_lsn = 3; + uint64 len = 4; +} + +enum ValueMetaType { + Serialized = 0; + Observed = 1; +} + +message ValueMeta { + ValueMetaType type = 1; + CompactKey key = 2; + uint64 lsn = 3; + optional uint64 batch_offset = 4; + optional uint64 len = 5; + optional bool will_init = 6; +} + +message CompactKey { + int64 high = 1; + int64 low = 2; +} + diff --git a/libs/wal_decoder/src/lib.rs b/libs/wal_decoder/src/lib.rs index a8a26956e6..96b717021f 100644 --- a/libs/wal_decoder/src/lib.rs +++ b/libs/wal_decoder/src/lib.rs @@ -1,3 +1,4 @@ pub mod decoder; pub mod models; pub mod serialized_batch; +pub mod wire_format; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 7ac425cb5f..af22de5d95 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -37,12 +37,32 @@ use utils::lsn::Lsn; use crate::serialized_batch::SerializedValueBatch; +// Code generated by protobuf. +pub mod proto { + // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]` + // we don't use these types for anything but broker data transmission, + // so it's ok to ignore this one. + #![allow(clippy::derive_partial_eq_without_eq)] + // The generated ValueMeta has a `len` method generate for its `len` field. + #![allow(clippy::len_without_is_empty)] + tonic::include_proto!("interpreted_wal"); +} + #[derive(Serialize, Deserialize)] pub enum FlushUncommittedRecords { Yes, No, } +/// A batch of interpreted WAL records +#[derive(Serialize, Deserialize)] +pub struct InterpretedWalRecords { + pub records: Vec, + // Start LSN of the next record after the batch. + // Note that said record may not belong to the current shard. + pub next_record_lsn: Option, +} + /// An interpreted Postgres WAL record, ready to be handled by the pageserver #[derive(Serialize, Deserialize)] pub struct InterpretedWalRecord { diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs new file mode 100644 index 0000000000..5a343054c3 --- /dev/null +++ b/libs/wal_decoder/src/wire_format.rs @@ -0,0 +1,356 @@ +use bytes::{BufMut, Bytes, BytesMut}; +use pageserver_api::key::CompactKey; +use prost::{DecodeError, EncodeError, Message}; +use tokio::io::AsyncWriteExt; +use utils::bin_ser::{BeSer, DeserializeError, SerializeError}; +use utils::lsn::Lsn; +use utils::postgres_client::{Compression, InterpretedFormat}; + +use crate::models::{ + FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, +}; + +use crate::serialized_batch::{ + ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta, +}; + +use crate::models::proto; + +#[derive(Debug, thiserror::Error)] +pub enum ToWireFormatError { + #[error("{0}")] + Bincode(#[from] SerializeError), + #[error("{0}")] + Protobuf(#[from] ProtobufSerializeError), + #[error("{0}")] + Compression(#[from] std::io::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtobufSerializeError { + #[error("{0}")] + MetadataRecord(#[from] SerializeError), + #[error("{0}")] + Encode(#[from] EncodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum FromWireFormatError { + #[error("{0}")] + Bincode(#[from] DeserializeError), + #[error("{0}")] + Protobuf(#[from] ProtobufDeserializeError), + #[error("{0}")] + Decompress(#[from] std::io::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtobufDeserializeError { + #[error("{0}")] + Transcode(#[from] TranscodeError), + #[error("{0}")] + Decode(#[from] DecodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum TranscodeError { + #[error("{0}")] + BadInput(String), + #[error("{0}")] + MetadataRecord(#[from] DeserializeError), +} + +pub trait ToWireFormat { + fn to_wire( + self, + format: InterpretedFormat, + compression: Option, + ) -> impl std::future::Future> + Send; +} + +pub trait FromWireFormat { + type T; + fn from_wire( + buf: &Bytes, + format: InterpretedFormat, + compression: Option, + ) -> impl std::future::Future> + Send; +} + +impl ToWireFormat for InterpretedWalRecords { + async fn to_wire( + self, + format: InterpretedFormat, + compression: Option, + ) -> Result { + use async_compression::tokio::write::ZstdEncoder; + use async_compression::Level; + + let encode_res: Result = match format { + InterpretedFormat::Bincode => { + let buf = BytesMut::new(); + let mut buf = buf.writer(); + self.ser_into(&mut buf)?; + Ok(buf.into_inner().freeze()) + } + InterpretedFormat::Protobuf => { + let proto: proto::InterpretedWalRecords = self.try_into()?; + let mut buf = BytesMut::new(); + proto + .encode(&mut buf) + .map_err(|e| ToWireFormatError::Protobuf(e.into()))?; + + Ok(buf.freeze()) + } + }; + + let buf = encode_res?; + let compressed_buf = match compression { + Some(Compression::Zstd { level }) => { + let mut encoder = ZstdEncoder::with_quality( + Vec::with_capacity(buf.len() / 4), + Level::Precise(level as i32), + ); + encoder.write_all(&buf).await?; + encoder.shutdown().await?; + Bytes::from(encoder.into_inner()) + } + None => buf, + }; + + Ok(compressed_buf) + } +} + +impl FromWireFormat for InterpretedWalRecords { + type T = Self; + + async fn from_wire( + buf: &Bytes, + format: InterpretedFormat, + compression: Option, + ) -> Result { + let decompressed_buf = match compression { + Some(Compression::Zstd { .. }) => { + use async_compression::tokio::write::ZstdDecoder; + let mut decoded_buf = Vec::with_capacity(buf.len()); + let mut decoder = ZstdDecoder::new(&mut decoded_buf); + decoder.write_all(buf).await?; + decoder.flush().await?; + Bytes::from(decoded_buf) + } + None => buf.clone(), + }; + + match format { + InterpretedFormat::Bincode => { + InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode) + } + InterpretedFormat::Protobuf => { + let proto = proto::InterpretedWalRecords::decode(decompressed_buf) + .map_err(|e| FromWireFormatError::Protobuf(e.into()))?; + InterpretedWalRecords::try_from(proto) + .map_err(|e| FromWireFormatError::Protobuf(e.into())) + } + } + } +} + +impl TryFrom for proto::InterpretedWalRecords { + type Error = SerializeError; + + fn try_from(value: InterpretedWalRecords) -> Result { + let records = value + .records + .into_iter() + .map(proto::InterpretedWalRecord::try_from) + .collect::, _>>()?; + Ok(proto::InterpretedWalRecords { + records, + next_record_lsn: value.next_record_lsn.map(|l| l.0), + }) + } +} + +impl TryFrom for proto::InterpretedWalRecord { + type Error = SerializeError; + + fn try_from(value: InterpretedWalRecord) -> Result { + let metadata_record = value + .metadata_record + .map(|meta_rec| -> Result, Self::Error> { + let mut buf = Vec::new(); + meta_rec.ser_into(&mut buf)?; + Ok(buf) + }) + .transpose()?; + + Ok(proto::InterpretedWalRecord { + metadata_record, + batch: Some(proto::SerializedValueBatch::from(value.batch)), + next_record_lsn: value.next_record_lsn.0, + flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes), + xid: value.xid, + }) + } +} + +impl From for proto::SerializedValueBatch { + fn from(value: SerializedValueBatch) -> Self { + proto::SerializedValueBatch { + raw: value.raw, + metadata: value + .metadata + .into_iter() + .map(proto::ValueMeta::from) + .collect(), + max_lsn: value.max_lsn.0, + len: value.len as u64, + } + } +} + +impl From for proto::ValueMeta { + fn from(value: ValueMeta) -> Self { + match value { + ValueMeta::Observed(obs) => proto::ValueMeta { + r#type: proto::ValueMetaType::Observed.into(), + key: Some(proto::CompactKey::from(obs.key)), + lsn: obs.lsn.0, + batch_offset: None, + len: None, + will_init: None, + }, + ValueMeta::Serialized(ser) => proto::ValueMeta { + r#type: proto::ValueMetaType::Serialized.into(), + key: Some(proto::CompactKey::from(ser.key)), + lsn: ser.lsn.0, + batch_offset: Some(ser.batch_offset), + len: Some(ser.len as u64), + will_init: Some(ser.will_init), + }, + } + } +} + +impl From for proto::CompactKey { + fn from(value: CompactKey) -> Self { + proto::CompactKey { + high: (value.raw() >> 64) as i64, + low: value.raw() as i64, + } + } +} + +impl TryFrom for InterpretedWalRecords { + type Error = TranscodeError; + + fn try_from(value: proto::InterpretedWalRecords) -> Result { + let records = value + .records + .into_iter() + .map(InterpretedWalRecord::try_from) + .collect::>()?; + + Ok(InterpretedWalRecords { + records, + next_record_lsn: value.next_record_lsn.map(Lsn::from), + }) + } +} + +impl TryFrom for InterpretedWalRecord { + type Error = TranscodeError; + + fn try_from(value: proto::InterpretedWalRecord) -> Result { + let metadata_record = value + .metadata_record + .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) }) + .transpose()?; + + let batch = { + let batch = value.batch.ok_or_else(|| { + TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string()) + })?; + + SerializedValueBatch::try_from(batch)? + }; + + Ok(InterpretedWalRecord { + metadata_record, + batch, + next_record_lsn: Lsn(value.next_record_lsn), + flush_uncommitted: if value.flush_uncommitted { + FlushUncommittedRecords::Yes + } else { + FlushUncommittedRecords::No + }, + xid: value.xid, + }) + } +} + +impl TryFrom for SerializedValueBatch { + type Error = TranscodeError; + + fn try_from(value: proto::SerializedValueBatch) -> Result { + let metadata = value + .metadata + .into_iter() + .map(ValueMeta::try_from) + .collect::, _>>()?; + + Ok(SerializedValueBatch { + raw: value.raw, + metadata, + max_lsn: Lsn(value.max_lsn), + len: value.len as usize, + }) + } +} + +impl TryFrom for ValueMeta { + type Error = TranscodeError; + + fn try_from(value: proto::ValueMeta) -> Result { + match proto::ValueMetaType::try_from(value.r#type) { + Ok(proto::ValueMetaType::Serialized) => { + Ok(ValueMeta::Serialized(SerializedValueMeta { + key: value + .key + .ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::key missing".to_string()) + })? + .into(), + lsn: Lsn(value.lsn), + batch_offset: value.batch_offset.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string()) + })?, + len: value.len.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::len missing".to_string()) + })? as usize, + will_init: value.will_init.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::will_init missing".to_string()) + })?, + })) + } + Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta { + key: value + .key + .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))? + .into(), + lsn: Lsn(value.lsn), + })), + Err(_) => Err(TranscodeError::BadInput(format!( + "Unexpected ValueMeta::type {}", + value.r#type + ))), + } + } +} + +impl From for CompactKey { + fn from(value: proto::CompactKey) -> Self { + (((value.high as i128) << 64) | (value.low as i128)).into() + } +} diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 7a64703a30..583d6309ab 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -535,6 +535,7 @@ impl ConnectionManagerState { let node_id = new_sk.safekeeper_id; let connect_timeout = self.conf.wal_connect_timeout; let ingest_batch_size = self.conf.ingest_batch_size; + let protocol = self.conf.protocol; let timeline = Arc::clone(&self.timeline); let ctx = ctx.detached_child( TaskKind::WalReceiverConnectionHandler, @@ -548,6 +549,7 @@ impl ConnectionManagerState { let res = super::walreceiver_connection::handle_walreceiver_connection( timeline, + protocol, new_sk.wal_source_connconf, events_sender, cancellation.clone(), @@ -991,7 +993,7 @@ impl ConnectionManagerState { PostgresClientProtocol::Vanilla => { (None, None, None) }, - PostgresClientProtocol::Interpreted => { + PostgresClientProtocol::Interpreted { .. } => { let shard_identity = self.timeline.get_shard_identity(); ( Some(shard_identity.number.0), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 1a0e66ceb3..31cf1b6307 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -22,7 +22,10 @@ use tokio::{select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn, Instrument}; -use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord}; +use wal_decoder::{ + models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords}, + wire_format::FromWireFormat, +}; use super::TaskStateUpdate; use crate::{ @@ -36,7 +39,7 @@ use crate::{ use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn}; +use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol}; use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError}; /// Status of the connection. @@ -109,6 +112,7 @@ impl From for WalReceiverError { #[allow(clippy::too_many_arguments)] pub(super) async fn handle_walreceiver_connection( timeline: Arc, + protocol: PostgresClientProtocol, wal_source_connconf: PgConnectionConfig, events_sender: watch::Sender>, cancellation: CancellationToken, @@ -260,6 +264,14 @@ pub(super) async fn handle_walreceiver_connection( let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?; + let interpreted_proto_config = match protocol { + PostgresClientProtocol::Vanilla => None, + PostgresClientProtocol::Interpreted { + format, + compression, + } => Some((format, compression)), + }; + while let Some(replication_message) = { select! { _ = cancellation.cancelled() => { @@ -332,16 +344,26 @@ pub(super) async fn handle_walreceiver_connection( // This is the end LSN of the raw WAL from which the records // were interpreted. let streaming_lsn = Lsn::from(raw.streaming_lsn()); - tracing::debug!( - "Received WAL up to {streaming_lsn} with next_record_lsn={}", - Lsn(raw.next_record_lsn().unwrap_or(0)) - ); - let records = Vec::::des(raw.data()).with_context(|| { - anyhow::anyhow!( + let (format, compression) = interpreted_proto_config.unwrap(); + let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression) + .await + .with_context(|| { + anyhow::anyhow!( "Failed to deserialize interpreted records ending at LSN {streaming_lsn}" ) - })?; + })?; + + let InterpretedWalRecords { + records, + next_record_lsn, + } = batch; + + tracing::debug!( + "Received WAL up to {} with next_record_lsn={:?}", + streaming_lsn, + next_record_lsn + ); // We start the modification at 0 because each interpreted record // advances it to its end LSN. 0 is just an initialization placeholder. @@ -360,14 +382,18 @@ pub(super) async fn handle_walreceiver_connection( .await?; } - let next_record_lsn = interpreted.next_record_lsn; + let local_next_record_lsn = interpreted.next_record_lsn; let ingested = walingest .ingest_record(interpreted, &mut modification, &ctx) .await - .with_context(|| format!("could not ingest record at {next_record_lsn}"))?; + .with_context(|| { + format!("could not ingest record at {local_next_record_lsn}") + })?; if !ingested { - tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}"); + tracing::debug!( + "ingest: filtered out record @ LSN {local_next_record_lsn}" + ); WAL_INGEST.records_filtered.inc(); filtered_records += 1; } @@ -399,7 +425,7 @@ pub(super) async fn handle_walreceiver_connection( // need to advance last record LSN on all shards. If we've not ingested the latest // record, then set the LSN of the modification past it. This way all shards // advance their last record LSN at the same time. - let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) { + let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) { Some(lsn) if lsn > modification.get_lsn() => { modification.set_lsn(lsn).unwrap(); true diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index cec7c3c7ee..22f33b17e0 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -123,17 +123,10 @@ impl postgres_backend::Handler // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064 match opt.split_once('=') { Some(("protocol", value)) => { - let raw_value = value - .parse::() - .with_context(|| format!("Failed to parse {value} as protocol"))?; - - self.protocol = Some( - PostgresClientProtocol::try_from(raw_value).map_err(|_| { - QueryError::Other(anyhow::anyhow!( - "Unexpected client protocol type: {raw_value}" - )) - })?, - ); + self.protocol = + Some(serde_json::from_str(value).with_context(|| { + format!("Failed to parse {value} as protocol") + })?); } Some(("ztenantid", value)) | Some(("tenant_id", value)) => { self.tenant_id = Some(value.parse().with_context(|| { @@ -180,7 +173,7 @@ impl postgres_backend::Handler ))); } } - PostgresClientProtocol::Interpreted => { + PostgresClientProtocol::Interpreted { .. } => { match (shard_count, shard_number, shard_stripe_size) { (Some(count), Some(number), Some(stripe_size)) => { let params = ShardParameters { diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index cf0ee276e9..2589030422 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -9,9 +9,11 @@ use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder}; use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::MissedTickBehavior; -use utils::bin_ser::BeSer; use utils::lsn::Lsn; -use wal_decoder::models::InterpretedWalRecord; +use utils::postgres_client::Compression; +use utils::postgres_client::InterpretedFormat; +use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords}; +use wal_decoder::wire_format::ToWireFormat; use crate::send_wal::EndWatchView; use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder}; @@ -20,6 +22,8 @@ use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder}; /// This is used for sending WAL to the pageserver. Said WAL /// is pre-interpreted and filtered for the shard. pub(crate) struct InterpretedWalSender<'a, IO> { + pub(crate) format: InterpretedFormat, + pub(crate) compression: Option, pub(crate) pgb: &'a mut PostgresBackend, pub(crate) wal_stream_builder: WalReaderStreamBuilder, pub(crate) end_watch_view: EndWatchView, @@ -28,6 +32,12 @@ pub(crate) struct InterpretedWalSender<'a, IO> { pub(crate) appname: Option, } +struct Batch { + wal_end_lsn: Lsn, + available_wal_end_lsn: Lsn, + records: InterpretedWalRecords, +} + impl InterpretedWalSender<'_, IO> { /// Send interpreted WAL to a receiver. /// Stops when an error occurs or the receiver is caught up and there's no active compute. @@ -46,10 +56,13 @@ impl InterpretedWalSender<'_, IO> { keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); keepalive_ticker.reset(); + let (tx, mut rx) = tokio::sync::mpsc::channel::(2); + loop { tokio::select! { - // Get some WAL from the stream and then: decode, interpret and send it - wal = stream.next() => { + // Get some WAL from the stream and then: decode, interpret and push it down the + // pipeline. + wal = stream.next(), if tx.capacity() > 0 => { let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal { Some(some) => some?, None => { break; } @@ -81,10 +94,26 @@ impl InterpretedWalSender<'_, IO> { } } - let mut buf = Vec::new(); - records - .ser_into(&mut buf) - .with_context(|| "Failed to serialize interpreted WAL")?; + let batch = InterpretedWalRecords { + records, + next_record_lsn: max_next_record_lsn + }; + + tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap(); + }, + // For a previously interpreted batch, serialize it and push it down the wire. + batch = rx.recv() => { + let batch = match batch { + Some(b) => b, + None => { break; } + }; + + let buf = batch + .records + .to_wire(self.format, self.compression) + .await + .with_context(|| "Failed to serialize interpreted WAL") + .map_err(CopyStreamHandlerEnd::from)?; // Reset the keep alive ticker since we are sending something // over the wire now. @@ -92,13 +121,11 @@ impl InterpretedWalSender<'_, IO> { self.pgb .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody { - streaming_lsn: wal_end_lsn.0, - commit_lsn: available_wal_end_lsn.0, - next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0, - data: buf.as_slice(), + streaming_lsn: batch.wal_end_lsn.0, + commit_lsn: batch.available_wal_end_lsn.0, + data: &buf, })).await?; } - // Send a periodic keep alive when the connection has been idle for a while. _ = keepalive_ticker.tick() => { self.pgb diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 1acfcad418..225b7f4c05 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -454,7 +454,7 @@ impl SafekeeperPostgresHandler { } info!( - "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}", + "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}", start_pos, end_pos, matches!(end_watch, EndWatch::Flush(_)), @@ -489,7 +489,10 @@ impl SafekeeperPostgresHandler { Either::Left(sender.run()) } - PostgresClientProtocol::Interpreted => { + PostgresClientProtocol::Interpreted { + format, + compression, + } => { let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000; let end_watch_view = end_watch.view(); let wal_stream_builder = WalReaderStreamBuilder { @@ -502,6 +505,8 @@ impl SafekeeperPostgresHandler { }; let sender = InterpretedWalSender { + format, + compression, pgb, wal_stream_builder, end_watch_view, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 07d442b4a6..a45a311dc2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -310,6 +310,31 @@ class PgProtocol: return self.safe_psql(query, log_query=log_query)[0][0] +class PageserverWalReceiverProtocol(StrEnum): + VANILLA = "vanilla" + INTERPRETED = "interpreted" + + @staticmethod + def to_config_key_value(proto) -> tuple[str, dict[str, Any]]: + if proto == PageserverWalReceiverProtocol.VANILLA: + return ( + "wal_receiver_protocol", + { + "type": "vanilla", + }, + ) + elif proto == PageserverWalReceiverProtocol.INTERPRETED: + return ( + "wal_receiver_protocol", + { + "type": "interpreted", + "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}}, + }, + ) + else: + raise ValueError(f"Unknown protocol type: {proto}") + + class NeonEnvBuilder: """ Builder object to create a Neon runtime environment @@ -356,6 +381,7 @@ class NeonEnvBuilder: safekeeper_extra_opts: list[str] | None = None, storage_controller_port_override: int | None = None, pageserver_virtual_file_io_mode: str | None = None, + pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -409,6 +435,8 @@ class NeonEnvBuilder: self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode + self.pageserver_wal_receiver_protocol = pageserver_wal_receiver_protocol + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1023,6 +1051,7 @@ class NeonEnv: self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode + self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol # Create the neon_local's `NeonLocalInitConf` cfg: dict[str, Any] = { @@ -1092,6 +1121,13 @@ class NeonEnv: if self.pageserver_virtual_file_io_mode is not None: ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode + if self.pageserver_wal_receiver_protocol is not None: + key, value = PageserverWalReceiverProtocol.to_config_key_value( + self.pageserver_wal_receiver_protocol + ) + if key not in ps_cfg: + ps_cfg[key] = value + # Create a corresponding NeonPageserver object self.pageservers.append( NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]) diff --git a/test_runner/performance/test_sharded_ingest.py b/test_runner/performance/test_sharded_ingest.py index e965aae5a0..4c21e799c8 100644 --- a/test_runner/performance/test_sharded_ingest.py +++ b/test_runner/performance/test_sharded_ingest.py @@ -15,7 +15,14 @@ from fixtures.neon_fixtures import ( @pytest.mark.timeout(600) @pytest.mark.parametrize("shard_count", [1, 8, 32]) -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +@pytest.mark.parametrize( + "wal_receiver_protocol", + [ + "vanilla", + "interpreted-bincode-compressed", + "interpreted-protobuf-compressed", + ], +) def test_sharded_ingest( neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, @@ -27,14 +34,42 @@ def test_sharded_ingest( and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case (shard_count=1) to the sharded case indicates the overhead of sharding. """ - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) - ROW_COUNT = 100_000_000 # about 7 GB of WAL neon_env_builder.num_pageservers = shard_count - env = neon_env_builder.init_start() + env = neon_env_builder.init_configs() + + for ps in env.pageservers: + if wal_receiver_protocol == "vanilla": + ps.patch_config_toml_nonrecursive( + { + "wal_receiver_protocol": { + "type": "vanilla", + } + } + ) + elif wal_receiver_protocol == "interpreted-bincode-compressed": + ps.patch_config_toml_nonrecursive( + { + "wal_receiver_protocol": { + "type": "interpreted", + "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}}, + } + } + ) + elif wal_receiver_protocol == "interpreted-protobuf-compressed": + ps.patch_config_toml_nonrecursive( + { + "wal_receiver_protocol": { + "type": "interpreted", + "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}}, + } + } + ) + else: + raise AssertionError("Test must use explicit wal receiver protocol config") + + env.start() # Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure # the storage controller doesn't mess with shard placements. diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 79fd256304..302a8fd0d1 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -8,6 +8,7 @@ import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, + PageserverWalReceiverProtocol, generate_uploads_and_deletions, ) from fixtures.pageserver.http import PageserverApiException @@ -27,8 +28,13 @@ AGGRESIVE_COMPACTION_TENANT_CONF = { @skip_in_debug_build("only run with release build") -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) -def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str): +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) +def test_pageserver_compaction_smoke( + neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol +): """ This is a smoke test that compaction kicks in. The workload repeatedly churns a small number of rows and manually instructs the pageserver to run compaction @@ -37,10 +43,12 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei observed bounds. """ + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol + # Effectively disable the page cache to rely only on image layers # to shorten reads. - neon_env_builder.pageserver_config_override = f""" -page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}' + neon_env_builder.pageserver_config_override = """ +page_cache_size=10 """ env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF) diff --git a/test_runner/regress/test_crafted_wal_end.py b/test_runner/regress/test_crafted_wal_end.py index 70e71d99cd..6b9dcbba07 100644 --- a/test_runner/regress/test_crafted_wal_end.py +++ b/test_runner/regress/test_crafted_wal_end.py @@ -3,7 +3,7 @@ from __future__ import annotations import pytest from fixtures.log_helper import log from fixtures.neon_cli import WalCraft -from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol # Restart nodes with WAL end having specially crafted shape, like last record # crossing segment boundary, to test decoding issues. @@ -19,13 +19,16 @@ from fixtures.neon_fixtures import NeonEnvBuilder "wal_record_crossing_segment_followed_by_small_one", ], ) -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) def test_crafted_wal_end( - neon_env_builder: NeonEnvBuilder, wal_type: str, wal_receiver_protocol: str + neon_env_builder: NeonEnvBuilder, + wal_type: str, + wal_receiver_protocol: PageserverWalReceiverProtocol, ): - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol env = neon_env_builder.init_start() env.create_branch("test_crafted_wal_end") diff --git a/test_runner/regress/test_subxacts.py b/test_runner/regress/test_subxacts.py index 1d86c353be..b235da0bc7 100644 --- a/test_runner/regress/test_subxacts.py +++ b/test_runner/regress/test_subxacts.py @@ -1,7 +1,11 @@ from __future__ import annotations import pytest -from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_content +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PageserverWalReceiverProtocol, + check_restored_datadir_content, +) # Test subtransactions @@ -10,11 +14,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_conten # maintained in the pageserver, so subtransactions are not very exciting for # Neon. They are included in the commit record though and updated in the # CLOG. -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol): - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 094b10b576..b32b028fa1 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -11,7 +11,13 @@ import pytest import toml from fixtures.common_types import Lsn, TenantId, TimelineId from fixtures.log_helper import getLogger -from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper +from fixtures.neon_fixtures import ( + Endpoint, + NeonEnv, + NeonEnvBuilder, + PageserverWalReceiverProtocol, + Safekeeper, +) from fixtures.remote_storage import RemoteStorageKind from fixtures.utils import skip_in_debug_build @@ -622,12 +628,15 @@ async def run_segment_init_failure(env: NeonEnv): # Test (injected) failure during WAL segment init. # https://github.com/neondatabase/neon/issues/6401 # https://github.com/neondatabase/neon/issues/6402 -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) -def test_segment_init_failure(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str): +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) +def test_segment_init_failure( + neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol +): neon_env_builder.num_safekeepers = 1 - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol env = neon_env_builder.init_start() asyncio.run(run_segment_init_failure(env))