mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
safekeeper: use protobuf for sending compressed records to pageserver (#9821)
## Problem https://github.com/neondatabase/neon/pull/9746 lifted decoding and interpretation of WAL to the safekeeper. This reduced the ingested amount on the pageservers by around 10x for a tenant with 8 shards, but doubled the ingested amount for single sharded tenants. Also, https://github.com/neondatabase/neon/pull/9746 uses bincode which doesn't support schema evolution. Technically the schema can be evolved, but it's very cumbersome. ## Summary of changes This patch set addresses both problems by adding protobuf support for the interpreted wal records and adding compression support. Compressed protobuf reduced the ingested amount by 100x on the 32 shards `test_sharded_ingest` case (compared to non-interpreted proto). For the 1 shard case the reduction is 5x. Sister change to `rust-postgres` is [here](https://github.com/neondatabase/rust-postgres/pull/33). ## Links Related: https://github.com/neondatabase/neon/issues/9336 Epic: https://github.com/neondatabase/neon/issues/9329
This commit is contained in:
@@ -535,6 +535,7 @@ impl ConnectionManagerState {
|
||||
let node_id = new_sk.safekeeper_id;
|
||||
let connect_timeout = self.conf.wal_connect_timeout;
|
||||
let ingest_batch_size = self.conf.ingest_batch_size;
|
||||
let protocol = self.conf.protocol;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let ctx = ctx.detached_child(
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
@@ -548,6 +549,7 @@ impl ConnectionManagerState {
|
||||
|
||||
let res = super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
protocol,
|
||||
new_sk.wal_source_connconf,
|
||||
events_sender,
|
||||
cancellation.clone(),
|
||||
@@ -991,7 +993,7 @@ impl ConnectionManagerState {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
},
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
PostgresClientProtocol::Interpreted { .. } => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
|
||||
@@ -22,7 +22,10 @@ use tokio::{select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord};
|
||||
use wal_decoder::{
|
||||
models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords},
|
||||
wire_format::FromWireFormat,
|
||||
};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
@@ -36,7 +39,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
/// Status of the connection.
|
||||
@@ -109,6 +112,7 @@ impl From<WalDecodeError> for WalReceiverError {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn handle_walreceiver_connection(
|
||||
timeline: Arc<Timeline>,
|
||||
protocol: PostgresClientProtocol,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
|
||||
cancellation: CancellationToken,
|
||||
@@ -260,6 +264,14 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
|
||||
|
||||
let interpreted_proto_config = match protocol {
|
||||
PostgresClientProtocol::Vanilla => None,
|
||||
PostgresClientProtocol::Interpreted {
|
||||
format,
|
||||
compression,
|
||||
} => Some((format, compression)),
|
||||
};
|
||||
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
@@ -332,16 +344,26 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
let streaming_lsn = Lsn::from(raw.streaming_lsn());
|
||||
tracing::debug!(
|
||||
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
|
||||
Lsn(raw.next_record_lsn().unwrap_or(0))
|
||||
);
|
||||
|
||||
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
let (format, compression) = interpreted_proto_config.unwrap();
|
||||
let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
|
||||
.await
|
||||
.with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
|
||||
)
|
||||
})?;
|
||||
})?;
|
||||
|
||||
let InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn,
|
||||
} = batch;
|
||||
|
||||
tracing::debug!(
|
||||
"Received WAL up to {} with next_record_lsn={:?}",
|
||||
streaming_lsn,
|
||||
next_record_lsn
|
||||
);
|
||||
|
||||
// We start the modification at 0 because each interpreted record
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
@@ -360,14 +382,18 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.await?;
|
||||
}
|
||||
|
||||
let next_record_lsn = interpreted.next_record_lsn;
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {local_next_record_lsn}")
|
||||
})?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
tracing::debug!(
|
||||
"ingest: filtered out record @ LSN {local_next_record_lsn}"
|
||||
);
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
@@ -399,7 +425,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
|
||||
let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
|
||||
Reference in New Issue
Block a user