diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index a5933e9427..40f21436f1 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -562,7 +562,7 @@ pub enum BeMessage<'a> { options: &'a [&'a str], }, KeepAlive(WalSndKeepAlive), - InterpretedWalRecord(&'a [u8]), + InterpretedWalRecord(InterpretedWalRecordBody<'a>), } /// Common shorthands. @@ -666,6 +666,12 @@ pub struct XLogDataBody<'a> { pub data: &'a [u8], } +#[derive(Debug)] +pub struct InterpretedWalRecordBody<'a> { + pub wal_end: u64, + pub data: &'a [u8], +} + #[derive(Debug)] pub struct WalSndKeepAlive { pub wal_end: u64, // current end of WAL on the server @@ -1002,7 +1008,8 @@ impl BeMessage<'_> { buf.put_u8(b'd'); // arbitrary? write_body(buf, |buf| { buf.put_u8(b'0'); - buf.put_slice(rec); + buf.put_u64(rec.wal_end); + buf.put_slice(rec.data); }); } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index c8ae320b5f..89081db7f7 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1164,7 +1164,7 @@ impl<'a> DatadirModification<'a> { .get_rel_exists(rel, Version::Modified(self), ctx) .await? 
{ - tracing::info!("Creating relation {rel:?} at lsn {}", self.get_lsn()); + tracing::debug!("Creating relation {rel:?} at lsn {}", self.get_lsn()); // create it with 0 size initially, the logic below will extend it self.put_rel_creation(rel, 0, ctx) @@ -1172,7 +1172,7 @@ impl<'a> DatadirModification<'a> { .context("Relation Error")?; Ok(0) } else { - tracing::info!( + tracing::debug!( "Skipping relation {rel:?} creation at lsn {}", self.get_lsn() ); @@ -1217,7 +1217,7 @@ impl<'a> DatadirModification<'a> { shard: &ShardIdentity, ctx: &RequestContext, ) -> anyhow::Result<()> { - tracing::info!("Ingesting batch with metadata: {:?}", batch.metadata); + tracing::debug!("Ingesting batch with metadata: {:?}", batch.metadata); let mut gaps_at_lsns = Vec::default(); diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 4d6a97ffb8..a2defd0ef9 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -37,7 +37,7 @@ use utils::backoff::{ exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; use utils::postgres_client::{ - wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION, + wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION, POSTGRES_PROTO_VERSION, }; use utils::{ id::{NodeId, TenantTimelineId}, @@ -998,6 +998,16 @@ impl ConnectionManagerState { auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()), availability_zone: self.conf.availability_zone.as_deref() }; + // let connection_conf_args = ConnectionConfigArgs { + // protocol_version: POSTGRES_PROTO_VERSION, + // ttid: self.id, + // shard_number: None, + // shard_count: None, + // shard_stripe_size: None, + // listen_pg_addr_str: info.safekeeper_connstr.as_ref(), + // auth_token: self.conf.auth_token.as_ref().map(|t| 
t.as_str()), + // availability_zone: self.conf.availability_zone.as_deref() + // }; match wal_stream_connection_config(connection_conf_args) { Ok(connstr) => Some((*sk_id, info, connstr)), Err(e) => { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 11dc4e9a12..b97d646f3d 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -300,8 +300,12 @@ pub(super) async fn handle_walreceiver_connection( } let status_update = match replication_message { - ReplicationMessage::RawInterpretedWalRecord(raw_interpreted) => { - let interpreted = InterpretedWalRecord::des(&raw_interpreted).unwrap(); + ReplicationMessage::RawInterpretedWalRecord(raw) => { + connection_status.latest_connection_update = now; + connection_status.latest_wal_update = now; + connection_status.commit_lsn = Some(Lsn::from(raw.wal_end())); + + let interpreted = InterpretedWalRecord::des(raw.data()).unwrap(); let end_lsn = interpreted.end_lsn; let mut modification = timeline.begin_modification(end_lsn); diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 9b3c5bd779..69025b3525 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -18,7 +18,7 @@ use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; -use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; +use pq_proto::{BeMessage, InterpretedWalRecordBody, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use utils::failpoint_support; @@ -591,6 +591,11 @@ impl WalSender<'_, IO> { &mut self, shard: &ShardIdentity, ) -> Result<(), CopyStreamHandlerEnd> { + let mut last_logged_at = 
std::time::Instant::now(); + let mut interpreted_records = 0; + let mut interpreted_bytes = 0; + let mut useful_bytes = 0; + let pg_version = self.tli.tli.get_state().await.1.server.pg_version / 10000; let mut wal_decoder = WalStreamDecoder::new(self.start_pos, pg_version); @@ -636,8 +641,16 @@ impl WalSender<'_, IO> { wal_decoder.feed_bytes(send_buf); - // TODO(vlad): implement error handling here - while let Some((record_end_lsn, recdata)) = wal_decoder.poll_decode().unwrap() { + // How fast or slow is this? Write a little benchmark + // to see how quickly we can decode 1GiB of WAL. + // If this is slow, then we have a problem since it bottlenecks + // the whole affair. SK can send about 60-70MiB of raw WAL and + // about 13-17MiB of useful interpreted WAL per second (these + // numbers are for one shard). + while let Some((record_end_lsn, recdata)) = wal_decoder + .poll_decode() + .with_context(|| "Failed to decode WAL")? + { assert!(record_end_lsn.is_aligned()); // Deserialize and interpret WAL record @@ -647,14 +660,27 @@ impl WalSender<'_, IO> { record_end_lsn, pg_version, ) - .unwrap(); + .with_context(|| "Failed to interpret WAL")?; + + let useful_size = interpreted.batch.buffer_size(); let mut buf = Vec::new(); - interpreted.ser_into(&mut buf).unwrap(); + interpreted + .ser_into(&mut buf) + .with_context(|| "Failed to serialize interpreted WAL")?; + + let size = buf.len(); self.pgb - .write_message(&BeMessage::InterpretedWalRecord(buf.as_slice())) + .write_message(&BeMessage::InterpretedWalRecord(InterpretedWalRecordBody { + wal_end: self.end_pos.0, + data: buf.as_slice(), + })) .await?; + + interpreted_records += 1; + interpreted_bytes += size; + useful_bytes += useful_size; } // and send it @@ -680,10 +706,32 @@ impl WalSender<'_, IO> { // ); self.start_pos += send_size as u64; + + let elapsed = last_logged_at.elapsed(); + if elapsed >= Duration::from_secs(5) { + let records_rate = interpreted_records * 1000 / elapsed.as_millis(); // multiply before dividing so integer division does not truncate the per-second rate + let bytes_rate = 
interpreted_bytes * 1000 / elapsed.as_millis() as usize; + let useful_bytes_rate = useful_bytes * 1000 / elapsed.as_millis() as usize; + tracing::info!( + "Shard {} sender rate: rps={} bps={} ubps={}", + shard.number.0, + records_rate, + bytes_rate, + useful_bytes_rate + ); + + last_logged_at = std::time::Instant::now(); + interpreted_records = 0; + interpreted_bytes = 0; + useful_bytes = 0; + } } } async fn run_wal_sender(&mut self) -> Result<(), CopyStreamHandlerEnd> { + let mut useful_bytes = 0; + let mut last_logged_at = std::time::Instant::now(); + loop { // Wait for the next portion if it is not there yet, or just // update our end of WAL available for sending value, we @@ -724,6 +772,8 @@ impl WalSender<'_, IO> { }; let send_buf = &send_buf[..send_size]; + useful_bytes += send_buf.len(); + // and send it self.pgb .write_message(&BeMessage::XLogData(XLogDataBody { @@ -746,6 +796,18 @@ impl WalSender<'_, IO> { self.start_pos + send_size as u64 ); self.start_pos += send_size as u64; + + let elapsed = last_logged_at.elapsed(); + if elapsed >= Duration::from_secs(5) { + let useful_bytes_rate = useful_bytes * 1000 / elapsed.as_millis() as usize; + tracing::info!( + "Sender rate: ubps={}", + useful_bytes_rate + ); + + last_logged_at = std::time::Instant::now(); + useful_bytes = 0; + } } }