Compare commits

...

5 Commits

Author      SHA1        Message                                                       Date
Vlad Lazar  b3ef315041  more wip                                                      2024-11-06 19:41:22 +01:00
Vlad Lazar  f0044b8651  wip                                                           2024-11-06 16:13:14 +01:00
Vlad Lazar  b7ff993df6  wal_decoder: make InterpretedWalRecord serde                  2024-11-06 16:13:14 +01:00
Vlad Lazar  5d096f127e  safekeeper: parse new connection configs                      2024-11-06 16:13:14 +01:00
Vlad Lazar  70cdd56294  pageserver: include shard id when subscribing to safekeeper  2024-11-06 16:13:14 +01:00
18 changed files with 512 additions and 68 deletions

Cargo.lock (generated), 98 lines changed
View File

@@ -1245,7 +1245,7 @@ dependencies = [
"tar",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-util",
"tracing",
@@ -1351,7 +1351,7 @@ dependencies = [
"storage_broker",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-util",
"toml",
"toml_edit",
@@ -3620,8 +3620,8 @@ dependencies = [
"pageserver_compaction",
"pin-project-lite",
"postgres",
"postgres-protocol",
"postgres-types",
"postgres-protocol 0.6.4",
"postgres-types 0.2.4",
"postgres_backend",
"postgres_connection",
"postgres_ffi",
@@ -3649,7 +3649,7 @@ dependencies = [
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -3707,7 +3707,7 @@ dependencies = [
"serde",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-util",
"utils",
@@ -4006,14 +4006,31 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
dependencies = [
"base64 0.20.0",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"lazy_static",
"md-5",
"memchr",
"rand 0.8.5",
"sha2",
"stringprep",
"tokio",
]
[[package]]
@@ -4035,6 +4052,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "postgres-types"
version = "0.2.4"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol 0.6.4",
"serde",
"serde_json",
]
[[package]]
name = "postgres-types"
version = "0.2.4"
@@ -4042,7 +4070,7 @@ source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"postgres-protocol 0.6.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
"serde",
"serde_json",
]
@@ -4060,7 +4088,7 @@ dependencies = [
"serde",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-postgres-rustls",
"tokio-rustls 0.26.0",
"tokio-util",
@@ -4075,7 +4103,7 @@ dependencies = [
"itertools 0.10.5",
"once_cell",
"postgres",
"tokio-postgres",
"tokio-postgres 0.7.7",
"url",
]
@@ -4127,7 +4155,7 @@ dependencies = [
"byteorder",
"bytes",
"itertools 0.10.5",
"postgres-protocol",
"postgres-protocol 0.6.4",
"rand 0.8.5",
"serde",
"thiserror",
@@ -4313,7 +4341,7 @@ dependencies = [
"parquet_derive",
"pbkdf2",
"pin-project-lite",
"postgres-protocol",
"postgres-protocol 0.6.4",
"postgres_backend",
"pq_proto",
"prometheus",
@@ -4348,7 +4376,7 @@ dependencies = [
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-postgres-rustls",
"tokio-rustls 0.26.0",
"tokio-tungstenite",
@@ -5155,9 +5183,10 @@ dependencies = [
"hyper 0.14.30",
"metrics",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres-protocol 0.6.4",
"postgres_backend",
"postgres_ffi",
"pq_proto",
@@ -5177,7 +5206,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -5185,6 +5214,7 @@ dependencies = [
"tracing-subscriber",
"url",
"utils",
"wal_decoder",
"walproposer",
"workspace_hack",
]
@@ -5821,7 +5851,7 @@ dependencies = [
"serde_json",
"storage_controller_client",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-postgres-rustls",
"tokio-stream",
"tokio-util",
@@ -6218,6 +6248,28 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol 0.6.4",
"postgres-types 0.2.4",
"socket2",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
@@ -6234,8 +6286,8 @@ dependencies = [
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"postgres-protocol 0.6.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
"postgres-types 0.2.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
"socket2",
"tokio",
"tokio-util",
@@ -6250,7 +6302,7 @@ dependencies = [
"ring",
"rustls 0.23.16",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-rustls 0.26.0",
"x509-certificate",
]
@@ -6833,7 +6885,7 @@ dependencies = [
"serde_json",
"sysinfo",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-util",
"tracing",
"tracing-subscriber",
@@ -7340,7 +7392,7 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"postgres-types",
"postgres-types 0.2.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
"prettyplease",
"proc-macro2",
"prost",
@@ -7365,7 +7417,7 @@ dependencies = [
"time",
"time-macros",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-util",

View File

@@ -214,10 +214,14 @@ log = "0.4"
#
# When those proxy changes are re-applied (see PR #8747), we can switch using
# the tip of the 'neon' branch again.
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
# postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
# postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
# postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
# tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/postgres" }
postgres-protocol = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/postgres-protocol" }
postgres-types = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/postgres-types" }
tokio-postgres = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/tokio-postgres" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -255,7 +259,8 @@ tonic-build = "0.12"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/tokio-postgres" }
# tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
################# Binary contents sections

View File

@@ -24,7 +24,7 @@ pub struct Key {
/// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as
/// a struct of fields.
#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)]
#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, Debug)]
pub struct CompactKey(i128);
/// The storage key size.

View File

@@ -24,7 +24,7 @@ use postgres_ffi::Oid;
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
// Then we could replace the custom Ord and PartialOrd implementations below with
// deriving them. This will require changes in walredoproc.c.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
pub struct RelTag {
pub forknum: u8,
pub spcnode: Oid,

View File

@@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
pub mid: MultiXactId,
/* new MultiXact's ID */
@@ -46,7 +46,7 @@ impl XlMultiXactCreate {
}
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlMultiXactTruncate {
pub oldest_multi_db: Oid,
/* to-be-truncated range of multixact offsets */
@@ -72,7 +72,7 @@ impl XlMultiXactTruncate {
}
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlRelmapUpdate {
pub dbid: Oid, /* database ID, or 0 for shared map */
pub tsid: Oid, /* database's tablespace, or pg_global */
@@ -90,7 +90,7 @@ impl XlRelmapUpdate {
}
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlReploriginDrop {
pub node_id: RepOriginId,
}
@@ -104,7 +104,7 @@ impl XlReploriginDrop {
}
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlReploriginSet {
pub remote_lsn: Lsn,
pub node_id: RepOriginId,
@@ -120,7 +120,7 @@ impl XlReploriginSet {
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct RelFileNode {
pub spcnode: Oid, /* tablespace */
pub dbnode: Oid, /* database */
@@ -911,7 +911,7 @@ impl XlSmgrCreate {
}
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlSmgrTruncate {
pub blkno: BlockNumber,
pub rnode: RelFileNode,
@@ -984,7 +984,7 @@ impl XlDropDatabase {
/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
/// struct for commits and aborts.
///
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct XlXactParsedRecord {
pub xid: TransactionId,
pub info: u8,

View File

@@ -13,3 +13,4 @@ rand.workspace = true
tokio = { workspace = true, features = ["io-util"] }
thiserror.workspace = true
serde.workspace = true
# wal_decoder.workspace = true

View File

@@ -562,6 +562,7 @@ pub enum BeMessage<'a> {
options: &'a [&'a str],
},
KeepAlive(WalSndKeepAlive),
InterpretedWalRecord(InterpretedWalRecordBody<'a>),
}
/// Common shorthands.
@@ -665,6 +666,12 @@ pub struct XLogDataBody<'a> {
pub data: &'a [u8],
}
#[derive(Debug)]
pub struct InterpretedWalRecordBody<'a> {
pub wal_end: u64,
pub data: &'a [u8],
}
#[derive(Debug)]
pub struct WalSndKeepAlive {
pub wal_end: u64, // current end of WAL on the server
@@ -996,6 +1003,15 @@ impl BeMessage<'_> {
Ok(())
})?
}
BeMessage::InterpretedWalRecord(rec) => {
buf.put_u8(b'd'); // CopyData message tag
write_body(buf, |buf| {
buf.put_u8(b'0'); // marker distinguishing interpreted records from XLogData ('w')
buf.put_u64(rec.wal_end);
buf.put_slice(rec.data);
});
}
}
Ok(())
}
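
For orientation: the arm above emits a standard CopyData ('d') frame whose body is a one-byte marker, the current WAL end, and the serialized record. A minimal decoding sketch for the receiving side (a hypothetical helper; body is assumed to exclude the 'd' tag and the length prefix that write_body adds, and put_u64 above writes big-endian, the bytes crate default):

fn parse_interpreted_frame(body: &[u8]) -> Option<(u64, &[u8])> {
    // layout: b'0' marker | u64 wal_end (big-endian) | serialized InterpretedWalRecord
    if body.len() < 9 || body[0] != b'0' {
        return None;
    }
    let wal_end = u64::from_be_bytes(body[1..9].try_into().ok()?);
    Some((wal_end, &body[9..]))
}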

View File

@@ -7,29 +7,65 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;
/// Protocol used for safekeeper recovery. This sends raw Postgres WAL.
pub const POSTGRES_PROTO_VERSION: u8 = 0;
/// Protocol used for safekeeper to pageserver communication.
/// This sends interpreted WAL records for the pageserver to ingest
/// and is shard-aware.
pub const PAGESERVER_SAFEKEEPER_PROTO_VERSION: u8 = 1;
pub struct ConnectionConfigArgs<'a> {
pub protocol_version: u8,
pub ttid: TenantTimelineId,
pub shard_number: Option<u8>,
pub shard_count: Option<u8>,
pub shard_stripe_size: Option<u32>,
pub listen_pg_addr_str: &'a str,
pub auth_token: Option<&'a str>,
pub availability_zone: Option<&'a str>,
}
impl<'a> ConnectionConfigArgs<'a> {
fn options(&'a self) -> Vec<String> {
let mut options = vec![
"-c".to_owned(),
format!("timeline_id={}", self.ttid.timeline_id),
format!("tenant_id={}", self.ttid.tenant_id),
format!("protocol_version={}", self.protocol_version),
];
if self.shard_number.is_some() {
assert!(self.shard_count.is_some());
assert!(self.shard_stripe_size.is_some());
options.push(format!("shard_count={}", self.shard_count.unwrap()));
options.push(format!("shard_number={}", self.shard_number.unwrap()));
options.push(format!(
"shard_stripe_size={}",
self.shard_stripe_size.unwrap()
));
}
options
}
}
Create a client config for fetching WAL from a safekeeper on a particular timeline.
listen_pg_addr_str is in the form host:\[port\].
pub fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
args: ConnectionConfigArgs,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
.extend_options(args.options())
.set_password(args.auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
if let Some(availability_zone) = args.availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}
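
To make the new arguments concrete, here is a sketch of a sharded pageserver subscription and the options it produces (the address and shard parameters are made-up example values; ttid is assumed to be in scope):

let args = ConnectionConfigArgs {
    protocol_version: PAGESERVER_SAFEKEEPER_PROTO_VERSION,
    ttid, // the TenantTimelineId being subscribed to
    shard_number: Some(0),
    shard_count: Some(8),
    shard_stripe_size: Some(32768),
    listen_pg_addr_str: "safekeeper-0:5454",
    auth_token: None,
    availability_zone: None,
};
// args.options() yields, in order:
//   ["-c", "timeline_id=...", "tenant_id=...", "protocol_version=1",
//    "shard_count=8", "shard_number=0", "shard_stripe_size=32768"]
let connstr = wal_stream_connection_config(args)?;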

View File

@@ -32,16 +32,19 @@ use postgres_ffi::walrecord::{
XlSmgrTruncate, XlXactParsedRecord,
};
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
use crate::serialized_batch::SerializedValueBatch;
#[derive(Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
Yes,
No,
}
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecord {
/// Optional metadata record - may cause writes to metadata keys
/// in the storage engine
@@ -62,6 +65,7 @@ pub struct InterpretedWalRecord {
/// The interpreted part of the Postgres WAL record which requires metadata
/// writes to the underlying storage engine.
#[derive(Serialize, Deserialize)]
pub enum MetadataRecord {
Heapam(HeapamRecord),
Neonrmgr(NeonrmgrRecord),
@@ -77,10 +81,12 @@ pub enum MetadataRecord {
Replorigin(ReploriginRecord),
}
#[derive(Serialize, Deserialize)]
pub enum HeapamRecord {
ClearVmBits(ClearVmBits),
}
#[derive(Serialize, Deserialize)]
pub struct ClearVmBits {
pub new_heap_blkno: Option<u32>,
pub old_heap_blkno: Option<u32>,
@@ -88,24 +94,29 @@ pub struct ClearVmBits {
pub flags: u8,
}
#[derive(Serialize, Deserialize)]
pub enum NeonrmgrRecord {
ClearVmBits(ClearVmBits),
}
#[derive(Serialize, Deserialize)]
pub enum SmgrRecord {
Create(SmgrCreate),
Truncate(XlSmgrTruncate),
}
#[derive(Serialize, Deserialize)]
pub struct SmgrCreate {
pub rel: RelTag,
}
#[derive(Serialize, Deserialize)]
pub enum DbaseRecord {
Create(DbaseCreate),
Drop(DbaseDrop),
}
#[derive(Serialize, Deserialize)]
pub struct DbaseCreate {
pub db_id: Oid,
pub tablespace_id: Oid,
@@ -113,27 +124,32 @@ pub struct DbaseCreate {
pub src_tablespace_id: Oid,
}
#[derive(Serialize, Deserialize)]
pub struct DbaseDrop {
pub db_id: Oid,
pub tablespace_ids: Vec<Oid>,
}
#[derive(Serialize, Deserialize)]
pub enum ClogRecord {
ZeroPage(ClogZeroPage),
Truncate(ClogTruncate),
}
#[derive(Serialize, Deserialize)]
pub struct ClogZeroPage {
pub segno: u32,
pub rpageno: u32,
}
#[derive(Serialize, Deserialize)]
pub struct ClogTruncate {
pub pageno: u32,
pub oldest_xid: TransactionId,
pub oldest_xid_db: Oid,
}
#[derive(Serialize, Deserialize)]
pub enum XactRecord {
Commit(XactCommon),
Abort(XactCommon),
@@ -142,6 +158,7 @@ pub enum XactRecord {
Prepare(XactPrepare),
}
#[derive(Serialize, Deserialize)]
pub struct XactCommon {
pub parsed: XlXactParsedRecord,
pub origin_id: u16,
@@ -150,61 +167,73 @@ pub struct XactCommon {
pub lsn: Lsn,
}
#[derive(Serialize, Deserialize)]
pub struct XactPrepare {
pub xl_xid: TransactionId,
pub data: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum MultiXactRecord {
ZeroPage(MultiXactZeroPage),
Create(XlMultiXactCreate),
Truncate(XlMultiXactTruncate),
}
#[derive(Serialize, Deserialize)]
pub struct MultiXactZeroPage {
pub slru_kind: SlruKind,
pub segno: u32,
pub rpageno: u32,
}
#[derive(Serialize, Deserialize)]
pub enum RelmapRecord {
Update(RelmapUpdate),
}
#[derive(Serialize, Deserialize)]
pub struct RelmapUpdate {
pub update: XlRelmapUpdate,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum XlogRecord {
Raw(RawXlogRecord),
}
#[derive(Serialize, Deserialize)]
pub struct RawXlogRecord {
pub info: u8,
pub lsn: Lsn,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum LogicalMessageRecord {
Put(PutLogicalMessage),
#[cfg(feature = "testing")]
Failpoint,
}
#[derive(Serialize, Deserialize)]
pub struct PutLogicalMessage {
pub path: String,
pub buf: Bytes,
}
#[derive(Serialize, Deserialize)]
pub enum StandbyRecord {
RunningXacts(StandbyRunningXacts),
}
#[derive(Serialize, Deserialize)]
pub struct StandbyRunningXacts {
pub oldest_running_xid: TransactionId,
}
#[derive(Serialize, Deserialize)]
pub enum ReploriginRecord {
Set(XlReploriginSet),
Drop(XlReploriginDrop),
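
With Serialize/Deserialize derived across the whole tree of record types above, an InterpretedWalRecord round-trips with the BeSer helpers already used on both sides of this branch (ser_into in the safekeeper's sender, des in the pageserver's walreceiver); a sketch, assuming record: InterpretedWalRecord and BeSer in scope:

use utils::bin_ser::BeSer;

// safekeeper side: serialize into a scratch buffer before framing
let mut buf = Vec::new();
record.ser_into(&mut buf)?;

// pageserver side: decode the payload carried by the CopyData frame
let decoded = InterpretedWalRecord::des(&buf)?;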

View File

@@ -16,6 +16,7 @@ use pageserver_api::shard::ShardIdentity;
use pageserver_api::{key::CompactKey, value::Value};
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
use serde::{Deserialize, Serialize};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
@@ -29,6 +30,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
/// relation sizes. In the case of "observed" values, we only need to know
/// the key and LSN, so two types of metadata are supported to save on network
/// bandwidth.
#[derive(Serialize, Deserialize, Debug)]
pub enum ValueMeta {
Serialized(SerializedValueMeta),
Observed(ObservedValueMeta),
@@ -75,6 +77,7 @@ impl PartialEq for OrderedValueMeta {
impl Eq for OrderedValueMeta {}
/// Metadata for a [`Value`] serialized into the batch.
#[derive(Serialize, Deserialize, Debug)]
pub struct SerializedValueMeta {
pub key: CompactKey,
pub lsn: Lsn,
@@ -86,12 +89,14 @@ pub struct SerializedValueMeta {
}
/// Metadata for a [`Value`] observed by the batch
#[derive(Serialize, Deserialize, Debug)]
pub struct ObservedValueMeta {
pub key: CompactKey,
pub lsn: Lsn,
}
/// Batch of serialized [`Value`]s.
#[derive(Serialize, Deserialize)]
pub struct SerializedValueBatch {
/// [`Value`]s serialized in EphemeralFile's native format,
/// ready for disk write by the pageserver
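
The two variants trade completeness for bandwidth: a Serialized entry describes a value present in the batch buffer, while an Observed entry records only that a key was written at some LSN. A sketch of the cheaper form (key and lsn are placeholder values assumed to be in scope):

let meta = ValueMeta::Observed(ObservedValueMeta {
    key, // CompactKey of the observed value
    lsn, // Lsn at which it was written
});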

View File

@@ -1164,12 +1164,19 @@ impl<'a> DatadirModification<'a> {
.get_rel_exists(rel, Version::Modified(self), ctx)
.await?
{
tracing::debug!("Creating relation {rel:?} at lsn {}", self.get_lsn());
// create it with 0 size initially, the logic below will extend it
self.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
Ok(0)
} else {
tracing::debug!(
"Skipping relation {rel:?} creation at lsn {}",
self.get_lsn()
);
self.tline
.get_rel_size(rel, Version::Modified(self), ctx)
.await
@@ -1210,6 +1217,8 @@ impl<'a> DatadirModification<'a> {
shard: &ShardIdentity,
ctx: &RequestContext,
) -> anyhow::Result<()> {
tracing::debug!("Ingesting batch with metadata: {:?}", batch.metadata);
let mut gaps_at_lsns = Vec::default();
for meta in batch.metadata.iter() {

View File

@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::wal_stream_connection_config;
use utils::postgres_client::{
wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION, POSTGRES_PROTO_VERSION,
};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
@@ -984,15 +986,29 @@ impl ConnectionManagerState {
if info.safekeeper_connstr.is_empty() {
return None; // no connection string, ignore sk
}
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_ref(),
match &self.conf.auth_token {
None => None,
Some(x) => Some(x),
},
self.conf.availability_zone.as_deref(),
) {
let shard_identity = self.timeline.get_shard_identity();
let connection_conf_args = ConnectionConfigArgs {
protocol_version: PAGESERVER_SAFEKEEPER_PROTO_VERSION,
ttid: self.id,
shard_number: Some(shard_identity.number.0),
shard_count: Some(shard_identity.count.0),
shard_stripe_size: Some(shard_identity.stripe_size.0),
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
};
// let connection_conf_args = ConnectionConfigArgs {
// protocol_version: POSTGRES_PROTO_VERSION,
// ttid: self.id,
// shard_number: None,
// shard_count: None,
// shard_stripe_size: None,
// listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
// auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
// availability_zone: self.conf.availability_zone.as_deref()
// };
match wal_stream_connection_config(connection_conf_args) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);

View File

@@ -36,7 +36,7 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{id::NodeId, lsn::Lsn};
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
/// Status of the connection.
@@ -278,6 +278,7 @@ pub(super) async fn handle_walreceiver_connection(
// fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
match &replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// TODO(vlad): is this status update still needed here?
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
@@ -299,6 +300,24 @@ pub(super) async fn handle_walreceiver_connection(
}
let status_update = match replication_message {
ReplicationMessage::RawInterpretedWalRecord(raw) => {
connection_status.latest_connection_update = now;
connection_status.latest_wal_update = now;
connection_status.commit_lsn = Some(Lsn::from(raw.wal_end()));
let interpreted = InterpretedWalRecord::des(raw.data()).unwrap();
let end_lsn = interpreted.end_lsn;
let mut modification = timeline.begin_modification(end_lsn);
walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await
.with_context(|| format!("could not ingest record at {}", end_lsn))?;
modification.commit(&ctx).await?;
Some(end_lsn)
}
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.

View File

@@ -316,6 +316,7 @@ impl ConnCfg {
let client_config = client_config.with_no_client_auth();
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
// TODO(vlad): what is this for?
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
host,

View File

@@ -28,6 +28,7 @@ hyper0.workspace = true
futures.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
pageserver_api.workspace = true
postgres.workspace = true
postgres-protocol.workspace = true
rand.workspace = true
@@ -57,6 +58,7 @@ sd-notify.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true
utils.workspace = true
wal_decoder.workspace = true
workspace_hack.workspace = true

View File

@@ -2,11 +2,14 @@
//! protocol commands.
use anyhow::Context;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info, info_span, Instrument};
use utils::postgres_client::PAGESERVER_SAFEKEEPER_PROTO_VERSION;
use utils::shard::{ShardCount, ShardNumber};
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
@@ -35,6 +38,8 @@ pub struct SafekeeperPostgresHandler {
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub ttid: TenantTimelineId,
pub shard: Option<ShardIdentity>,
pub protocol_version: Option<u8>,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
@@ -107,11 +112,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
) -> Result<(), QueryError> {
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(options) = params.options_raw() {
let mut shard_count: Option<u8> = None;
let mut shard_number: Option<u8> = None;
let mut shard_stripe_size: Option<u32> = None;
for opt in options {
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
// remove these after the PR gets deployed:
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
match opt.split_once('=') {
Some(("protocol_version", value)) => {
self.protocol_version =
Some(value.parse::<u8>().with_context(|| {
format!("Failed to parse {value} as protocol_version")
})?);
}
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
self.tenant_id = Some(value.parse().with_context(|| {
format!("Failed to parse {value} as tenant id")
@@ -127,9 +142,44 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
metrics.set_client_az(client_az)
}
}
Some(("shard_count", value)) => {
shard_count = Some(value.parse::<u8>().with_context(|| {
format!("Failed to parse {value} as shard count")
})?);
}
Some(("shard_number", value)) => {
shard_number = Some(value.parse::<u8>().with_context(|| {
format!("Failed to parse {value} as shard number")
})?);
}
Some(("shard_stripe_size", value)) => {
shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
format!("Failed to parse {value} as shard stripe size")
})?);
}
_ => continue,
}
}
if self.protocol_version == Some(PAGESERVER_SAFEKEEPER_PROTO_VERSION) {
match (shard_count, shard_number, shard_stripe_size) {
(Some(count), Some(number), Some(stripe_size)) => {
self.shard = Some(
ShardIdentity::new(
ShardNumber(number),
ShardCount(count),
ShardStripeSize(stripe_size),
)
.with_context(|| "Failed to create shard identity")?,
);
}
_ => {
return Err(QueryError::Other(anyhow::anyhow!(
"Shard params were not specified"
)));
}
}
}
}
if let Some(app_name) = params.get("application_name") {
@@ -150,6 +200,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
tracing::field::debug(self.appname.clone()),
);
if let Some(shard) = self.shard.as_ref() {
tracing::Span::current()
.record("shard", tracing::field::display(shard.shard_slug()));
}
Ok(())
} else {
Err(QueryError::Other(anyhow::anyhow!(
@@ -258,6 +313,8 @@ impl SafekeeperPostgresHandler {
tenant_id: None,
timeline_id: None,
ttid: TenantTimelineId::empty(),
shard: None,
protocol_version: None,
conn_id,
claims: None,
auth,
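
Putting the parsing above together, a pageserver connecting with the new protocol sends startup options along these lines (the ids are placeholders; each key=value token is consumed by the split_once('=') loop):

    -c timeline_id=<timeline id> tenant_id=<tenant id> protocol_version=1 shard_count=8 shard_number=0 shard_stripe_size=32768

When protocol_version equals PAGESERVER_SAFEKEEPER_PROTO_VERSION (1), all three shard_* options must be present, otherwise startup fails with "Shard params were not specified".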

View File

@@ -17,6 +17,7 @@ use tokio::{
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::types::PgLsn;
use tracing::*;
use utils::postgres_client::{ConnectionConfigArgs, POSTGRES_PROTO_VERSION};
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
@@ -325,7 +326,17 @@ async fn recovery_stream(
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// TODO: pass auth token
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
let connection_conf_args = ConnectionConfigArgs {
protocol_version: POSTGRES_PROTO_VERSION,
ttid: tli.ttid,
shard_number: None,
shard_count: None,
shard_stripe_size: None,
listen_pg_addr_str: &donor.pg_connstr,
auth_token: None,
availability_zone: None,
};
let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id));

View File

@@ -11,17 +11,21 @@ use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use pageserver_api::shard::ShardIdentity;
use parking_lot::Mutex;
use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use pq_proto::{BeMessage, InterpretedWalRecordBody, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::pageserver_feedback::PageserverFeedback;
use utils::postgres_client::{PAGESERVER_SAFEKEEPER_PROTO_VERSION, POSTGRES_PROTO_VERSION};
use wal_decoder::models::InterpretedWalRecord;
use std::cmp::{max, min};
use std::net::SocketAddr;
@@ -377,6 +381,10 @@ impl Drop for WalSenderGuard {
}
impl SafekeeperPostgresHandler {
pub fn protocol_version(&self) -> u8 {
self.protocol_version.unwrap_or(POSTGRES_PROTO_VERSION)
}
/// Wrapper around handle_start_replication_guts handling result. Error is
/// handled here while we're still in walsender ttid span; with API
/// extension, this can probably be moved into postgres_backend.
@@ -412,6 +420,7 @@ impl SafekeeperPostgresHandler {
let appname = self.appname.clone();
// Use a guard object to remove our entry from the timeline when we are done.
// TODO(vlad): maybe thread shard stuff into here
let ws_guard = Arc::new(tli.get_walsenders().register(
self.ttid,
*pgb.get_peer_addr(),
@@ -475,9 +484,10 @@ impl SafekeeperPostgresHandler {
tli,
};
let protocol_version = self.protocol_version();
let res = tokio::select! {
// todo: add read|write .context to these errors
r = sender.run() => r,
r = sender.run(protocol_version, self.shard.as_ref()) => r,
r = reply_reader.run() => r,
};
@@ -560,7 +570,35 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
/// TODO(vlad): add a run variant which accumulates a full WAL record
/// and interprets it.
async fn run(
&mut self,
protocol_version: u8,
shard: Option<&ShardIdentity>,
) -> Result<(), CopyStreamHandlerEnd> {
match protocol_version {
POSTGRES_PROTO_VERSION => self.run_wal_sender().await,
PAGESERVER_SAFEKEEPER_PROTO_VERSION => {
self.run_interpreted_record_sender(shard.unwrap()).await
}
// TODO: make the proto version an enum
_ => unreachable!(),
}
}
async fn run_interpreted_record_sender(
&mut self,
shard: &ShardIdentity,
) -> Result<(), CopyStreamHandlerEnd> {
let mut last_logged_at = std::time::Instant::now();
let mut interpreted_records = 0;
let mut interpreted_bytes = 0;
let mut useful_bytes = 0;
let pg_version = self.tli.tli.get_state().await.1.server.pg_version / 10000;
let mut wal_decoder = WalStreamDecoder::new(self.start_pos, pg_version);
loop {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we
@@ -601,6 +639,141 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
};
let send_buf = &send_buf[..send_size];
wal_decoder.feed_bytes(send_buf);
// How fast or slow is this? Write a little benchmark
// to see how quickly we can decode 1GiB of WAL.
// If this is slow, then we have a problem since it bottlenecks
// the whole affair. SK can send about 60-70MiB of raw WAL and
// about 13-17MiB of useful interpreted WAL per second (these
// numbers are for one shard).
while let Some((record_end_lsn, recdata)) = wal_decoder
.poll_decode()
.with_context(|| "Failed to decode WAL")?
{
assert!(record_end_lsn.is_aligned());
// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
shard,
record_end_lsn,
pg_version,
)
.with_context(|| "Failed to interpret WAL")?;
let useful_size = interpreted.batch.buffer_size();
let mut buf = Vec::new();
interpreted
.ser_into(&mut buf)
.with_context(|| "Failed to serialize interpreted WAL")?;
let size = buf.len();
self.pgb
.write_message(&BeMessage::InterpretedWalRecord(InterpretedWalRecordBody {
wal_end: self.end_pos.0,
data: buf.as_slice(),
}))
.await?;
interpreted_records += 1;
interpreted_bytes += size;
useful_bytes += useful_size;
}
// and send it
// self.pgb
// .write_message(&BeMessage::XLogData(XLogDataBody {
// wal_start: self.start_pos.0,
// wal_end: self.end_pos.0,
// timestamp: get_current_timestamp(),
// data: send_buf,
// }))
// .await?;
// if let Some(appname) = &self.appname {
// if appname == "replica" {
// failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
// }
// }
// trace!(
// "sent {} bytes of WAL {}-{}",
// send_size,
// self.start_pos,
// self.start_pos + send_size as u64
// );
self.start_pos += send_size as u64;
let elapsed = last_logged_at.elapsed();
if elapsed >= Duration::from_secs(5) {
let records_rate = interpreted_records / elapsed.as_millis() * 1000;
let bytes_rate = interpreted_bytes / elapsed.as_millis() as usize * 1000;
let useful_bytes_rate = useful_bytes / elapsed.as_millis() as usize * 1000;
tracing::info!(
"Shard {} sender rate: rps={} bps={} ubps={}",
shard.number.0,
records_rate,
bytes_rate,
useful_bytes_rate
);
last_logged_at = std::time::Instant::now();
interpreted_records = 0;
interpreted_bytes = 0;
useful_bytes = 0;
}
}
}
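// A quick sanity check of the rate logging in the loop above: with
// elapsed = 5s and useful_bytes = 68_157_440 (65 MiB),
// useful_bytes_rate = 68_157_440 / 5000 * 1000 = 13_631_000 B/s, roughly
// 13 MiB/s, in line with the per-shard throughput quoted in the comment.
// Note the integer division runs before the multiplication, so rates are
// truncated to a multiple of 1000; negligible at these magnitudes.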
async fn run_wal_sender(&mut self) -> Result<(), CopyStreamHandlerEnd> {
let mut useful_bytes = 0;
let mut last_logged_at = std::time::Instant::now();
loop {
// Wait for the next portion if it is not there yet, or just
// update the end of WAL available for sending, which we
// communicate to the receiver.
self.wait_wal().await?;
assert!(
self.end_pos > self.start_pos,
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
// if we went beyond available WAL, back off
if chunk_end_pos >= self.end_pos {
chunk_end_pos = self.end_pos;
} else {
// If sending not up to end pos, round down to page boundary to
// avoid breaking WAL record not at page boundary, as protocol
// demands. See walsender.c (XLogSendPhysical).
chunk_end_pos = chunk_end_pos
.checked_sub(chunk_end_pos.block_offset())
.unwrap();
}
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is
// still the expected one.
let _term_guard = if let Some(t) = self.term {
Some(self.tli.acquire_term(t).await?)
} else {
None
};
// Read WAL into buffer. send_size can be additionally capped to
// segment boundary here.
send_size = self.wal_reader.read(send_buf).await?
};
let send_buf = &send_buf[..send_size];
useful_bytes += send_buf.len();
// and send it
self.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
@@ -623,6 +796,18 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
self.start_pos + send_size as u64
);
self.start_pos += send_size as u64;
let elapsed = last_logged_at.elapsed();
if elapsed >= Duration::from_secs(5) {
let useful_bytes_rate = useful_bytes / elapsed.as_millis() as usize * 1000;
tracing::info!(
"Sender rate: ubps={}",
useful_bytes_rate
);
last_logged_at = std::time::Instant::now();
useful_bytes = 0;
}
}
}