Compare commits

...

18 Commits

Author SHA1 Message Date
Vlad Lazar
d0d34e1ebc pageserver: use interpreted wal proto by default 2024-11-18 12:13:39 +01:00
Vlad Lazar
8c4deeed1d review: pass buffer size as argument to wal reader 2024-11-18 12:09:11 +01:00
Vlad Lazar
c3701890d6 wip: update rust-postgres 2024-11-18 12:06:38 +01:00
Vlad Lazar
1c8f4938cb review: rename to commit LSN to commit_lsn 2024-11-18 12:06:12 +01:00
Vlad Lazar
dc220e3f18 review: make shard param parsing more strict 2024-11-18 11:55:12 +01:00
Vlad Lazar
27eb5e6a19 review: add COPY_DATA_TAG mention 2024-11-18 11:26:07 +01:00
Vlad Lazar
901b8e4967 wip: set wip tokio-postgres branch 2024-11-18 11:21:45 +01:00
Vlad Lazar
a19dd5ae73 tests: measure received WAL and records for sharded ingest 2024-11-18 11:19:59 +01:00
Vlad Lazar
dceab62789 tests: parametrize sharded ingest test on protocol 2024-11-18 11:19:59 +01:00
Vlad Lazar
e503db2d45 pageserver: gate new protocol with pageserver config 2024-11-18 11:19:59 +01:00
Vlad Lazar
3b4ffaf79a pageserver: ingest interpreted WAL records 2024-11-18 11:19:58 +01:00
Vlad Lazar
4f3583a316 safekeeper: dispatch wal sending based on protocol 2024-11-18 11:16:55 +01:00
Vlad Lazar
3cef53c5c0 safekeeper: add an interpreted wal sender 2024-11-18 11:16:55 +01:00
Vlad Lazar
1f043f480a libs/pq_proto: add interpreted wal records type 2024-11-18 11:16:54 +01:00
Vlad Lazar
057138f065 safekeeper: abstract WAL reading into a stream 2024-11-18 11:16:54 +01:00
Vlad Lazar
260127ebf3 safekeeper: parse new replication connection arguments 2024-11-18 11:16:54 +01:00
Vlad Lazar
d64c60d9d9 util: allow for different protocol flavours in PG connections 2024-11-18 11:16:54 +01:00
Vlad Lazar
d5d2e268c5 wal_decoder: add an is empty method for interpreted record 2024-11-18 11:16:54 +01:00
24 changed files with 839 additions and 84 deletions

11
Cargo.lock generated
View File

@@ -4009,7 +4009,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4022,7 +4022,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4041,7 +4041,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
dependencies = [
"bytes",
"fallible-iterator",
@@ -5161,6 +5161,7 @@ dependencies = [
"itertools 0.10.5",
"metrics",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
@@ -5191,6 +5192,7 @@ dependencies = [
"tracing-subscriber",
"url",
"utils",
"wal_decoder",
"walproposer",
"workspace_hack",
]
@@ -6227,7 +6229,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
dependencies = [
"async-trait",
"byteorder",
@@ -6781,6 +6783,7 @@ dependencies = [
"serde_assert",
"serde_json",
"serde_path_to_error",
"serde_with",
"signal-hook",
"strum",
"strum_macros",

View File

@@ -203,10 +203,10 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -244,7 +244,7 @@ tonic-build = "0.12"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
################# Binary contents sections

View File

@@ -18,7 +18,7 @@ use std::{
str::FromStr,
time::Duration,
};
use utils::logging::LogFormat;
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
use crate::models::ImageCompressionAlgorithm;
use crate::models::LsnLease;
@@ -109,6 +109,7 @@ pub struct ConfigToml {
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
#[serde(skip_serializing_if = "Option::is_none")]
pub no_sync: Option<bool>,
pub wal_receiver_protocol: PostgresClientProtocol,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -317,6 +318,9 @@ pub mod defaults {
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
utils::postgres_client::PostgresClientProtocol::Interpreted;
}
impl Default for ConfigToml {
@@ -399,6 +403,7 @@ impl Default for ConfigToml {
virtual_file_io_mode: None,
tenant_config: TenantConfigToml::default(),
no_sync: None,
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
}
}
}

View File

@@ -562,6 +562,9 @@ pub enum BeMessage<'a> {
options: &'a [&'a str],
},
KeepAlive(WalSndKeepAlive),
/// Batch of interpreted, shard filtered WAL records,
/// ready for the pageserver to ingest
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
}
/// Common shorthands.
@@ -672,6 +675,18 @@ pub struct WalSndKeepAlive {
pub request_reply: bool,
}
#[derive(Debug)]
pub struct InterpretedWalRecordsBody<'a> {
/// End of raw WAL in [`Self::data`]
pub streaming_lsn: u64,
/// Current end of WAL on the server
pub commit_lsn: u64,
/// Start LSN of the next record in PG WAL.
/// Is 0 if the portion of PG WAL did not contain any records.
pub next_record_lsn: u64,
pub data: &'a [u8],
}
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
// single text column
@@ -996,6 +1011,20 @@ impl BeMessage<'_> {
Ok(())
})?
}
BeMessage::InterpretedWalRecords(rec) => {
// We use the COPY_DATA_TAG for our custom message
// since this tag is interpreted as raw bytes.
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_u64(rec.next_record_lsn);
buf.put_slice(rec.data);
});
}
}
Ok(())
}

View File

@@ -32,6 +32,7 @@ pin-project-lite.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true

View File

@@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;
/// Postgres client protocol types
#[derive(
Copy,
Clone,
PartialEq,
Eq,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum PostgresClientProtocol {
/// Usual Postgres replication protocol
Vanilla,
/// Custom shard-aware protocol that replicates interpreted records.
/// Used to send wal from safekeeper to pageserver.
Interpreted,
}
impl TryFrom<u8> for PostgresClientProtocol {
type Error = u8;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
v if v == (PostgresClientProtocol::Interpreted as u8) => {
PostgresClientProtocol::Interpreted
}
x => return Err(x),
})
}
}
pub struct ConnectionConfigArgs<'a> {
pub protocol: PostgresClientProtocol,
pub ttid: TenantTimelineId,
pub shard_number: Option<u8>,
pub shard_count: Option<u8>,
pub shard_stripe_size: Option<u32>,
pub listen_pg_addr_str: &'a str,
pub auth_token: Option<&'a str>,
pub availability_zone: Option<&'a str>,
}
impl<'a> ConnectionConfigArgs<'a> {
fn options(&'a self) -> Vec<String> {
let mut options = vec![
"-c".to_owned(),
format!("timeline_id={}", self.ttid.timeline_id),
format!("tenant_id={}", self.ttid.tenant_id),
format!("protocol={}", self.protocol as u8),
];
if self.shard_number.is_some() {
assert!(self.shard_count.is_some());
assert!(self.shard_stripe_size.is_some());
options.push(format!("shard_count={}", self.shard_count.unwrap()));
options.push(format!("shard_number={}", self.shard_number.unwrap()));
options.push(format!(
"shard_stripe_size={}",
self.shard_stripe_size.unwrap()
));
}
options
}
}
/// Create client config for fetching WAL from safekeeper on particular timeline.
/// listen_pg_addr_str is in form host:\[port\].
pub fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
args: ConnectionConfigArgs,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
.extend_options(args.options())
.set_password(args.auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
if let Some(availability_zone) = args.availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}

View File

@@ -65,6 +65,18 @@ pub struct InterpretedWalRecord {
pub xid: TransactionId,
}
impl InterpretedWalRecord {
/// Checks if the WAL record is empty
///
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
/// pageserver.
pub fn is_empty(&self) -> bool {
self.batch.is_empty()
&& self.metadata_record.is_none()
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
}
}
/// The interpreted part of the Postgres WAL record which requires metadata
/// writes to the underlying storage engine.
#[derive(Serialize, Deserialize)]

View File

@@ -496,11 +496,16 @@ impl SerializedValueBatch {
}
}
/// Checks if the batch is empty
///
/// A batch is empty when it contains no serialized values.
/// Note that it may still contain observed values.
/// Checks if the batch contains any serialized or observed values
pub fn is_empty(&self) -> bool {
!self.has_data() && self.metadata.is_empty()
}
/// Checks if the batch contains data
///
/// Note that if this returns false, it may still contain observed values or
/// a metadata record.
pub fn has_data(&self) -> bool {
let empty = self.raw.is_empty();
if cfg!(debug_assertions) && empty {
@@ -510,7 +515,7 @@ impl SerializedValueBatch {
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
}
empty
!empty
}
/// Returns the number of values serialized in the batch

View File

@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
// The tenants directory contains all the pageserver local disk state.
// Create if not exists and make sure all the contents are durable before proceeding.

View File

@@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
use std::env;
use storage_broker::Uri;
use utils::logging::SecretString;
use utils::postgres_client::PostgresClientProtocol;
use once_cell::sync::OnceCell;
use reqwest::Url;
@@ -182,6 +183,8 @@ pub struct PageServerConf {
/// Optionally disable disk syncs (unsafe!)
pub no_sync: bool,
pub wal_receiver_protocol: PostgresClientProtocol,
}
/// Token for authentication to safekeepers
@@ -338,6 +341,7 @@ impl PageServerConf {
virtual_file_io_engine,
tenant_config,
no_sync,
wal_receiver_protocol,
} = config_toml;
let mut conf = PageServerConf {
@@ -377,6 +381,7 @@ impl PageServerConf {
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
wal_receiver_protocol,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -1055,10 +1055,9 @@ impl<'a> DatadirModification<'a> {
}
pub(crate) fn has_dirty_data(&self) -> bool {
!self
.pending_data_batch
self.pending_data_batch
.as_ref()
.map_or(true, |b| b.is_empty())
.map_or(false, |b| b.has_data())
}
/// Set the current lsn
@@ -1234,7 +1233,7 @@ impl<'a> DatadirModification<'a> {
Some(pending_batch) => {
pending_batch.extend(batch);
}
None if !batch.is_empty() => {
None if batch.has_data() => {
self.pending_data_batch = Some(batch);
}
None => {

View File

@@ -2416,6 +2416,7 @@ impl Timeline {
*guard = Some(WalReceiver::start(
Arc::clone(self),
WalReceiverConf {
protocol: self.conf.wal_receiver_protocol,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
@@ -5788,7 +5789,7 @@ impl<'a> TimelineWriter<'a> {
batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if batch.is_empty() {
if !batch.has_data() {
return Ok(());
}

View File

@@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::postgres_client::PostgresClientProtocol;
use self::connection_manager::ConnectionManagerStatus;
@@ -45,6 +46,7 @@ use super::Timeline;
#[derive(Clone)]
pub struct WalReceiverConf {
pub protocol: PostgresClientProtocol,
/// The timeout on the connection to safekeeper for WAL streaming.
pub wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.

View File

@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::wal_stream_connection_config;
use utils::postgres_client::{
wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
@@ -984,15 +986,33 @@ impl ConnectionManagerState {
if info.safekeeper_connstr.is_empty() {
return None; // no connection string, ignore sk
}
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_ref(),
match &self.conf.auth_token {
None => None,
Some(x) => Some(x),
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
PostgresClientProtocol::Vanilla => {
(None, None, None)
},
self.conf.availability_zone.as_deref(),
) {
PostgresClientProtocol::Interpreted => {
let shard_identity = self.timeline.get_shard_identity();
(
Some(shard_identity.number.0),
Some(shard_identity.count.0),
Some(shard_identity.stripe_size.0),
)
}
};
let connection_conf_args = ConnectionConfigArgs {
protocol: self.conf.protocol,
ttid: self.id,
shard_number,
shard_count,
shard_stripe_size,
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
};
match wal_stream_connection_config(connection_conf_args) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
@@ -1096,6 +1116,7 @@ impl ReconnectReason {
mod tests {
use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
use url::Host;
fn dummy_broker_sk_timeline(
@@ -1532,6 +1553,7 @@ mod tests {
timeline,
cancel: CancellationToken::new(),
conf: WalReceiverConf {
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),

View File

@@ -36,7 +36,7 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{id::NodeId, lsn::Lsn};
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
/// Status of the connection.
@@ -291,6 +291,15 @@ pub(super) async fn handle_walreceiver_connection(
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
}
ReplicationMessage::RawInterpretedWalRecords(raw) => {
connection_status.latest_connection_update = now;
if !raw.data().is_empty() {
connection_status.latest_wal_update = now;
}
connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
@@ -298,7 +307,130 @@ pub(super) async fn handle_walreceiver_connection(
return Ok(());
}
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())
}
let status_update = match replication_message {
ReplicationMessage::RawInterpretedWalRecords(raw) => {
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
// This is the end LSN of the raw WAL from which the records
// were interpreted.
let streaming_lsn = Lsn::from(raw.streaming_lsn());
tracing::debug!(
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
Lsn(raw.next_record_lsn().unwrap_or(0))
);
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
anyhow::anyhow!(
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
)
})?;
// We start the modification at 0 because each interpreted record
// advances it to its end LSN. 0 is just an initialization placeholder.
let mut modification = timeline.begin_modification(Lsn(0));
for interpreted in records {
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
{
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
let next_record_lsn = interpreted.next_record_lsn;
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
uncommitted_records += 1;
// FIXME: this cannot be made pausable_failpoint without fixing the
// failpoint library; in tests, the added amount of debugging will cause us
// to timeout the tests.
fail_point!("walreceiver-after-ingest");
// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
if uncommitted_records >= ingest_batch_size
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
}
// Records might have been filtered out on the safekeeper side, but we still
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true
}
_ => false,
};
if uncommitted_records > 0 || needs_last_record_lsn_advance {
// Commit any uncommitted records
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
if !caught_up && streaming_lsn >= end_of_wal {
info!("caught up at LSN {streaming_lsn}");
caught_up = true;
}
tracing::debug!(
"Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
timeline.get_last_record_lsn()
);
Some(streaming_lsn)
}
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
@@ -316,21 +448,6 @@ pub(super) async fn handle_walreceiver_connection(
let mut uncommitted_records = 0;
let mut filtered_records = 0;
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())
}
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are

View File

@@ -28,6 +28,7 @@ hyper0.workspace = true
futures.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
pageserver_api.workspace = true
postgres.workspace = true
postgres-protocol.workspace = true
rand.workspace = true
@@ -57,6 +58,7 @@ sd-notify.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true
utils.workspace = true
wal_decoder.workspace = true
workspace_hack.workspace = true

View File

@@ -2,11 +2,15 @@
//! protocol commands.
use anyhow::Context;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info, info_span, Instrument};
use utils::postgres_client::PostgresClientProtocol;
use utils::shard::{ShardCount, ShardNumber};
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
@@ -35,6 +39,8 @@ pub struct SafekeeperPostgresHandler {
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub ttid: TenantTimelineId,
pub shard: Option<ShardIdentity>,
pub protocol: Option<PostgresClientProtocol>,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
@@ -107,11 +113,28 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
) -> Result<(), QueryError> {
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(options) = params.options_raw() {
let mut shard_count: Option<u8> = None;
let mut shard_number: Option<u8> = None;
let mut shard_stripe_size: Option<u32> = None;
for opt in options {
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
// remove these after the PR gets deployed:
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
match opt.split_once('=') {
Some(("protocol", value)) => {
let raw_value = value
.parse::<u8>()
.with_context(|| format!("Failed to parse {value} as protocol"))?;
self.protocol = Some(
PostgresClientProtocol::try_from(raw_value).map_err(|_| {
QueryError::Other(anyhow::anyhow!(
"Unexpected client protocol type: {raw_value}"
))
})?,
);
}
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
self.tenant_id = Some(value.parse().with_context(|| {
format!("Failed to parse {value} as tenant id")
@@ -127,9 +150,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
metrics.set_client_az(client_az)
}
}
Some(("shard_count", value)) => {
shard_count = Some(value.parse::<u8>().with_context(|| {
format!("Failed to parse {value} as shard count")
})?);
}
Some(("shard_number", value)) => {
shard_number = Some(value.parse::<u8>().with_context(|| {
format!("Failed to parse {value} as shard number")
})?);
}
Some(("shard_stripe_size", value)) => {
shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
format!("Failed to parse {value} as shard stripe size")
})?);
}
_ => continue,
}
}
match self.protocol() {
PostgresClientProtocol::Vanilla => {
if shard_count.is_some()
|| shard_number.is_some()
|| shard_stripe_size.is_some()
{
return Err(QueryError::Other(anyhow::anyhow!(
"Shard params specified for vanilla protocol"
)));
}
}
PostgresClientProtocol::Interpreted => {
match (shard_count, shard_number, shard_stripe_size) {
(Some(count), Some(number), Some(stripe_size)) => {
let params = ShardParameters {
count: ShardCount(count),
stripe_size: ShardStripeSize(stripe_size),
};
self.shard =
Some(ShardIdentity::from_params(ShardNumber(number), &params));
}
_ => {
return Err(QueryError::Other(anyhow::anyhow!(
"Shard params were not specified"
)));
}
}
}
}
}
if let Some(app_name) = params.get("application_name") {
@@ -150,6 +218,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
tracing::field::debug(self.appname.clone()),
);
if let Some(shard) = self.shard.as_ref() {
tracing::Span::current()
.record("shard", tracing::field::display(shard.shard_slug()));
}
Ok(())
} else {
Err(QueryError::Other(anyhow::anyhow!(
@@ -258,6 +331,8 @@ impl SafekeeperPostgresHandler {
tenant_id: None,
timeline_id: None,
ttid: TenantTimelineId::empty(),
shard: None,
protocol: None,
conn_id,
claims: None,
auth,
@@ -265,6 +340,10 @@ impl SafekeeperPostgresHandler {
}
}
pub fn protocol(&self) -> PostgresClientProtocol {
self.protocol.unwrap_or(PostgresClientProtocol::Vanilla)
}
// when accessing management api supply None as an argument
// when using to authorize tenant pass corresponding tenant id
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {

View File

@@ -29,6 +29,7 @@ pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_interpreted_wal;
pub mod send_wal;
pub mod state;
pub mod timeline;
@@ -38,6 +39,7 @@ pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;

View File

@@ -17,6 +17,7 @@ use tokio::{
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::types::PgLsn;
use tracing::*;
use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol};
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
@@ -325,7 +326,17 @@ async fn recovery_stream(
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// TODO: pass auth token
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
let connection_conf_args = ConnectionConfigArgs {
protocol: PostgresClientProtocol::Vanilla,
ttid: tli.ttid,
shard_number: None,
shard_count: None,
shard_stripe_size: None,
listen_pg_addr_str: &donor.pg_connstr,
auth_token: None,
availability_zone: None,
};
let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id));

View File

@@ -0,0 +1,121 @@
use std::time::Duration;
use anyhow::Context;
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
use postgres_ffi::MAX_SEND_SIZE;
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::MissedTickBehavior;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;
use crate::send_wal::EndWatchView;
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
/// Shard-aware interpreted record sender.
/// This is used for sending WAL to the pageserver. Said WAL
/// is pre-interpreted and filtered for the shard.
pub(crate) struct InterpretedWalSender<'a, IO> {
pub(crate) pgb: &'a mut PostgresBackend<IO>,
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
pub(crate) end_watch_view: EndWatchView,
pub(crate) shard: ShardIdentity,
pub(crate) pg_version: u32,
pub(crate) appname: Option<String>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
/// Send interpreted WAL to a receiver.
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
let mut wal_position = self.wal_stream_builder.start_pos();
let mut wal_decoder =
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
let mut stream = std::pin::pin!(stream);
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
keepalive_ticker.reset();
loop {
tokio::select! {
// Get some WAL from the stream and then: decode, interpret and send it
wal = stream.next() => {
let WalBytes { wal, wal_start_lsn, wal_end_lsn, commit_lsn } = match wal {
Some(some) => some?,
None => { break; }
};
wal_position = wal_start_lsn;
wal_decoder.feed_bytes(&wal);
let mut records = Vec::new();
let mut max_next_record_lsn = None;
while let Some((next_record_lsn, recdata)) = wal_decoder
.poll_decode()
.with_context(|| "Failed to decode WAL")?
{
assert!(next_record_lsn.is_aligned());
max_next_record_lsn = Some(next_record_lsn);
// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
&self.shard,
next_record_lsn,
self.pg_version,
)
.with_context(|| "Failed to interpret WAL")?;
if !interpreted.is_empty() {
records.push(interpreted);
}
}
let mut buf = Vec::new();
records
.ser_into(&mut buf)
.with_context(|| "Failed to serialize interpreted WAL")?;
// Reset the keep alive ticker since we are sending something
// over the wire now.
keepalive_ticker.reset();
self.pgb
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
streaming_lsn: wal_end_lsn.0,
commit_lsn: commit_lsn.0,
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
data: buf.as_slice(),
})).await?;
}
// Send a periodic keep alive when the connection has been idle for a while.
_ = keepalive_ticker.tick() => {
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_watch_view.get().0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.await?;
}
}
}
// The loop above ends when the receiver is caught up and there's no more WAL to send.
Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, wal_position,
)))
}
}

View File

@@ -5,12 +5,15 @@ use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::RECEIVED_PS_FEEDBACKS;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{Term, TermLsn};
use crate::send_interpreted_wal::InterpretedWalSender;
use crate::timeline::WalResidentTimeline;
use crate::wal_reader_stream::WalReaderStreamBuilder;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use futures::future::Either;
use parking_lot::Mutex;
use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
@@ -22,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::pageserver_feedback::PageserverFeedback;
use utils::postgres_client::PostgresClientProtocol;
use std::cmp::{max, min};
use std::net::SocketAddr;
@@ -226,7 +230,7 @@ impl WalSenders {
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
/// client is not pageserver.
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
let shared = self.mutex.lock();
let slot = shared.get_slot(id);
match slot.feedback {
@@ -370,6 +374,16 @@ pub struct WalSenderGuard {
walsenders: Arc<WalSenders>,
}
impl WalSenderGuard {
pub fn id(&self) -> WalSenderId {
self.id
}
pub fn walsenders(&self) -> &Arc<WalSenders> {
&self.walsenders
}
}
impl Drop for WalSenderGuard {
fn drop(&mut self) {
self.walsenders.unregister(self.id);
@@ -440,11 +454,12 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_)),
appname
appname,
self.protocol(),
);
// switch to copy
@@ -456,19 +471,49 @@ impl SafekeeperPostgresHandler {
// not synchronized with sends, so this avoids deadlocks.
let reader = pgb.split().context("START_REPLICATION split")?;
let mut sender = WalSender {
pgb,
// should succeed since we're already holding another guard
tli: tli.wal_residence_guard().await?,
appname,
start_pos,
end_pos,
term,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: vec![0u8; MAX_SEND_SIZE],
let send_fut = match self.protocol() {
PostgresClientProtocol::Vanilla => {
let sender = WalSender {
pgb,
// should succeed since we're already holding another guard
tli: tli.wal_residence_guard().await?,
appname,
start_pos,
end_pos,
term,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: vec![0u8; MAX_SEND_SIZE],
};
Either::Left(sender.run())
}
PostgresClientProtocol::Interpreted => {
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
let end_watch_view = end_watch.view();
let wal_stream_builder = WalReaderStreamBuilder {
tli: tli.wal_residence_guard().await?,
start_pos,
end_pos,
term,
end_watch,
wal_sender_guard: ws_guard.clone(),
};
let sender = InterpretedWalSender {
pgb,
wal_stream_builder,
end_watch_view,
shard: self.shard.unwrap(),
pg_version,
appname,
};
Either::Right(sender.run())
}
};
let mut reply_reader = ReplyReader {
reader,
ws_guard: ws_guard.clone(),
@@ -477,7 +522,7 @@ impl SafekeeperPostgresHandler {
let res = tokio::select! {
// todo: add read|write .context to these errors
r = sender.run() => r,
r = send_fut => r,
r = reply_reader.run() => r,
};
@@ -499,16 +544,22 @@ impl SafekeeperPostgresHandler {
}
}
/// TODO(vlad): maybe lift this instead
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
/// given term (recovery by walproposer or peer safekeeper).
enum EndWatch {
#[derive(Clone)]
pub(crate) enum EndWatch {
Commit(Receiver<Lsn>),
Flush(Receiver<TermLsn>),
}
impl EndWatch {
pub(crate) fn view(&self) -> EndWatchView {
EndWatchView(self.clone())
}
/// Get current end of WAL.
fn get(&self) -> Lsn {
pub(crate) fn get(&self) -> Lsn {
match self {
EndWatch::Commit(r) => *r.borrow(),
EndWatch::Flush(r) => r.borrow().lsn,
@@ -516,15 +567,44 @@ impl EndWatch {
}
/// Wait for the update.
async fn changed(&mut self) -> anyhow::Result<()> {
pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
match self {
EndWatch::Commit(r) => r.changed().await?,
EndWatch::Flush(r) => r.changed().await?,
}
Ok(())
}
pub(crate) async fn wait_for_lsn(
&mut self,
lsn: Lsn,
client_term: Option<Term>,
) -> anyhow::Result<Lsn> {
loop {
let end_pos = self.get();
if end_pos > lsn {
return Ok(end_pos);
}
if let EndWatch::Flush(rx) = &self {
let curr_term = rx.borrow().term;
if let Some(client_term) = client_term {
if curr_term != client_term {
bail!("term changed: requested {}, now {}", client_term, curr_term);
}
}
}
self.changed().await?;
}
}
}
pub(crate) struct EndWatchView(EndWatch);
impl EndWatchView {
pub(crate) fn get(&self) -> Lsn {
self.0.get()
}
}
/// A half driving sending WAL.
struct WalSender<'a, IO> {
pgb: &'a mut PostgresBackend<IO>,
@@ -560,7 +640,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we

View File

@@ -0,0 +1,145 @@
use std::sync::Arc;
use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
use postgres_backend::CopyStreamHandlerEnd;
use std::time::Duration;
use tokio::time::timeout;
use utils::lsn::Lsn;
use crate::{
safekeeper::Term,
send_wal::{EndWatch, WalSenderGuard},
timeline::WalResidentTimeline,
};
pub(crate) struct WalReaderStreamBuilder {
pub(crate) tli: WalResidentTimeline,
pub(crate) start_pos: Lsn,
pub(crate) end_pos: Lsn,
pub(crate) term: Option<Term>,
pub(crate) end_watch: EndWatch,
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
}
impl WalReaderStreamBuilder {
pub(crate) fn start_pos(&self) -> Lsn {
self.start_pos
}
}
pub(crate) struct WalBytes {
/// Raw PG WAL
pub(crate) wal: Bytes,
/// Start LSN of [`Self::wal`]
pub(crate) wal_start_lsn: Lsn,
/// End LSN of [`Self::wal`]
pub(crate) wal_end_lsn: Lsn,
/// End LSN of WAL available on the safekeeper
pub(crate) commit_lsn: Lsn,
}
impl WalReaderStreamBuilder {
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
/// The stream terminates when the receiver (pageserver) is fully caught up
/// and there's no active computes.
pub(crate) async fn build(
self,
buffer_size: usize,
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
// We can make the raw WAL sender use this stream too and remove the duplication.
let Self {
tli,
mut start_pos,
mut end_pos,
term,
mut end_watch,
wal_sender_guard,
} = self;
let mut wal_reader = tli.get_walreader(start_pos).await?;
let mut buffer = vec![0; buffer_size];
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
Ok(try_stream! {
loop {
let have_something_to_send = end_pos > start_pos;
if !have_something_to_send {
// wait for lsn
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
match res {
Ok(ok) => {
end_pos = ok?;
},
Err(_) => {
if let EndWatch::Commit(_) = end_watch {
if let Some(remote_consistent_lsn) = wal_sender_guard
.walsenders()
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
{
if tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return;
}
}
}
continue;
}
}
}
assert!(
end_pos > start_pos,
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by the buffer size
let mut chunk_end_pos = start_pos + buffer_size as u64;
// if we went behind available WAL, back off
if chunk_end_pos >= end_pos {
chunk_end_pos = end_pos;
} else {
// If sending not up to end pos, round down to page boundary to
// avoid breaking WAL record not at page boundary, as protocol
// demands. See walsender.c (XLogSendPhysical).
chunk_end_pos = chunk_end_pos
.checked_sub(chunk_end_pos.block_offset())
.unwrap();
}
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
let buffer = &mut buffer[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is
// still the expected one.
let _term_guard = if let Some(t) = term {
Some(tli.acquire_term(t).await?)
} else {
None
};
// Read WAL into buffer. send_size can be additionally capped to
// segment boundary here.
send_size = wal_reader.read(buffer).await?
};
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
yield WalBytes {
wal,
wal_start_lsn: start_pos,
wal_end_lsn: start_pos + send_size as u64,
commit_lsn: end_pos
};
start_pos += send_size as u64;
}
})
}
}

View File

@@ -15,16 +15,21 @@ from fixtures.neon_fixtures import (
@pytest.mark.timeout(600)
@pytest.mark.parametrize("shard_count", [1, 8, 32])
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
def test_sharded_ingest(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
shard_count: int,
wal_receiver_protocol: str,
):
"""
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
(shard_count=1) to the sharded case indicates the overhead of sharding.
"""
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
ROW_COUNT = 100_000_000 # about 7 GB of WAL
@@ -50,7 +55,6 @@ def test_sharded_ingest(
# Start the endpoint.
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Ingest data and measure WAL volume and duration.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -68,4 +72,48 @@ def test_sharded_ingest(
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
total_ingested = 0
total_records_received = 0
ingested_by_ps = []
for pageserver in env.pageservers:
ingested = pageserver.http_client().get_metric_value(
"pageserver_wal_ingest_bytes_received_total"
)
records_received = pageserver.http_client().get_metric_value(
"pageserver_wal_ingest_records_received_total"
)
if ingested is None:
ingested = 0
if records_received is None:
records_received = 0
ingested_by_ps.append(
(
pageserver.id,
{
"ingested": ingested,
"records_received": records_received,
},
)
)
total_ingested += int(ingested)
total_records_received += int(records_received)
total_ingested_mb = total_ingested / (1024 * 1024)
zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER
)
ingested_by_ps.sort(key=lambda x: x[0])
for _, stats in ingested_by_ps:
for k in stats:
if k != "records_received":
stats[k] /= 1024**2
log.info(f"WAL ingested by each pageserver {ingested_by_ps}")
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"

View File

@@ -58,7 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support", default-features = false, features = ["with-serde_json-1"] }
prost = { version = "0.13", features = ["prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
@@ -78,7 +78,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-sys = { version = "0.6", features = ["stats"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support", features = ["with-serde_json-1"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }