From 6009d9fe4df0f23edcda6eeafff45d444c95b211 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 19 Nov 2024 19:41:21 +0100 Subject: [PATCH] wal_decoder: add protobuf support The bulk of this commit is glue code to convert between protobuf representation and in-memory representation. --- libs/pageserver_api/src/key.rs | 12 ++ libs/utils/src/postgres_client.rs | 1 + libs/wal_decoder/src/wire_format.rs | 264 +++++++++++++++++++++++++++- 3 files changed, 272 insertions(+), 5 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 4505101ea6..523d143381 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -229,6 +229,18 @@ impl Key { } } +impl CompactKey { + pub fn raw(&self) -> i128 { + self.0 + } +} + +impl From for CompactKey { + fn from(value: i128) -> Self { + Self(value) + } +} + impl fmt::Display for Key { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index 3ed08a40ee..3a1a51d876 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -11,6 +11,7 @@ use crate::id::TenantTimelineId; #[serde(rename_all = "kebab-case")] pub enum InterpretedFormat { Bincode, + Protobuf, } #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs index 0e6f6676b4..531f492bb5 100644 --- a/libs/wal_decoder/src/wire_format.rs +++ b/libs/wal_decoder/src/wire_format.rs @@ -1,19 +1,63 @@ use bytes::{BufMut, Bytes, BytesMut}; +use pageserver_api::key::CompactKey; +use prost::{DecodeError, EncodeError, Message}; use utils::bin_ser::{BeSer, DeserializeError, SerializeError}; +use utils::lsn::Lsn; use utils::postgres_client::InterpretedFormat; -use crate::models::InterpretedWalRecords; +use crate::models::{ + FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, +}; + +use crate::serialized_batch::{ + ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta, +}; + +use crate::models::proto::CompactKey as ProtoCompactKey; +use crate::models::proto::InterpretedWalRecord as ProtoInterpretedWalRecord; +use crate::models::proto::InterpretedWalRecords as ProtoInterpretedWalRecords; +use crate::models::proto::SerializedValueBatch as ProtoSerializedValueBatch; +use crate::models::proto::ValueMeta as ProtoValueMeta; +use crate::models::proto::ValueMetaType as ProtoValueMetaType; #[derive(Debug, thiserror::Error)] pub enum ToWireFormatError { #[error("{0}")] - Bincode(SerializeError), + Bincode(#[from] SerializeError), + #[error("{0}")] + Protobuf(#[from] ProtobufSerializeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtobufSerializeError { + #[error("{0}")] + MetadataRecord(#[from] SerializeError), + #[error("{0}")] + Encode(#[from] EncodeError), } #[derive(Debug, thiserror::Error)] pub enum FromWireFormatError { #[error("{0}")] - Bincode(DeserializeError), + Bincode(#[from] DeserializeError), + #[error("{0}")] + Protobuf(#[from] ProtobufDeserializeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtobufDeserializeError { + #[error("{0}")] + Transcode(#[from] TranscodeError), + #[error("{0}")] + Decode(#[from] DecodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum TranscodeError { + #[error("{0}")] + BadInput(String), + #[error("{0}")] + MetadataRecord(#[from] DeserializeError), } pub trait ToWireFormat { @@ -31,10 +75,18 @@ impl ToWireFormat for InterpretedWalRecords { InterpretedFormat::Bincode => { let buf = BytesMut::new(); let mut buf = buf.writer(); - self.ser_into(&mut buf) - .map_err(ToWireFormatError::Bincode)?; + self.ser_into(&mut buf)?; Ok(buf.into_inner().freeze()) } + InterpretedFormat::Protobuf => { + let proto: ProtoInterpretedWalRecords = self.try_into()?; + let mut buf = BytesMut::new(); + proto + .encode(&mut buf) + .map_err(|e| ToWireFormatError::Protobuf(e.into()))?; + + Ok(buf.freeze()) + } } } } @@ -47,6 +99,208 @@ impl FromWireFormat for InterpretedWalRecords { InterpretedFormat::Bincode => { InterpretedWalRecords::des(buf).map_err(FromWireFormatError::Bincode) } + InterpretedFormat::Protobuf => { + let proto = ProtoInterpretedWalRecords::decode(buf.clone()) + .map_err(|e| FromWireFormatError::Protobuf(e.into()))?; + InterpretedWalRecords::try_from(proto) + .map_err(|e| FromWireFormatError::Protobuf(e.into())) + } } } } + +impl TryFrom for ProtoInterpretedWalRecords { + type Error = SerializeError; + + fn try_from(value: InterpretedWalRecords) -> Result { + let records = value + .records + .into_iter() + .map(ProtoInterpretedWalRecord::try_from) + .collect::, _>>()?; + Ok(ProtoInterpretedWalRecords { + records, + next_record_lsn: value.next_record_lsn.map(|l| l.0), + }) + } +} + +impl TryFrom for ProtoInterpretedWalRecord { + type Error = SerializeError; + + fn try_from(value: InterpretedWalRecord) -> Result { + let metadata_record = value + .metadata_record + .map(|meta_rec| -> Result, Self::Error> { + let mut buf = Vec::new(); + meta_rec.ser_into(&mut buf)?; + Ok(buf) + }) + .transpose()?; + + Ok(ProtoInterpretedWalRecord { + metadata_record, + batch: Some(ProtoSerializedValueBatch::from(value.batch)), + next_record_lsn: value.next_record_lsn.0, + flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes), + xid: value.xid, + }) + } +} + +impl From for ProtoSerializedValueBatch { + fn from(value: SerializedValueBatch) -> Self { + ProtoSerializedValueBatch { + raw: value.raw, + metadata: value + .metadata + .into_iter() + .map(ProtoValueMeta::from) + .collect(), + max_lsn: value.max_lsn.0, + len: value.len as u64, + } + } +} + +impl From for ProtoValueMeta { + fn from(value: ValueMeta) -> Self { + match value { + ValueMeta::Observed(obs) => ProtoValueMeta { + r#type: ProtoValueMetaType::Observed.into(), + key: Some(ProtoCompactKey::from(obs.key)), + lsn: obs.lsn.0, + batch_offset: None, + len: None, + will_init: None, + }, + ValueMeta::Serialized(ser) => ProtoValueMeta { + r#type: ProtoValueMetaType::Serialized.into(), + key: Some(ProtoCompactKey::from(ser.key)), + lsn: ser.lsn.0, + batch_offset: Some(ser.batch_offset), + len: Some(ser.len as u64), + will_init: Some(ser.will_init), + }, + } + } +} + +impl From for ProtoCompactKey { + fn from(value: CompactKey) -> Self { + ProtoCompactKey { + high: (value.raw() >> 64) as i64, + low: value.raw() as i64, + } + } +} + +impl TryFrom for InterpretedWalRecords { + type Error = TranscodeError; + + fn try_from(value: ProtoInterpretedWalRecords) -> Result { + let records = value + .records + .into_iter() + .map(InterpretedWalRecord::try_from) + .collect::>()?; + + Ok(InterpretedWalRecords { + records, + next_record_lsn: value.next_record_lsn.map(Lsn::from), + }) + } +} + +impl TryFrom for InterpretedWalRecord { + type Error = TranscodeError; + + fn try_from(value: ProtoInterpretedWalRecord) -> Result { + let metadata_record = value + .metadata_record + .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) }) + .transpose()?; + + let batch = { + let batch = value.batch.ok_or_else(|| { + TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string()) + })?; + + SerializedValueBatch::try_from(batch)? + }; + + Ok(InterpretedWalRecord { + metadata_record, + batch, + next_record_lsn: Lsn(value.next_record_lsn), + flush_uncommitted: if value.flush_uncommitted { + FlushUncommittedRecords::Yes + } else { + FlushUncommittedRecords::No + }, + xid: value.xid, + }) + } +} + +impl TryFrom for SerializedValueBatch { + type Error = TranscodeError; + + fn try_from(value: ProtoSerializedValueBatch) -> Result { + let metadata = value + .metadata + .into_iter() + .map(ValueMeta::try_from) + .collect::, _>>()?; + + Ok(SerializedValueBatch { + raw: value.raw, + metadata, + max_lsn: Lsn(value.max_lsn), + len: value.len as usize, + }) + } +} + +impl TryFrom for ValueMeta { + type Error = TranscodeError; + + fn try_from(value: ProtoValueMeta) -> Result { + match ProtoValueMetaType::try_from(value.r#type) { + Ok(ProtoValueMetaType::Serialized) => Ok(ValueMeta::Serialized(SerializedValueMeta { + key: value + .key + .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))? + .into(), + lsn: Lsn(value.lsn), + batch_offset: value.batch_offset.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string()) + })?, + len: value + .len + .ok_or_else(|| TranscodeError::BadInput("ValueMeta::len missing".to_string()))? + as usize, + will_init: value.will_init.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::will_init missing".to_string()) + })?, + })), + Ok(ProtoValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta { + key: value + .key + .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))? + .into(), + lsn: Lsn(value.lsn), + })), + Err(_) => Err(TranscodeError::BadInput(format!( + "Unexpected ValueMeta::type {}", + value.r#type + ))), + } + } +} + +impl From for CompactKey { + fn from(value: ProtoCompactKey) -> Self { + (((value.high as i128) << 64) | (value.low as i128)).into() + } +}