mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
18 Commits
skyzh/add-
...
vlad/safek
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0d34e1ebc | ||
|
|
8c4deeed1d | ||
|
|
c3701890d6 | ||
|
|
1c8f4938cb | ||
|
|
dc220e3f18 | ||
|
|
27eb5e6a19 | ||
|
|
901b8e4967 | ||
|
|
a19dd5ae73 | ||
|
|
dceab62789 | ||
|
|
e503db2d45 | ||
|
|
3b4ffaf79a | ||
|
|
4f3583a316 | ||
|
|
3cef53c5c0 | ||
|
|
1f043f480a | ||
|
|
057138f065 | ||
|
|
260127ebf3 | ||
|
|
d64c60d9d9 | ||
|
|
d5d2e268c5 |
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -4009,7 +4009,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4022,7 +4022,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4041,7 +4041,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -5161,6 +5161,7 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
@@ -5191,6 +5192,7 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"utils",
|
||||
"wal_decoder",
|
||||
"walproposer",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -6227,7 +6229,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=vlad/interpreted-wal-record-replication-support#e619cf8c2c572e71cbc97f1c7f4cab8219f07d55"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -6781,6 +6783,7 @@ dependencies = [
|
||||
"serde_assert",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
|
||||
10
Cargo.toml
10
Cargo.toml
@@ -203,10 +203,10 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
@@ -244,7 +244,7 @@ tonic-build = "0.12"
|
||||
[patch.crates-io]
|
||||
|
||||
# Needed to get `tokio-postgres-rustls` to depend on our fork.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::{
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::logging::LogFormat;
|
||||
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
@@ -109,6 +109,7 @@ pub struct ConfigToml {
|
||||
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub no_sync: Option<bool>,
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -317,6 +318,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
|
||||
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
|
||||
utils::postgres_client::PostgresClientProtocol::Interpreted;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
@@ -399,6 +403,7 @@ impl Default for ConfigToml {
|
||||
virtual_file_io_mode: None,
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
no_sync: None,
|
||||
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,6 +562,9 @@ pub enum BeMessage<'a> {
|
||||
options: &'a [&'a str],
|
||||
},
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
/// Batch of interpreted, shard filtered WAL records,
|
||||
/// ready for the pageserver to ingest
|
||||
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
@@ -672,6 +675,18 @@ pub struct WalSndKeepAlive {
|
||||
pub request_reply: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InterpretedWalRecordsBody<'a> {
|
||||
/// End of raw WAL in [`Self::data`]
|
||||
pub streaming_lsn: u64,
|
||||
/// Current end of WAL on the server
|
||||
pub commit_lsn: u64,
|
||||
/// Start LSN of the next record in PG WAL.
|
||||
/// Is 0 if the portion of PG WAL did not contain any records.
|
||||
pub next_record_lsn: u64,
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||
|
||||
// single text column
|
||||
@@ -996,6 +1011,20 @@ impl BeMessage<'_> {
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
||||
BeMessage::InterpretedWalRecords(rec) => {
|
||||
// We use the COPY_DATA_TAG for our custom message
|
||||
// since this tag is interpreted as raw bytes.
|
||||
buf.put_u8(b'd');
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
|
||||
// dependency
|
||||
buf.put_u64(rec.streaming_lsn);
|
||||
buf.put_u64(rec.commit_lsn);
|
||||
buf.put_u64(rec.next_record_lsn);
|
||||
buf.put_slice(rec.data);
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ pin-project-lite.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
|
||||
use crate::id::TenantTimelineId;
|
||||
|
||||
/// Postgres client protocol types
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
#[repr(u8)]
|
||||
pub enum PostgresClientProtocol {
|
||||
/// Usual Postgres replication protocol
|
||||
Vanilla,
|
||||
/// Custom shard-aware protocol that replicates interpreted records.
|
||||
/// Used to send wal from safekeeper to pageserver.
|
||||
Interpreted,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PostgresClientProtocol {
|
||||
type Error = u8;
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
|
||||
v if v == (PostgresClientProtocol::Interpreted as u8) => {
|
||||
PostgresClientProtocol::Interpreted
|
||||
}
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectionConfigArgs<'a> {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard_number: Option<u8>,
|
||||
pub shard_count: Option<u8>,
|
||||
pub shard_stripe_size: Option<u32>,
|
||||
|
||||
pub listen_pg_addr_str: &'a str,
|
||||
|
||||
pub auth_token: Option<&'a str>,
|
||||
pub availability_zone: Option<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a> ConnectionConfigArgs<'a> {
|
||||
fn options(&'a self) -> Vec<String> {
|
||||
let mut options = vec![
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", self.ttid.timeline_id),
|
||||
format!("tenant_id={}", self.ttid.tenant_id),
|
||||
format!("protocol={}", self.protocol as u8),
|
||||
];
|
||||
|
||||
if self.shard_number.is_some() {
|
||||
assert!(self.shard_count.is_some());
|
||||
assert!(self.shard_stripe_size.is_some());
|
||||
|
||||
options.push(format!("shard_count={}", self.shard_count.unwrap()));
|
||||
options.push(format!("shard_number={}", self.shard_number.unwrap()));
|
||||
options.push(format!(
|
||||
"shard_stripe_size={}",
|
||||
self.shard_stripe_size.unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
}
|
||||
|
||||
/// Create client config for fetching WAL from safekeeper on particular timeline.
|
||||
/// listen_pg_addr_str is in form host:\[port\].
|
||||
pub fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
args: ConnectionConfigArgs,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
.extend_options(args.options())
|
||||
.set_password(args.auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
if let Some(availability_zone) = args.availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
|
||||
@@ -65,6 +65,18 @@ pub struct InterpretedWalRecord {
|
||||
pub xid: TransactionId,
|
||||
}
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
/// Checks if the WAL record is empty
|
||||
///
|
||||
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
|
||||
/// pageserver.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.batch.is_empty()
|
||||
&& self.metadata_record.is_none()
|
||||
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
|
||||
}
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -496,11 +496,16 @@ impl SerializedValueBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the batch is empty
|
||||
///
|
||||
/// A batch is empty when it contains no serialized values.
|
||||
/// Note that it may still contain observed values.
|
||||
/// Checks if the batch contains any serialized or observed values
|
||||
pub fn is_empty(&self) -> bool {
|
||||
!self.has_data() && self.metadata.is_empty()
|
||||
}
|
||||
|
||||
/// Checks if the batch contains data
|
||||
///
|
||||
/// Note that if this returns false, it may still contain observed values or
|
||||
/// a metadata record.
|
||||
pub fn has_data(&self) -> bool {
|
||||
let empty = self.raw.is_empty();
|
||||
|
||||
if cfg!(debug_assertions) && empty {
|
||||
@@ -510,7 +515,7 @@ impl SerializedValueBatch {
|
||||
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
|
||||
}
|
||||
|
||||
empty
|
||||
!empty
|
||||
}
|
||||
|
||||
/// Returns the number of values serialized in the batch
|
||||
|
||||
@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
|
||||
@@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::logging::SecretString;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::Url;
|
||||
@@ -182,6 +183,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// Optionally disable disk syncs (unsafe!)
|
||||
pub no_sync: bool,
|
||||
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -338,6 +341,7 @@ impl PageServerConf {
|
||||
virtual_file_io_engine,
|
||||
tenant_config,
|
||||
no_sync,
|
||||
wal_receiver_protocol,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -377,6 +381,7 @@ impl PageServerConf {
|
||||
image_compression,
|
||||
timeline_offloading,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
wal_receiver_protocol,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -1055,10 +1055,9 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
pub(crate) fn has_dirty_data(&self) -> bool {
|
||||
!self
|
||||
.pending_data_batch
|
||||
self.pending_data_batch
|
||||
.as_ref()
|
||||
.map_or(true, |b| b.is_empty())
|
||||
.map_or(false, |b| b.has_data())
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
@@ -1234,7 +1233,7 @@ impl<'a> DatadirModification<'a> {
|
||||
Some(pending_batch) => {
|
||||
pending_batch.extend(batch);
|
||||
}
|
||||
None if !batch.is_empty() => {
|
||||
None if batch.has_data() => {
|
||||
self.pending_data_batch = Some(batch);
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -2416,6 +2416,7 @@ impl Timeline {
|
||||
*guard = Some(WalReceiver::start(
|
||||
Arc::clone(self),
|
||||
WalReceiverConf {
|
||||
protocol: self.conf.wal_receiver_protocol,
|
||||
wal_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
@@ -5788,7 +5789,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if batch.is_empty() {
|
||||
if !batch.has_data() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use self::connection_manager::ConnectionManagerStatus;
|
||||
|
||||
@@ -45,6 +46,7 @@ use super::Timeline;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WalReceiverConf {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
/// The timeout on the connection to safekeeper for WAL streaming.
|
||||
pub wal_connect_timeout: Duration,
|
||||
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
|
||||
|
||||
@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
|
||||
use utils::backoff::{
|
||||
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::postgres_client::wal_stream_connection_config;
|
||||
use utils::postgres_client::{
|
||||
wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
|
||||
};
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -984,15 +986,33 @@ impl ConnectionManagerState {
|
||||
if info.safekeeper_connstr.is_empty() {
|
||||
return None; // no connection string, ignore sk
|
||||
}
|
||||
match wal_stream_connection_config(
|
||||
self.id,
|
||||
info.safekeeper_connstr.as_ref(),
|
||||
match &self.conf.auth_token {
|
||||
None => None,
|
||||
Some(x) => Some(x),
|
||||
|
||||
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
},
|
||||
self.conf.availability_zone.as_deref(),
|
||||
) {
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
Some(shard_identity.count.0),
|
||||
Some(shard_identity.stripe_size.0),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: self.conf.protocol,
|
||||
ttid: self.id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
};
|
||||
|
||||
match wal_stream_connection_config(connection_conf_args) {
|
||||
Ok(connstr) => Some((*sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
|
||||
@@ -1096,6 +1116,7 @@ impl ReconnectReason {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
|
||||
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
|
||||
use url::Host;
|
||||
|
||||
fn dummy_broker_sk_timeline(
|
||||
@@ -1532,6 +1553,7 @@ mod tests {
|
||||
timeline,
|
||||
cancel: CancellationToken::new(),
|
||||
conf: WalReceiverConf {
|
||||
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
wal_connect_timeout: Duration::from_secs(1),
|
||||
lagging_wal_timeout: Duration::from_secs(1),
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
/// Status of the connection.
|
||||
@@ -291,6 +291,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
connection_status.latest_connection_update = now;
|
||||
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
|
||||
}
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
connection_status.latest_connection_update = now;
|
||||
if !raw.data().is_empty() {
|
||||
connection_status.latest_wal_update = now;
|
||||
}
|
||||
|
||||
connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
|
||||
connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
|
||||
}
|
||||
&_ => {}
|
||||
};
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
|
||||
@@ -298,7 +307,130 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let status_update = match replication_message {
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
|
||||
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
let streaming_lsn = Lsn::from(raw.streaming_lsn());
|
||||
tracing::debug!(
|
||||
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
|
||||
Lsn(raw.next_record_lsn().unwrap_or(0))
|
||||
);
|
||||
|
||||
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
|
||||
)
|
||||
})?;
|
||||
|
||||
// We start the modification at 0 because each interpreted record
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
uncommitted_records += 1;
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
// failpoint library; in tests, the added amount of debugging will cause us
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
if uncommitted_records >= ingest_batch_size
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Records might have been filtered out on the safekeeper side, but we still
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
info!("caught up at LSN {streaming_lsn}");
|
||||
caught_up = true;
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
|
||||
Some(streaming_lsn)
|
||||
}
|
||||
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
// Pass the WAL data to the decoder, and see if we can decode
|
||||
// more records as a result.
|
||||
@@ -316,21 +448,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
|
||||
@@ -28,6 +28,7 @@ hyper0.workspace = true
|
||||
futures.workspace = true
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
rand.workspace = true
|
||||
@@ -57,6 +58,7 @@ sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
utils.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -2,11 +2,15 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
|
||||
use std::future::Future;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
@@ -35,6 +39,8 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard: Option<ShardIdentity>,
|
||||
pub protocol: Option<PostgresClientProtocol>,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||
@@ -107,11 +113,28 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
) -> Result<(), QueryError> {
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(options) = params.options_raw() {
|
||||
let mut shard_count: Option<u8> = None;
|
||||
let mut shard_number: Option<u8> = None;
|
||||
let mut shard_stripe_size: Option<u32> = None;
|
||||
|
||||
for opt in options {
|
||||
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
|
||||
// remove these after the PR gets deployed:
|
||||
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
|
||||
match opt.split_once('=') {
|
||||
Some(("protocol", value)) => {
|
||||
let raw_value = value
|
||||
.parse::<u8>()
|
||||
.with_context(|| format!("Failed to parse {value} as protocol"))?;
|
||||
|
||||
self.protocol = Some(
|
||||
PostgresClientProtocol::try_from(raw_value).map_err(|_| {
|
||||
QueryError::Other(anyhow::anyhow!(
|
||||
"Unexpected client protocol type: {raw_value}"
|
||||
))
|
||||
})?,
|
||||
);
|
||||
}
|
||||
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
|
||||
self.tenant_id = Some(value.parse().with_context(|| {
|
||||
format!("Failed to parse {value} as tenant id")
|
||||
@@ -127,9 +150,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
metrics.set_client_az(client_az)
|
||||
}
|
||||
}
|
||||
Some(("shard_count", value)) => {
|
||||
shard_count = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard count")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_number", value)) => {
|
||||
shard_number = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard number")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_stripe_size", value)) => {
|
||||
shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard stripe size")
|
||||
})?);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
if shard_count.is_some()
|
||||
|| shard_number.is_some()
|
||||
|| shard_stripe_size.is_some()
|
||||
{
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params specified for vanilla protocol"
|
||||
)));
|
||||
}
|
||||
}
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
match (shard_count, shard_number, shard_stripe_size) {
|
||||
(Some(count), Some(number), Some(stripe_size)) => {
|
||||
let params = ShardParameters {
|
||||
count: ShardCount(count),
|
||||
stripe_size: ShardStripeSize(stripe_size),
|
||||
};
|
||||
self.shard =
|
||||
Some(ShardIdentity::from_params(ShardNumber(number), ¶ms));
|
||||
}
|
||||
_ => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params were not specified"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
@@ -150,6 +218,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
tracing::field::debug(self.appname.clone()),
|
||||
);
|
||||
|
||||
if let Some(shard) = self.shard.as_ref() {
|
||||
tracing::Span::current()
|
||||
.record("shard", tracing::field::display(shard.shard_slug()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(QueryError::Other(anyhow::anyhow!(
|
||||
@@ -258,6 +331,8 @@ impl SafekeeperPostgresHandler {
|
||||
tenant_id: None,
|
||||
timeline_id: None,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
shard: None,
|
||||
protocol: None,
|
||||
conn_id,
|
||||
claims: None,
|
||||
auth,
|
||||
@@ -265,6 +340,10 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn protocol(&self) -> PostgresClientProtocol {
|
||||
self.protocol.unwrap_or(PostgresClientProtocol::Vanilla)
|
||||
}
|
||||
|
||||
// when accessing management api supply None as an argument
|
||||
// when using to authorize tenant pass corresponding tenant id
|
||||
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
|
||||
|
||||
@@ -29,6 +29,7 @@ pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
pub mod send_interpreted_wal;
|
||||
pub mod send_wal;
|
||||
pub mod state;
|
||||
pub mod timeline;
|
||||
@@ -38,6 +39,7 @@ pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use tokio::{
|
||||
use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
|
||||
|
||||
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
|
||||
@@ -325,7 +326,17 @@ async fn recovery_stream(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: pass auth token
|
||||
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: PostgresClientProtocol::Vanilla,
|
||||
ttid: tli.ttid,
|
||||
shard_number: None,
|
||||
shard_count: None,
|
||||
shard_stripe_size: None,
|
||||
listen_pg_addr_str: &donor.pg_connstr,
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
};
|
||||
let cfg = wal_stream_connection_config(connection_conf_args)?;
|
||||
let mut cfg = cfg.to_tokio_postgres_config();
|
||||
// It will make safekeeper give out not committed WAL (up to flush_lsn).
|
||||
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
|
||||
|
||||
121
safekeeper/src/send_interpreted_wal.rs
Normal file
121
safekeeper/src/send_interpreted_wal.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
|
||||
use postgres_ffi::MAX_SEND_SIZE;
|
||||
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
|
||||
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::InterpretedWalRecord;
|
||||
|
||||
use crate::send_wal::EndWatchView;
|
||||
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
|
||||
|
||||
/// Shard-aware interpreted record sender.
|
||||
/// This is used for sending WAL to the pageserver. Said WAL
|
||||
/// is pre-interpreted and filtered for the shard.
|
||||
pub(crate) struct InterpretedWalSender<'a, IO> {
|
||||
pub(crate) pgb: &'a mut PostgresBackend<IO>,
|
||||
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
|
||||
pub(crate) end_watch_view: EndWatchView,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) pg_version: u32,
|
||||
pub(crate) appname: Option<String>,
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
|
||||
/// Send interpreted WAL to a receiver.
|
||||
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut wal_position = self.wal_stream_builder.start_pos();
|
||||
let mut wal_decoder =
|
||||
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
|
||||
|
||||
let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
|
||||
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
keepalive_ticker.reset();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Get some WAL from the stream and then: decode, interpret and send it
|
||||
wal = stream.next() => {
|
||||
let WalBytes { wal, wal_start_lsn, wal_end_lsn, commit_lsn } = match wal {
|
||||
Some(some) => some?,
|
||||
None => { break; }
|
||||
};
|
||||
|
||||
wal_position = wal_start_lsn;
|
||||
wal_decoder.feed_bytes(&wal);
|
||||
|
||||
let mut records = Vec::new();
|
||||
let mut max_next_record_lsn = None;
|
||||
while let Some((next_record_lsn, recdata)) = wal_decoder
|
||||
.poll_decode()
|
||||
.with_context(|| "Failed to decode WAL")?
|
||||
{
|
||||
assert!(next_record_lsn.is_aligned());
|
||||
max_next_record_lsn = Some(next_record_lsn);
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
&self.shard,
|
||||
next_record_lsn,
|
||||
self.pg_version,
|
||||
)
|
||||
.with_context(|| "Failed to interpret WAL")?;
|
||||
|
||||
if !interpreted.is_empty() {
|
||||
records.push(interpreted);
|
||||
}
|
||||
}
|
||||
|
||||
let mut buf = Vec::new();
|
||||
records
|
||||
.ser_into(&mut buf)
|
||||
.with_context(|| "Failed to serialize interpreted WAL")?;
|
||||
|
||||
// Reset the keep alive ticker since we are sending something
|
||||
// over the wire now.
|
||||
keepalive_ticker.reset();
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
|
||||
streaming_lsn: wal_end_lsn.0,
|
||||
commit_lsn: commit_lsn.0,
|
||||
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
|
||||
data: buf.as_slice(),
|
||||
})).await?;
|
||||
}
|
||||
|
||||
// Send a periodic keep alive when the connection has been idle for a while.
|
||||
_ = keepalive_ticker.tick() => {
|
||||
self.pgb
|
||||
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
wal_end: self.end_watch_view.get().0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The loop above ends when the receiver is caught up and there's no more WAL to send.
|
||||
Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, wal_position,
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -5,12 +5,15 @@ use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::RECEIVED_PS_FEEDBACKS;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::send_interpreted_wal::InterpretedWalSender;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use futures::future::Either;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
|
||||
@@ -22,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::net::SocketAddr;
|
||||
@@ -226,7 +230,7 @@ impl WalSenders {
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.feedback {
|
||||
@@ -370,6 +374,16 @@ pub struct WalSenderGuard {
|
||||
walsenders: Arc<WalSenders>,
|
||||
}
|
||||
|
||||
impl WalSenderGuard {
|
||||
pub fn id(&self) -> WalSenderId {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn walsenders(&self) -> &Arc<WalSenders> {
|
||||
&self.walsenders
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalSenderGuard {
|
||||
fn drop(&mut self) {
|
||||
self.walsenders.unregister(self.id);
|
||||
@@ -440,11 +454,12 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_)),
|
||||
appname
|
||||
appname,
|
||||
self.protocol(),
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -456,19 +471,49 @@ impl SafekeeperPostgresHandler {
|
||||
// not synchronized with sends, so this avoids deadlocks.
|
||||
let reader = pgb.split().context("START_REPLICATION split")?;
|
||||
|
||||
let mut sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
let send_fut = match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
let sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
|
||||
Either::Left(sender.run())
|
||||
}
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
|
||||
let end_watch_view = end_watch.view();
|
||||
let wal_stream_builder = WalReaderStreamBuilder {
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
wal_sender_guard: ws_guard.clone(),
|
||||
};
|
||||
|
||||
let sender = InterpretedWalSender {
|
||||
pgb,
|
||||
wal_stream_builder,
|
||||
end_watch_view,
|
||||
shard: self.shard.unwrap(),
|
||||
pg_version,
|
||||
appname,
|
||||
};
|
||||
|
||||
Either::Right(sender.run())
|
||||
}
|
||||
};
|
||||
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard: ws_guard.clone(),
|
||||
@@ -477,7 +522,7 @@ impl SafekeeperPostgresHandler {
|
||||
|
||||
let res = tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = sender.run() => r,
|
||||
r = send_fut => r,
|
||||
r = reply_reader.run() => r,
|
||||
};
|
||||
|
||||
@@ -499,16 +544,22 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(vlad): maybe lift this instead
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
enum EndWatch {
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
pub(crate) fn view(&self) -> EndWatchView {
|
||||
EndWatchView(self.clone())
|
||||
}
|
||||
|
||||
/// Get current end of WAL.
|
||||
fn get(&self) -> Lsn {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
@@ -516,15 +567,44 @@ impl EndWatch {
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_lsn(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
client_term: Option<Term>,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
loop {
|
||||
let end_pos = self.get();
|
||||
if end_pos > lsn {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = &self {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.changed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct EndWatchView(EndWatch);
|
||||
|
||||
impl EndWatchView {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
self.0.get()
|
||||
}
|
||||
}
|
||||
/// A half driving sending WAL.
|
||||
struct WalSender<'a, IO> {
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
@@ -560,7 +640,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
|
||||
145
safekeeper/src/wal_reader_stream.rs
Normal file
145
safekeeper/src/wal_reader_stream.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use postgres_backend::CopyStreamHandlerEnd;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
safekeeper::Term,
|
||||
send_wal::{EndWatch, WalSenderGuard},
|
||||
timeline::WalResidentTimeline,
|
||||
};
|
||||
|
||||
pub(crate) struct WalReaderStreamBuilder {
|
||||
pub(crate) tli: WalResidentTimeline,
|
||||
pub(crate) start_pos: Lsn,
|
||||
pub(crate) end_pos: Lsn,
|
||||
pub(crate) term: Option<Term>,
|
||||
pub(crate) end_watch: EndWatch,
|
||||
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
pub(crate) fn start_pos(&self) -> Lsn {
|
||||
self.start_pos
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WalBytes {
|
||||
/// Raw PG WAL
|
||||
pub(crate) wal: Bytes,
|
||||
/// Start LSN of [`Self::wal`]
|
||||
pub(crate) wal_start_lsn: Lsn,
|
||||
/// End LSN of [`Self::wal`]
|
||||
pub(crate) wal_end_lsn: Lsn,
|
||||
/// End LSN of WAL available on the safekeeper
|
||||
pub(crate) commit_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
|
||||
/// The stream terminates when the receiver (pageserver) is fully caught up
|
||||
/// and there's no active computes.
|
||||
pub(crate) async fn build(
|
||||
self,
|
||||
buffer_size: usize,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
|
||||
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
|
||||
// We can make the raw WAL sender use this stream too and remove the duplication.
|
||||
let Self {
|
||||
tli,
|
||||
mut start_pos,
|
||||
mut end_pos,
|
||||
term,
|
||||
mut end_watch,
|
||||
wal_sender_guard,
|
||||
} = self;
|
||||
let mut wal_reader = tli.get_walreader(start_pos).await?;
|
||||
let mut buffer = vec![0; buffer_size];
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
Ok(try_stream! {
|
||||
loop {
|
||||
let have_something_to_send = end_pos > start_pos;
|
||||
|
||||
if !have_something_to_send {
|
||||
// wait for lsn
|
||||
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
|
||||
match res {
|
||||
Ok(ok) => {
|
||||
end_pos = ok?;
|
||||
},
|
||||
Err(_) => {
|
||||
if let EndWatch::Commit(_) = end_watch {
|
||||
if let Some(remote_consistent_lsn) = wal_sender_guard
|
||||
.walsenders()
|
||||
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
|
||||
{
|
||||
if tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
assert!(
|
||||
end_pos > start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by the buffer size
|
||||
let mut chunk_end_pos = start_pos + buffer_size as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= end_pos {
|
||||
chunk_end_pos = end_pos;
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
|
||||
let buffer = &mut buffer[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = term {
|
||||
Some(tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = wal_reader.read(buffer).await?
|
||||
};
|
||||
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
|
||||
|
||||
yield WalBytes {
|
||||
wal,
|
||||
wal_start_lsn: start_pos,
|
||||
wal_end_lsn: start_pos + send_size as u64,
|
||||
commit_lsn: end_pos
|
||||
};
|
||||
|
||||
start_pos += send_size as u64;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -15,16 +15,21 @@ from fixtures.neon_fixtures import (
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_sharded_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
shard_count: int,
|
||||
wal_receiver_protocol: str,
|
||||
):
|
||||
"""
|
||||
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
|
||||
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
|
||||
(shard_count=1) to the sharded case indicates the overhead of sharding.
|
||||
"""
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
|
||||
)
|
||||
|
||||
ROW_COUNT = 100_000_000 # about 7 GB of WAL
|
||||
|
||||
@@ -50,7 +55,6 @@ def test_sharded_ingest(
|
||||
# Start the endpoint.
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Ingest data and measure WAL volume and duration.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -68,4 +72,48 @@ def test_sharded_ingest(
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
|
||||
total_ingested = 0
|
||||
total_records_received = 0
|
||||
ingested_by_ps = []
|
||||
for pageserver in env.pageservers:
|
||||
ingested = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_bytes_received_total"
|
||||
)
|
||||
records_received = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_records_received_total"
|
||||
)
|
||||
|
||||
if ingested is None:
|
||||
ingested = 0
|
||||
|
||||
if records_received is None:
|
||||
records_received = 0
|
||||
|
||||
ingested_by_ps.append(
|
||||
(
|
||||
pageserver.id,
|
||||
{
|
||||
"ingested": ingested,
|
||||
"records_received": records_received,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
total_ingested += int(ingested)
|
||||
total_records_received += int(records_received)
|
||||
|
||||
total_ingested_mb = total_ingested / (1024 * 1024)
|
||||
zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER)
|
||||
zenbenchmark.record(
|
||||
"records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
ingested_by_ps.sort(key=lambda x: x[0])
|
||||
for _, stats in ingested_by_ps:
|
||||
for k in stats:
|
||||
if k != "records_received":
|
||||
stats[k] /= 1024**2
|
||||
|
||||
log.info(f"WAL ingested by each pageserver {ingested_by_ps}")
|
||||
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
|
||||
@@ -58,7 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support", default-features = false, features = ["with-serde_json-1"] }
|
||||
prost = { version = "0.13", features = ["prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -78,7 +78,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
|
||||
tikv-jemalloc-sys = { version = "0.6", features = ["stats"] }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "vlad/interpreted-wal-record-replication-support", features = ["with-serde_json-1"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
|
||||
Reference in New Issue
Block a user