Compare commits

...

10 Commits

Author SHA1 Message Date
Vlad Lazar
8379efda6a wip: set default proto to interpreted compressed protobuf 2024-11-25 18:52:02 +01:00
Vlad Lazar
884803a51f wip: use pack-interpreted branch of rust-postgres 2024-11-25 18:49:26 +01:00
Vlad Lazar
7ebbdad141 tests: fix currently protocol-parametrised tests 2024-11-25 18:49:23 +01:00
Vlad Lazar
1db16e29dc tests: update sharded ingest test to use compression 2024-11-25 18:45:41 +01:00
Vlad Lazar
cbe00424a6 safekeeper: pipeline WAL sending
With this patch we can serialize one batch while waiting for the
read for the next one.
2024-11-25 18:45:38 +01:00
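A minimal self-contained sketch of that pattern (the Batch type and the serialize/ship stand-ins are invented for illustration; the real sender in this diff hands batches through a bounded channel of size 2 into a ReceiverStream in the same way):

use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

struct Batch(Vec<u8>);

async fn serialize(batch: Batch) -> Vec<u8> { batch.0 } // stand-in for wire encoding
async fn ship(buf: Vec<u8>) { println!("sent {} bytes", buf.len()); }

#[tokio::main]
async fn main() {
    // Stand-in WAL source: a stream of raw chunks that eventually ends.
    let mut wal_stream = std::pin::pin!(tokio_stream::iter(vec![vec![1u8], vec![2u8, 3u8]]));

    // Stage 1 -> stage 2 hand-off: at most two batches in flight.
    let (tx, rx) = mpsc::channel::<Batch>(2);
    // Stage 2: serialize batches as they arrive, yielding encoded buffers.
    let mut serialized = std::pin::pin!(ReceiverStream::new(rx).then(serialize));

    loop {
        tokio::select! {
            // Stage 1: read + interpret the next chunk, but only while the
            // pipeline has room for another batch.
            wal = wal_stream.next(), if tx.capacity() > 0 => {
                match wal {
                    Some(bytes) => tx.send(Batch(bytes)).await.unwrap(),
                    None => break, // WAL ended
                }
            }
            // Stage 2: a batch finished serializing while stage 1 was
            // already waiting on the next read.
            Some(buf) = serialized.next() => ship(buf).await,
        }
    }
    // Close stage 1's side and drain whatever is still in the pipeline.
    drop(tx);
    while let Some(buf) = serialized.next().await {
        ship(buf).await;
    }
}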
Vlad Lazar
0a13a06053 wal_decoder: add compression support 2024-11-25 18:35:48 +01:00
Vlad Lazar
6009d9fe4d wal_decoder: add protobuf support
The bulk of this commit is glue code to convert between protobuf
representation and in-memory representation.
2024-11-25 18:35:48 +01:00
Vlad Lazar
b32c6a8798 wal_decoder: generate protobuf types 2024-11-25 18:35:48 +01:00
Vlad Lazar
8ffb1c1e51 wal_decoder: define records batch type
Instead of serializing a bare vector of records over the wire,
define a type for it. Also include next_record_lsn in that type
so it gets varint encoding.
2024-11-25 18:35:47 +01:00
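(Aside on the varint point: once the batch is encoded as protobuf in the commits above, prost writes uint64 fields as LEB128 varints, so a next_record_lsn carried inside the message can take fewer bytes than the fixed 8-byte slot the old protocol header used. A hypothetical illustration via prost's public helper:)

assert_eq!(prost::encoding::encoded_len_varint(1u64 << 25), 4); // 4 bytes on the wire, not 8
assert_eq!(prost::encoding::encoded_len_varint(u64::MAX), 10); // varint worst case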
Vlad Lazar
7b0f1605b2 wal_decoder: abstract wire format
We want to add new wire formats, but the current code has bincode
hard-coded.

To this end:
1. Rework the wal_receiver_protocol PS config to include a format
   modifier for interpreted protocol type.
2. Abstract wire format encoding and decoding into a separate module
   in wal_decoder.
3. Glue things back together.
2024-11-25 18:34:15 +01:00
24 changed files with 730 additions and 112 deletions

Cargo.lock generated

@@ -4133,7 +4133,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/pack-interpreted#75e5883539b36aa0aec54c2542dc76c4ce213b29"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4146,7 +4146,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/pack-interpreted#75e5883539b36aa0aec54c2542dc76c4ce213b29"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4165,7 +4165,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/pack-interpreted#75e5883539b36aa0aec54c2542dc76c4ce213b29"
dependencies = [
"bytes",
"fallible-iterator",
@@ -6468,7 +6468,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/pack-interpreted#75e5883539b36aa0aec54c2542dc76c4ce213b29"
dependencies = [
"async-trait",
"byteorder",
@@ -7120,10 +7120,16 @@ name = "wal_decoder"
version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"bytes",
"pageserver_api",
"postgres_ffi",
"prost",
"serde",
"thiserror",
"tokio",
"tonic",
"tonic-build",
"tracing",
"utils",
"workspace_hack",


@@ -206,10 +206,10 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -249,7 +249,7 @@ tonic-build = "0.12"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted" }
################# Binary contents sections


@@ -284,6 +284,7 @@ pub mod defaults {
use crate::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
use utils::postgres_client;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -333,7 +334,10 @@ pub mod defaults {
pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
utils::postgres_client::PostgresClientProtocol::Vanilla;
utils::postgres_client::PostgresClientProtocol::Interpreted {
format: utils::postgres_client::InterpretedFormat::Protobuf,
compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
};
}
impl Default for ConfigToml {

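Given the serde attributes on the reworked PostgresClientProtocol (tag = "type", content = "args", kebab-case renaming; see the utils::postgres_client diff below), the new default corresponds to a pageserver.toml entry shaped roughly as follows (a sketch of the assumed TOML spelling, mirroring the test fixture configs at the bottom of this compare):

wal_receiver_protocol = { type = "interpreted", args = { format = "protobuf", compression = { zstd = { level = 1 } } } }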

@@ -229,6 +229,18 @@ impl Key {
}
}
impl CompactKey {
pub fn raw(&self) -> i128 {
self.0
}
}
impl From<i128> for CompactKey {
fn from(value: i128) -> Self {
Self(value)
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(


@@ -688,9 +688,6 @@ pub struct InterpretedWalRecordsBody<'a> {
pub streaming_lsn: u64,
/// Current end of WAL on the server
pub commit_lsn: u64,
/// Start LSN of the next record in PG WAL.
/// Is 0 if the portion of PG WAL did not contain any records.
pub next_record_lsn: u64,
pub data: &'a [u8],
}
@@ -1028,7 +1025,6 @@ impl BeMessage<'_> {
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_u64(rec.next_record_lsn);
buf.put_slice(rec.data);
});
}


@@ -7,40 +7,31 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;
/// Postgres client protocol types
#[derive(
Copy,
Clone,
PartialEq,
Eq,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum InterpretedFormat {
Bincode,
Protobuf,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Compression {
Zstd { level: i8 },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "args")]
#[serde(rename_all = "kebab-case")]
pub enum PostgresClientProtocol {
/// Usual Postgres replication protocol
Vanilla,
/// Custom shard-aware protocol that replicates interpreted records.
/// Used to send wal from safekeeper to pageserver.
Interpreted,
}
impl TryFrom<u8> for PostgresClientProtocol {
type Error = u8;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
v if v == (PostgresClientProtocol::Interpreted as u8) => {
PostgresClientProtocol::Interpreted
}
x => return Err(x),
})
}
Interpreted {
format: InterpretedFormat,
compression: Option<Compression>,
},
}
pub struct ConnectionConfigArgs<'a> {
@@ -63,7 +54,10 @@ impl<'a> ConnectionConfigArgs<'a> {
"-c".to_owned(),
format!("timeline_id={}", self.ttid.timeline_id),
format!("tenant_id={}", self.ttid.tenant_id),
format!("protocol={}", self.protocol as u8),
format!(
"protocol={}",
serde_json::to_string(&self.protocol).unwrap()
),
];
if self.shard_number.is_some() {

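Concretely, the protocol option in the safekeeper connection string switches from a numeric discriminant (protocol=0 for vanilla, protocol=1 for interpreted) to the JSON that serde_json produces for the tagged enum. Derived from the serde attributes above rather than copied from the diff, the two values look like:

protocol={"type":"vanilla"}
protocol={"type":"interpreted","args":{"format":"protobuf","compression":{"zstd":{"level":1}}}}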

@@ -8,11 +8,19 @@ license.workspace = true
testing = ["pageserver_api/testing"]
[dependencies]
async-compression.workspace = true
anyhow.workspace = true
bytes.workspace = true
pageserver_api.workspace = true
prost.workspace = true
postgres_ffi.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["io-util"] }
tonic.workspace = true
tracing.workspace = true
utils.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[build-dependencies]
tonic-build.workspace = true

libs/wal_decoder/build.rs (new file)

@@ -0,0 +1,11 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Generate Rust code from the .proto protobuf definitions.
//
// Note: we previously tried to use a deterministic location at proto/ for
// easy discovery, but interference with cachepot would sometimes fail the
// build. In any case, per the cargo docs, a build script shouldn't write
// anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/interpreted_wal.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
Ok(())
}


@@ -0,0 +1,43 @@
syntax = "proto3";
package interpreted_wal;
message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
}
message InterpretedWalRecord {
optional bytes metadata_record = 1;
SerializedValueBatch batch = 2;
uint64 next_record_lsn = 3;
bool flush_uncommitted = 4;
uint32 xid = 5;
}
message SerializedValueBatch {
bytes raw = 1;
repeated ValueMeta metadata = 2;
uint64 max_lsn = 3;
uint64 len = 4;
}
enum ValueMetaType {
Serialized = 0;
Observed = 1;
}
message ValueMeta {
ValueMetaType type = 1;
CompactKey key = 2;
uint64 lsn = 3;
optional uint64 batch_offset = 4;
optional uint64 len = 5;
optional bool will_init = 6;
}
message CompactKey {
int64 high = 1;
int64 low = 2;
}

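For orientation, tonic/prost codegen pairs each message with a plain Rust struct; CompactKey comes out roughly like this (a sketch of the generated code, using the derive set the models.rs comment below mentions, not an excerpt from $OUT_DIR):

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactKey {
    #[prost(int64, tag = "1")]
    pub high: i64,
    #[prost(int64, tag = "2")]
    pub low: i64,
}

Protobuf has no 128-bit scalar type, which is why the 128-bit CompactKey travels as two int64 halves.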

@@ -1,3 +1,4 @@
pub mod decoder;
pub mod models;
pub mod serialized_batch;
pub mod wire_format;


@@ -37,12 +37,32 @@ use utils::lsn::Lsn;
use crate::serialized_batch::SerializedValueBatch;
// Code generated by protobuf.
pub mod proto {
// Tonic emits `#[derive(Clone, PartialEq, ::prost::Message)]`.
// We don't use these types for anything but data transmission,
// so it's OK to ignore this lint.
#![allow(clippy::derive_partial_eq_without_eq)]
// The generated ValueMeta has a `len` method generated for its `len` field.
#![allow(clippy::len_without_is_empty)]
tonic::include_proto!("interpreted_wal");
}
#[derive(Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
Yes,
No,
}
/// A batch of interpreted WAL records
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
}
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecord {


@@ -0,0 +1,358 @@
use bytes::{BufMut, Bytes, BytesMut};
use pageserver_api::key::CompactKey;
use prost::{DecodeError, EncodeError, Message};
use tokio::io::AsyncWriteExt;
use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::postgres_client::{Compression, InterpretedFormat};
use crate::models::{
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
};
use crate::serialized_batch::{
ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
};
use crate::models::proto::CompactKey as ProtoCompactKey;
use crate::models::proto::InterpretedWalRecord as ProtoInterpretedWalRecord;
use crate::models::proto::InterpretedWalRecords as ProtoInterpretedWalRecords;
use crate::models::proto::SerializedValueBatch as ProtoSerializedValueBatch;
use crate::models::proto::ValueMeta as ProtoValueMeta;
use crate::models::proto::ValueMetaType as ProtoValueMetaType;
#[derive(Debug, thiserror::Error)]
pub enum ToWireFormatError {
#[error("{0}")]
Bincode(#[from] SerializeError),
#[error("{0}")]
Protobuf(#[from] ProtobufSerializeError),
#[error("{0}")]
Compression(#[from] std::io::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum ProtobufSerializeError {
#[error("{0}")]
MetadataRecord(#[from] SerializeError),
#[error("{0}")]
Encode(#[from] EncodeError),
}
#[derive(Debug, thiserror::Error)]
pub enum FromWireFormatError {
#[error("{0}")]
Bincode(#[from] DeserializeError),
#[error("{0}")]
Protobuf(#[from] ProtobufDeserializeError),
#[error("{0}")]
Decompress(#[from] std::io::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum ProtobufDeserializeError {
#[error("{0}")]
Transcode(#[from] TranscodeError),
#[error("{0}")]
Decode(#[from] DecodeError),
}
#[derive(Debug, thiserror::Error)]
pub enum TranscodeError {
#[error("{0}")]
BadInput(String),
#[error("{0}")]
MetadataRecord(#[from] DeserializeError),
}
pub trait ToWireFormat {
fn to_wire(
self,
format: InterpretedFormat,
compression: Option<Compression>,
) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
}
pub trait FromWireFormat {
type T;
fn from_wire(
buf: &Bytes,
format: InterpretedFormat,
compression: Option<Compression>,
) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
}
impl ToWireFormat for InterpretedWalRecords {
async fn to_wire(
self,
format: InterpretedFormat,
compression: Option<Compression>,
) -> Result<Bytes, ToWireFormatError> {
use async_compression::tokio::write::ZstdEncoder;
use async_compression::Level;
let encode_res: Result<Bytes, ToWireFormatError> = match format {
InterpretedFormat::Bincode => {
let buf = BytesMut::new();
let mut buf = buf.writer();
self.ser_into(&mut buf)?;
Ok(buf.into_inner().freeze())
}
InterpretedFormat::Protobuf => {
let proto: ProtoInterpretedWalRecords = self.try_into()?;
let mut buf = BytesMut::new();
proto
.encode(&mut buf)
.map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
Ok(buf.freeze())
}
};
let buf = encode_res?;
let compressed_buf = match compression {
Some(Compression::Zstd { level }) => {
let mut encoder = ZstdEncoder::with_quality(
Vec::with_capacity(buf.len() / 4),
Level::Precise(level as i32),
);
encoder.write_all(&buf).await?;
encoder.shutdown().await?;
Bytes::from(encoder.into_inner())
}
None => buf,
};
Ok(compressed_buf)
}
}
impl FromWireFormat for InterpretedWalRecords {
type T = Self;
async fn from_wire(
buf: &Bytes,
format: InterpretedFormat,
compression: Option<Compression>,
) -> Result<Self, FromWireFormatError> {
let decompressed_buf = match compression {
Some(Compression::Zstd { .. }) => {
use async_compression::tokio::write::ZstdDecoder;
let mut decoded_buf = Vec::with_capacity(buf.len());
let mut decoder = ZstdDecoder::new(&mut decoded_buf);
decoder.write_all(buf).await?;
decoder.flush().await?;
Bytes::from(decoded_buf)
}
None => buf.clone(),
};
match format {
InterpretedFormat::Bincode => {
InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
}
InterpretedFormat::Protobuf => {
let proto = ProtoInterpretedWalRecords::decode(decompressed_buf)
.map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
InterpretedWalRecords::try_from(proto)
.map_err(|e| FromWireFormatError::Protobuf(e.into()))
}
}
}
}
impl TryFrom<InterpretedWalRecords> for ProtoInterpretedWalRecords {
type Error = SerializeError;
fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
let records = value
.records
.into_iter()
.map(ProtoInterpretedWalRecord::try_from)
.collect::<Result<Vec<_>, _>>()?;
Ok(ProtoInterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(|l| l.0),
})
}
}
impl TryFrom<InterpretedWalRecord> for ProtoInterpretedWalRecord {
type Error = SerializeError;
fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
let metadata_record = value
.metadata_record
.map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
let mut buf = Vec::new();
meta_rec.ser_into(&mut buf)?;
Ok(buf)
})
.transpose()?;
Ok(ProtoInterpretedWalRecord {
metadata_record,
batch: Some(ProtoSerializedValueBatch::from(value.batch)),
next_record_lsn: value.next_record_lsn.0,
flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
xid: value.xid,
})
}
}
impl From<SerializedValueBatch> for ProtoSerializedValueBatch {
fn from(value: SerializedValueBatch) -> Self {
ProtoSerializedValueBatch {
raw: value.raw,
metadata: value
.metadata
.into_iter()
.map(ProtoValueMeta::from)
.collect(),
max_lsn: value.max_lsn.0,
len: value.len as u64,
}
}
}
impl From<ValueMeta> for ProtoValueMeta {
fn from(value: ValueMeta) -> Self {
match value {
ValueMeta::Observed(obs) => ProtoValueMeta {
r#type: ProtoValueMetaType::Observed.into(),
key: Some(ProtoCompactKey::from(obs.key)),
lsn: obs.lsn.0,
batch_offset: None,
len: None,
will_init: None,
},
ValueMeta::Serialized(ser) => ProtoValueMeta {
r#type: ProtoValueMetaType::Serialized.into(),
key: Some(ProtoCompactKey::from(ser.key)),
lsn: ser.lsn.0,
batch_offset: Some(ser.batch_offset),
len: Some(ser.len as u64),
will_init: Some(ser.will_init),
},
}
}
}
impl From<CompactKey> for ProtoCompactKey {
fn from(value: CompactKey) -> Self {
ProtoCompactKey {
high: (value.raw() >> 64) as i64,
low: value.raw() as i64,
}
}
}
impl TryFrom<ProtoInterpretedWalRecords> for InterpretedWalRecords {
type Error = TranscodeError;
fn try_from(value: ProtoInterpretedWalRecords) -> Result<Self, Self::Error> {
let records = value
.records
.into_iter()
.map(InterpretedWalRecord::try_from)
.collect::<Result<_, _>>()?;
Ok(InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(Lsn::from),
})
}
}
impl TryFrom<ProtoInterpretedWalRecord> for InterpretedWalRecord {
type Error = TranscodeError;
fn try_from(value: ProtoInterpretedWalRecord) -> Result<Self, Self::Error> {
let metadata_record = value
.metadata_record
.map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
.transpose()?;
let batch = {
let batch = value.batch.ok_or_else(|| {
TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
})?;
SerializedValueBatch::try_from(batch)?
};
Ok(InterpretedWalRecord {
metadata_record,
batch,
next_record_lsn: Lsn(value.next_record_lsn),
flush_uncommitted: if value.flush_uncommitted {
FlushUncommittedRecords::Yes
} else {
FlushUncommittedRecords::No
},
xid: value.xid,
})
}
}
impl TryFrom<ProtoSerializedValueBatch> for SerializedValueBatch {
type Error = TranscodeError;
fn try_from(value: ProtoSerializedValueBatch) -> Result<Self, Self::Error> {
let metadata = value
.metadata
.into_iter()
.map(ValueMeta::try_from)
.collect::<Result<Vec<_>, _>>()?;
Ok(SerializedValueBatch {
raw: value.raw,
metadata,
max_lsn: Lsn(value.max_lsn),
len: value.len as usize,
})
}
}
impl TryFrom<ProtoValueMeta> for ValueMeta {
type Error = TranscodeError;
fn try_from(value: ProtoValueMeta) -> Result<Self, Self::Error> {
match ProtoValueMetaType::try_from(value.r#type) {
Ok(ProtoValueMetaType::Serialized) => Ok(ValueMeta::Serialized(SerializedValueMeta {
key: value
.key
.ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
.into(),
lsn: Lsn(value.lsn),
batch_offset: value.batch_offset.ok_or_else(|| {
TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
})?,
len: value
.len
.ok_or_else(|| TranscodeError::BadInput("ValueMeta::len missing".to_string()))?
as usize,
will_init: value.will_init.ok_or_else(|| {
TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
})?,
})),
Ok(ProtoValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
key: value
.key
.ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
.into(),
lsn: Lsn(value.lsn),
})),
Err(_) => Err(TranscodeError::BadInput(format!(
"Unexpected ValueMeta::type {}",
value.r#type
))),
}
}
}
impl From<ProtoCompactKey> for CompactKey {
fn from(value: ProtoCompactKey) -> Self {
// Cast the low half through u64 first: a direct `as i128` cast would
// sign-extend into the high half whenever `low` is negative.
(((value.high as i128) << 64) | (value.low as u64 as i128)).into()
}
}

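Two sketch tests for the conversions above (written against the types in this diff, not taken from the PR's test suite; they assume tokio's test macro is available as a dev-dependency):

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn empty_batch_roundtrips() {
        let records = InterpretedWalRecords {
            records: vec![],
            next_record_lsn: Some(Lsn(0x4242)),
        };
        let format = InterpretedFormat::Protobuf;
        let compression = Some(Compression::Zstd { level: 1 });
        let buf = records.to_wire(format, compression).await.expect("encode");
        let decoded = InterpretedWalRecords::from_wire(&buf, format, compression)
            .await
            .expect("decode");
        assert_eq!(decoded.next_record_lsn, Some(Lsn(0x4242)));
        assert!(decoded.records.is_empty());
    }

    #[test]
    fn compact_key_roundtrips() {
        // The low half has its top bit set, so a bare `as i128` cast would
        // sign-extend and corrupt the high half; the masking above prevents that.
        let key = CompactKey::from((1i128 << 64) | (1i128 << 63));
        let proto = ProtoCompactKey::from(key);
        assert_eq!(CompactKey::from(proto).raw(), key.raw());
    }
}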

@@ -535,6 +535,7 @@ impl ConnectionManagerState {
let node_id = new_sk.safekeeper_id;
let connect_timeout = self.conf.wal_connect_timeout;
let ingest_batch_size = self.conf.ingest_batch_size;
let protocol = self.conf.protocol;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -548,6 +549,7 @@ impl ConnectionManagerState {
let res = super::walreceiver_connection::handle_walreceiver_connection(
timeline,
protocol,
new_sk.wal_source_connconf,
events_sender,
cancellation.clone(),
@@ -991,7 +993,7 @@ impl ConnectionManagerState {
PostgresClientProtocol::Vanilla => {
(None, None, None)
},
PostgresClientProtocol::Interpreted => {
PostgresClientProtocol::Interpreted { .. } => {
let shard_identity = self.timeline.get_shard_identity();
(
Some(shard_identity.number.0),


@@ -22,7 +22,10 @@ use tokio::{select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn, Instrument};
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord};
use wal_decoder::{
models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords},
wire_format::FromWireFormat,
};
use super::TaskStateUpdate;
use crate::{
@@ -36,7 +39,7 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
/// Status of the connection.
@@ -109,6 +112,7 @@ impl From<WalDecodeError> for WalReceiverError {
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
protocol: PostgresClientProtocol,
wal_source_connconf: PgConnectionConfig,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
cancellation: CancellationToken,
@@ -260,6 +264,14 @@ pub(super) async fn handle_walreceiver_connection(
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
let interpreted_proto_config = match protocol {
PostgresClientProtocol::Vanilla => None,
PostgresClientProtocol::Interpreted {
format,
compression,
} => Some((format, compression)),
};
while let Some(replication_message) = {
select! {
_ = cancellation.cancelled() => {
@@ -332,16 +344,26 @@ pub(super) async fn handle_walreceiver_connection(
// This is the end LSN of the raw WAL from which the records
// were interpreted.
let streaming_lsn = Lsn::from(raw.streaming_lsn());
tracing::debug!(
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
Lsn(raw.next_record_lsn().unwrap_or(0))
);
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
anyhow::anyhow!(
let (format, compression) = interpreted_proto_config.unwrap();
let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
.await
.with_context(|| {
anyhow::anyhow!(
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
)
})?;
})?;
let InterpretedWalRecords {
records,
next_record_lsn,
} = batch;
tracing::debug!(
"Received WAL up to {} with next_record_lsn={:?}",
streaming_lsn,
next_record_lsn
);
// We start the modification at 0 because each interpreted record
// advances it to its end LSN. 0 is just an initialization placeholder.
@@ -360,14 +382,18 @@ pub(super) async fn handle_walreceiver_connection(
.await?;
}
let next_record_lsn = interpreted.next_record_lsn;
let local_next_record_lsn = interpreted.next_record_lsn;
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
.with_context(|| {
format!("could not ingest record at {local_next_record_lsn}")
})?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
tracing::debug!(
"ingest: filtered out record @ LSN {local_next_record_lsn}"
);
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
@@ -399,7 +425,7 @@ pub(super) async fn handle_walreceiver_connection(
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true


@@ -123,17 +123,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
match opt.split_once('=') {
Some(("protocol", value)) => {
let raw_value = value
.parse::<u8>()
.with_context(|| format!("Failed to parse {value} as protocol"))?;
self.protocol = Some(
PostgresClientProtocol::try_from(raw_value).map_err(|_| {
QueryError::Other(anyhow::anyhow!(
"Unexpected client protocol type: {raw_value}"
))
})?,
);
self.protocol =
Some(serde_json::from_str(value).with_context(|| {
format!("Failed to parse {value} as protocol")
})?);
}
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
self.tenant_id = Some(value.parse().with_context(|| {
@@ -180,7 +173,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
)));
}
}
PostgresClientProtocol::Interpreted => {
PostgresClientProtocol::Interpreted { .. } => {
match (shard_count, shard_number, shard_stripe_size) {
(Some(count), Some(number), Some(stripe_size)) => {
let params = ShardParameters {


@@ -1,6 +1,7 @@
use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
@@ -9,9 +10,11 @@ use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::MissedTickBehavior;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;
use utils::postgres_client::Compression;
use utils::postgres_client::InterpretedFormat;
use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
use wal_decoder::wire_format::ToWireFormat;
use crate::send_wal::EndWatchView;
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
@@ -20,6 +23,8 @@ use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
/// This is used for sending WAL to the pageserver. Said WAL
/// is pre-interpreted and filtered for the shard.
pub(crate) struct InterpretedWalSender<'a, IO> {
pub(crate) format: InterpretedFormat,
pub(crate) compression: Option<Compression>,
pub(crate) pgb: &'a mut PostgresBackend<IO>,
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
pub(crate) end_watch_view: EndWatchView,
@@ -28,6 +33,18 @@ pub(crate) struct InterpretedWalSender<'a, IO> {
pub(crate) appname: Option<String>,
}
struct Batch {
wal_end_lsn: Lsn,
available_wal_end_lsn: Lsn,
records: InterpretedWalRecords,
}
struct SerializedBatch {
wal_end_lsn: Lsn,
available_wal_end_lsn: Lsn,
buf: Bytes,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
/// Send interpreted WAL to a receiver.
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
@@ -46,10 +63,29 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
keepalive_ticker.reset();
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);
let batches_stream =
tokio_stream::wrappers::ReceiverStream::new(rx).then(|batch| async move {
let buf: Result<Bytes, CopyStreamHandlerEnd> = batch
.records
.to_wire(self.format, self.compression)
.await
.with_context(|| "Failed to serialize interpreted WAL")
.map_err(CopyStreamHandlerEnd::from);
Result::<_, CopyStreamHandlerEnd>::Ok(SerializedBatch {
wal_end_lsn: batch.wal_end_lsn,
available_wal_end_lsn: batch.available_wal_end_lsn,
buf: buf?,
})
});
let mut batches_stream = std::pin::pin!(batches_stream);
loop {
tokio::select! {
// Get some WAL from the stream and then: decode, interpret and send it
wal = stream.next() => {
// Get some WAL from the stream and then: decode, interpret and push it down the
// pipeline.
wal = stream.next(), if tx.capacity() > 0 => {
let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
Some(some) => some?,
None => { break; }
@@ -81,10 +117,19 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
}
}
let mut buf = Vec::new();
records
.ser_into(&mut buf)
.with_context(|| "Failed to serialize interpreted WAL")?;
let batch = InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn
};
tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
},
// For a previously interpreted batch, serialize it and push it down the wire.
encoded_batch = batches_stream.next() => {
let SerializedBatch {wal_end_lsn, available_wal_end_lsn, buf } = match encoded_batch {
Some(ser_batch) => ser_batch?,
None => { break; }
};
// Reset the keep alive ticker since we are sending something
// over the wire now.
@@ -94,11 +139,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
streaming_lsn: wal_end_lsn.0,
commit_lsn: available_wal_end_lsn.0,
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
data: buf.as_slice(),
data: &buf,
})).await?;
}
// Send a periodic keep alive when the connection has been idle for a while.
_ = keepalive_ticker.tick() => {
self.pgb


@@ -454,7 +454,7 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_)),
@@ -489,7 +489,10 @@ impl SafekeeperPostgresHandler {
Either::Left(sender.run())
}
PostgresClientProtocol::Interpreted => {
PostgresClientProtocol::Interpreted {
format,
compression,
} => {
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
let end_watch_view = end_watch.view();
let wal_stream_builder = WalReaderStreamBuilder {
@@ -502,6 +505,8 @@ impl SafekeeperPostgresHandler {
};
let sender = InterpretedWalSender {
format,
compression,
pgb,
wal_stream_builder,
end_watch_view,


@@ -310,6 +310,31 @@ class PgProtocol:
return self.safe_psql(query, log_query=log_query)[0][0]
class PageserverWalReceiverProtocol(StrEnum):
VANILLA = "vanilla"
INTERPRETED = "interpreted"
@staticmethod
def to_config_key_value(proto) -> tuple[str, dict[str, Any]]:
if proto == PageserverWalReceiverProtocol.VANILLA:
return (
"wal_receiver_protocol",
{
"type": "vanilla",
},
)
elif proto == PageserverWalReceiverProtocol.INTERPRETED:
return (
"wal_receiver_protocol",
{
"type": "interpreted",
"args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
},
)
else:
raise ValueError(f"Unknown protocol type: {proto}")
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -356,6 +381,7 @@ class NeonEnvBuilder:
safekeeper_extra_opts: list[str] | None = None,
storage_controller_port_override: int | None = None,
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -409,6 +435,8 @@ class NeonEnvBuilder:
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = pageserver_wal_receiver_protocol
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1023,6 +1051,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1092,6 +1121,13 @@ class NeonEnv:
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
self.pageserver_wal_receiver_protocol
)
if key not in ps_cfg:
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])


@@ -15,7 +15,14 @@ from fixtures.neon_fixtures import (
@pytest.mark.timeout(600)
@pytest.mark.parametrize("shard_count", [1, 8, 32])
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
@pytest.mark.parametrize(
"wal_receiver_protocol",
[
"vanilla",
"interpreted-bincode-compressed",
"interpreted-protobuf-compressed",
],
)
def test_sharded_ingest(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
@@ -27,14 +34,42 @@ def test_sharded_ingest(
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
(shard_count=1) to the sharded case indicates the overhead of sharding.
"""
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
ROW_COUNT = 100_000_000 # about 7 GB of WAL
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
env = neon_env_builder.init_configs()
for ps in env.pageservers:
if wal_receiver_protocol == "vanilla":
ps.patch_config_toml_nonrecursive(
{
"wal_receiver_protocol": {
"type": "vanilla",
}
}
)
elif wal_receiver_protocol == "interpreted-bincode-compressed":
ps.patch_config_toml_nonrecursive(
{
"wal_receiver_protocol": {
"type": "interpreted",
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
}
}
)
elif wal_receiver_protocol == "interpreted-protobuf-compressed":
ps.patch_config_toml_nonrecursive(
{
"wal_receiver_protocol": {
"type": "interpreted",
"args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
}
}
)
else:
raise AssertionError("Test must use explicit wal receiver protocol config")
env.start()
# Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
# the storage controller doesn't mess with shard placements.


@@ -8,6 +8,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverWalReceiverProtocol,
generate_uploads_and_deletions,
)
from fixtures.pageserver.http import PageserverApiException
@@ -27,8 +28,13 @@ AGGRESIVE_COMPACTION_TENANT_CONF = {
@skip_in_debug_build("only run with release build")
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str):
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_pageserver_compaction_smoke(
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
):
"""
This is a smoke test that compaction kicks in. The workload repeatedly churns
a small number of rows and manually instructs the pageserver to run compaction
@@ -37,10 +43,12 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei
observed bounds.
"""
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
# Effectively disable the page cache to rely only on image layers
# to shorten reads.
neon_env_builder.pageserver_config_override = f"""
page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}'
neon_env_builder.pageserver_config_override = """
page_cache_size=10
"""
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)


@@ -3,7 +3,7 @@ from __future__ import annotations
import pytest
from fixtures.log_helper import log
from fixtures.neon_cli import WalCraft
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol
# Restart nodes with WAL end having specially crafted shape, like last record
# crossing segment boundary, to test decoding issues.
@@ -19,13 +19,16 @@ from fixtures.neon_fixtures import NeonEnvBuilder
"wal_record_crossing_segment_followed_by_small_one",
],
)
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_crafted_wal_end(
neon_env_builder: NeonEnvBuilder, wal_type: str, wal_receiver_protocol: str
neon_env_builder: NeonEnvBuilder,
wal_type: str,
wal_receiver_protocol: PageserverWalReceiverProtocol,
):
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
env = neon_env_builder.init_start()
env.create_branch("test_crafted_wal_end")


@@ -1,7 +1,11 @@
from __future__ import annotations
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_content
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverWalReceiverProtocol,
check_restored_datadir_content,
)
# Test subtransactions
@@ -10,11 +14,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_conten
# maintained in the pageserver, so subtransactions are not very exciting for
# Neon. They are included in the commit record though and updated in the
# CLOG.
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol):
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")


@@ -11,7 +11,13 @@ import pytest
import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PageserverWalReceiverProtocol,
Safekeeper,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import skip_in_debug_build
@@ -622,12 +628,15 @@ async def run_segment_init_failure(env: NeonEnv):
# Test (injected) failure during WAL segment init.
# https://github.com/neondatabase/neon/issues/6401
# https://github.com/neondatabase/neon/issues/6402
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
def test_segment_init_failure(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str):
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_segment_init_failure(
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
):
neon_env_builder.num_safekeepers = 1
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
env = neon_env_builder.init_start()
asyncio.run(run_segment_init_failure(env))


@@ -60,7 +60,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted", default-features = false, features = ["with-serde_json-1"] }
prost = { version = "0.13", features = ["prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
@@ -80,7 +80,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-sys = { version = "0.6", features = ["stats"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/pack-interpreted", features = ["with-serde_json-1"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }