This commit is contained in:
Vlad Lazar
2024-11-06 19:41:22 +01:00
parent f0044b8651
commit b3ef315041
5 changed files with 97 additions and 14 deletions

View File

@@ -562,7 +562,7 @@ pub enum BeMessage<'a> {
options: &'a [&'a str],
},
KeepAlive(WalSndKeepAlive),
InterpretedWalRecord(&'a [u8]),
InterpretedWalRecord(InterpretedWalRecordBody<'a>),
}
/// Common shorthands.
@@ -666,6 +666,12 @@ pub struct XLogDataBody<'a> {
pub data: &'a [u8],
}
#[derive(Debug)]
pub struct InterpretedWalRecordBody<'a> {
pub wal_end: u64,
pub data: &'a [u8],
}
#[derive(Debug)]
pub struct WalSndKeepAlive {
pub wal_end: u64, // current end of WAL on the server
@@ -1002,7 +1008,8 @@ impl BeMessage<'_> {
buf.put_u8(b'd'); // arbitrary?
write_body(buf, |buf| {
buf.put_u8(b'0');
buf.put_slice(rec);
buf.put_u64(rec.wal_end);
buf.put_slice(rec.data);
});
}
}

View File

@@ -1164,7 +1164,7 @@ impl<'a> DatadirModification<'a> {
.get_rel_exists(rel, Version::Modified(self), ctx)
.await?
{
tracing::info!("Creating relation {rel:?} at lsn {}", self.get_lsn());
tracing::debug!("Creating relation {rel:?} at lsn {}", self.get_lsn());
// create it with 0 size initially, the logic below will extend it
self.put_rel_creation(rel, 0, ctx)
@@ -1172,7 +1172,7 @@ impl<'a> DatadirModification<'a> {
.context("Relation Error")?;
Ok(0)
} else {
tracing::info!(
tracing::debug!(
"Skipping relation {rel:?} creation at lsn {}",
self.get_lsn()
);
@@ -1217,7 +1217,7 @@ impl<'a> DatadirModification<'a> {
shard: &ShardIdentity,
ctx: &RequestContext,
) -> anyhow::Result<()> {
tracing::info!("Ingesting batch with metadata: {:?}", batch.metadata);
tracing::debug!("Ingesting batch with metadata: {:?}", batch.metadata);
let mut gaps_at_lsns = Vec::default();

View File

@@ -37,7 +37,7 @@ use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::{
wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION,
wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION, POSTGRES_PROTO_VERSION,
};
use utils::{
id::{NodeId, TenantTimelineId},
@@ -998,6 +998,16 @@ impl ConnectionManagerState {
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
};
// let connection_conf_args = ConnectionConfigArgs {
// protocol_version: POSTGRES_PROTO_VERSION,
// ttid: self.id,
// shard_number: None,
// shard_count: None,
// shard_stripe_size: None,
// listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
// auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
// availability_zone: self.conf.availability_zone.as_deref()
// };
match wal_stream_connection_config(connection_conf_args) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {

View File

@@ -300,8 +300,12 @@ pub(super) async fn handle_walreceiver_connection(
}
let status_update = match replication_message {
ReplicationMessage::RawInterpretedWalRecord(raw_interpreted) => {
let interpreted = InterpretedWalRecord::des(&raw_interpreted).unwrap();
ReplicationMessage::RawInterpretedWalRecord(raw) => {
connection_status.latest_connection_update = now;
connection_status.latest_wal_update = now;
connection_status.commit_lsn = Some(Lsn::from(raw.wal_end()));
let interpreted = InterpretedWalRecord::des(raw.data()).unwrap();
let end_lsn = interpreted.end_lsn;
let mut modification = timeline.begin_modification(end_lsn);

View File

@@ -18,7 +18,7 @@ use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use pq_proto::{BeMessage, InterpretedWalRecordBody, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
@@ -591,6 +591,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
&mut self,
shard: &ShardIdentity,
) -> Result<(), CopyStreamHandlerEnd> {
let mut last_logged_at = std::time::Instant::now();
let mut interpreted_records = 0;
let mut interpreted_bytes = 0;
let mut useful_bytes = 0;
let pg_version = self.tli.tli.get_state().await.1.server.pg_version / 10000;
let mut wal_decoder = WalStreamDecoder::new(self.start_pos, pg_version);
@@ -636,8 +641,16 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
wal_decoder.feed_bytes(send_buf);
// TODO(vlad): implement error handling here
while let Some((record_end_lsn, recdata)) = wal_decoder.poll_decode().unwrap() {
// How fast or slow is this. Write a little benchmark
// to see how quiclky we can decode 1GiB of WAL.
// If this is slow, then we have a problem since it bottlenecks
// the whole afair. SK can send about 60-70MiB of raw WAL and
// about 13-17MiB of useful interpreted WAL per second (these
// number are for one shard).
while let Some((record_end_lsn, recdata)) = wal_decoder
.poll_decode()
.with_context(|| "Failed to decode WAL")?
{
assert!(record_end_lsn.is_aligned());
// Deserialize and interpret WAL record
@@ -647,14 +660,27 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
record_end_lsn,
pg_version,
)
.unwrap();
.with_context(|| "Failed to interpret WAL")?;
let useful_size = interpreted.batch.buffer_size();
let mut buf = Vec::new();
interpreted.ser_into(&mut buf).unwrap();
interpreted
.ser_into(&mut buf)
.with_context(|| "Failed to serialize interpreted WAL")?;
let size = buf.len();
self.pgb
.write_message(&BeMessage::InterpretedWalRecord(buf.as_slice()))
.write_message(&BeMessage::InterpretedWalRecord(InterpretedWalRecordBody {
wal_end: self.end_pos.0,
data: buf.as_slice(),
}))
.await?;
interpreted_records += 1;
interpreted_bytes += size;
useful_bytes += useful_size;
}
// and send it
@@ -680,10 +706,32 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
// );
self.start_pos += send_size as u64;
let elapsed = last_logged_at.elapsed();
if elapsed >= Duration::from_secs(5) {
let records_rate = interpreted_records / elapsed.as_millis() * 1000;
let bytes_rate = interpreted_bytes / elapsed.as_millis() as usize * 1000;
let useful_bytes_rate = useful_bytes / elapsed.as_millis() as usize * 1000;
tracing::info!(
"Shard {} sender rate: rps={} bps={} ubps={}",
shard.number.0,
records_rate,
bytes_rate,
useful_bytes_rate
);
last_logged_at = std::time::Instant::now();
interpreted_records = 0;
interpreted_bytes = 0;
useful_bytes = 0;
}
}
}
async fn run_wal_sender(&mut self) -> Result<(), CopyStreamHandlerEnd> {
let mut useful_bytes = 0;
let mut last_logged_at = std::time::Instant::now();
loop {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we
@@ -724,6 +772,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
};
let send_buf = &send_buf[..send_size];
useful_bytes += send_buf.len();
// and send it
self.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
@@ -746,6 +796,18 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
self.start_pos + send_size as u64
);
self.start_pos += send_size as u64;
let elapsed = last_logged_at.elapsed();
if elapsed >= Duration::from_secs(5) {
let useful_bytes_rate = useful_bytes / elapsed.as_millis() as usize * 1000;
tracing::info!(
"Sender rate: ubps={}",
useful_bytes_rate
);
last_logged_at = std::time::Instant::now();
useful_bytes = 0;
}
}
}