Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-10 06:00:38 +00:00

Compare commits: bodobolero...vlad/safek
5 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | b3ef315041 |  |
|  | f0044b8651 |  |
|  | b7ff993df6 |  |
|  | 5d096f127e |  |
|  | 70cdd56294 |  |
Cargo.lock (generated file, 98 lines changed)
```diff
@@ -1245,7 +1245,7 @@ dependencies = [
  "tar",
  "thiserror",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-stream",
  "tokio-util",
  "tracing",
@@ -1351,7 +1351,7 @@ dependencies = [
  "storage_broker",
  "thiserror",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-util",
  "toml",
  "toml_edit",
@@ -3620,8 +3620,8 @@ dependencies = [
  "pageserver_compaction",
  "pin-project-lite",
  "postgres",
- "postgres-protocol",
- "postgres-types",
+ "postgres-protocol 0.6.4",
+ "postgres-types 0.2.4",
  "postgres_backend",
  "postgres_connection",
  "postgres_ffi",
@@ -3649,7 +3649,7 @@ dependencies = [
  "tokio",
  "tokio-epoll-uring",
  "tokio-io-timeout",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-stream",
  "tokio-tar",
  "tokio-util",
@@ -3707,7 +3707,7 @@ dependencies = [
  "serde",
  "thiserror",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-stream",
  "tokio-util",
  "utils",
@@ -4006,14 +4006,31 @@ dependencies = [
 [[package]]
 name = "postgres"
 version = "0.19.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
 dependencies = [
  "bytes",
  "fallible-iterator",
  "futures-util",
  "log",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
 ]

+[[package]]
+name = "postgres-protocol"
+version = "0.6.4"
+dependencies = [
+ "base64 0.20.0",
+ "byteorder",
+ "bytes",
+ "fallible-iterator",
+ "hmac",
+ "lazy_static",
+ "md-5",
+ "memchr",
+ "rand 0.8.5",
+ "sha2",
+ "stringprep",
+ "tokio",
+]
+
 [[package]]
@@ -4035,6 +4052,17 @@ dependencies = [
  "tokio",
 ]

+[[package]]
+name = "postgres-types"
+version = "0.2.4"
+dependencies = [
+ "bytes",
+ "fallible-iterator",
+ "postgres-protocol 0.6.4",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "postgres-types"
 version = "0.2.4"
@@ -4042,7 +4070,7 @@ source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1
 dependencies = [
  "bytes",
  "fallible-iterator",
- "postgres-protocol",
+ "postgres-protocol 0.6.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
  "serde",
  "serde_json",
 ]
@@ -4060,7 +4088,7 @@ dependencies = [
  "serde",
  "thiserror",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-postgres-rustls",
  "tokio-rustls 0.26.0",
  "tokio-util",
@@ -4075,7 +4103,7 @@ dependencies = [
  "itertools 0.10.5",
  "once_cell",
  "postgres",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "url",
 ]

@@ -4127,7 +4155,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "itertools 0.10.5",
- "postgres-protocol",
+ "postgres-protocol 0.6.4",
  "rand 0.8.5",
  "serde",
  "thiserror",
@@ -4313,7 +4341,7 @@ dependencies = [
  "parquet_derive",
  "pbkdf2",
  "pin-project-lite",
- "postgres-protocol",
+ "postgres-protocol 0.6.4",
  "postgres_backend",
  "pq_proto",
  "prometheus",
@@ -4348,7 +4376,7 @@ dependencies = [
  "tikv-jemalloc-ctl",
  "tikv-jemallocator",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-postgres-rustls",
  "tokio-rustls 0.26.0",
  "tokio-tungstenite",
@@ -5155,9 +5183,10 @@ dependencies = [
  "hyper 0.14.30",
  "metrics",
  "once_cell",
+ "pageserver_api",
  "parking_lot 0.12.1",
  "postgres",
- "postgres-protocol",
+ "postgres-protocol 0.6.4",
  "postgres_backend",
  "postgres_ffi",
  "pq_proto",
@@ -5177,7 +5206,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-io-timeout",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-stream",
  "tokio-tar",
  "tokio-util",
@@ -5185,6 +5214,7 @@ dependencies = [
  "tracing-subscriber",
  "url",
  "utils",
+ "wal_decoder",
  "walproposer",
  "workspace_hack",
 ]
@@ -5821,7 +5851,7 @@ dependencies = [
  "serde_json",
  "storage_controller_client",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-postgres-rustls",
  "tokio-stream",
  "tokio-util",
@@ -6218,6 +6248,28 @@ dependencies = [
  "syn 2.0.52",
 ]

+[[package]]
+name = "tokio-postgres"
+version = "0.7.7"
+dependencies = [
+ "async-trait",
+ "byteorder",
+ "bytes",
+ "fallible-iterator",
+ "futures-channel",
+ "futures-util",
+ "log",
+ "parking_lot 0.12.1",
+ "percent-encoding",
+ "phf",
+ "pin-project-lite",
+ "postgres-protocol 0.6.4",
+ "postgres-types 0.2.4",
+ "socket2",
+ "tokio",
+ "tokio-util",
+]
+
 [[package]]
 name = "tokio-postgres"
 version = "0.7.7"
@@ -6234,8 +6286,8 @@ dependencies = [
  "percent-encoding",
  "phf",
  "pin-project-lite",
- "postgres-protocol",
- "postgres-types",
+ "postgres-protocol 0.6.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
+ "postgres-types 0.2.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
  "socket2",
  "tokio",
  "tokio-util",
@@ -6250,7 +6302,7 @@ dependencies = [
  "ring",
  "rustls 0.23.16",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-rustls 0.26.0",
  "x509-certificate",
 ]
@@ -6833,7 +6885,7 @@ dependencies = [
  "serde_json",
  "sysinfo",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7",
  "tokio-util",
  "tracing",
  "tracing-subscriber",
@@ -7340,7 +7392,7 @@ dependencies = [
  "num-traits",
  "once_cell",
  "parquet",
- "postgres-types",
+ "postgres-types 0.2.4 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
  "prettyplease",
  "proc-macro2",
  "prost",
@@ -7365,7 +7417,7 @@ dependencies = [
  "time",
  "time-macros",
  "tokio",
- "tokio-postgres",
+ "tokio-postgres 0.7.7 (git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2)",
  "tokio-rustls 0.26.0",
  "tokio-stream",
  "tokio-util",
```
Cargo.toml (15 lines changed)
```diff
@@ -214,10 +214,14 @@ log = "0.4"
 #
 # When those proxy changes are re-applied (see PR #8747), we can switch using
 # the tip of the 'neon' branch again.
-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
-postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
-postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
-tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
+# postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
+# postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
+# postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
+# tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
+postgres = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/postgres" }
+postgres-protocol = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/postgres-protocol" }
+postgres-types = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/postgres-types" }
+tokio-postgres = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/tokio-postgres" }

 ## Local libraries
 compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -255,7 +259,8 @@ tonic-build = "0.12"
 [patch.crates-io]

 # Needed to get `tokio-postgres-rustls` to depend on our fork.
-tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
+tokio-postgres = { path = "../../.cargo/git/checkouts/rust-postgres-e2c00088c8e2b112/20031d7/tokio-postgres" }
+# tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }

 ################# Binary contents sections
```
```diff
@@ -24,7 +24,7 @@ pub struct Key {

 /// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as
 /// a struct of fields.
-#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)]
+#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, Debug)]
 pub struct CompactKey(i128);

 /// The storage key size.
```
```diff
@@ -24,7 +24,7 @@ use postgres_ffi::Oid;
 // FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
 // Then we could replace the custom Ord and PartialOrd implementations below with
 // deriving them. This will require changes in walredoproc.c.
-#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
 pub struct RelTag {
     pub forknum: u8,
     pub spcnode: Oid,
```
```diff
@@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError;
 use utils::lsn::Lsn;

 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlMultiXactCreate {
     pub mid: MultiXactId,
     /* new MultiXact's ID */
@@ -46,7 +46,7 @@ impl XlMultiXactCreate {
 }

 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlMultiXactTruncate {
     pub oldest_multi_db: Oid,
     /* to-be-truncated range of multixact offsets */
@@ -72,7 +72,7 @@ impl XlMultiXactTruncate {
 }

 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlRelmapUpdate {
     pub dbid: Oid, /* database ID, or 0 for shared map */
     pub tsid: Oid, /* database's tablespace, or pg_global */
@@ -90,7 +90,7 @@ impl XlRelmapUpdate {
 }

 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlReploriginDrop {
     pub node_id: RepOriginId,
 }
@@ -104,7 +104,7 @@ impl XlReploriginDrop {
 }

 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlReploriginSet {
     pub remote_lsn: Lsn,
     pub node_id: RepOriginId,
@@ -120,7 +120,7 @@ impl XlReploriginSet {
 }

 #[repr(C)]
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct RelFileNode {
     pub spcnode: Oid, /* tablespace */
     pub dbnode: Oid, /* database */
@@ -911,7 +911,7 @@ impl XlSmgrCreate {
 }

 #[repr(C)]
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlSmgrTruncate {
     pub blkno: BlockNumber,
     pub rnode: RelFileNode,
@@ -984,7 +984,7 @@ impl XlDropDatabase {
 /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
 /// struct for commits and aborts.
 ///
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct XlXactParsedRecord {
     pub xid: TransactionId,
     pub info: u8,
```
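Taken together, the derives above make CompactKey, RelTag, and the Xl* record structs serde-serializable so they can ride inside the interpreted WAL records introduced later in this compare. A minimal round-trip sketch, assuming a workspace crate with serde and neon's utils::bin_ser::BeSer available (ExampleRecord is a hypothetical stand-in; ser_into and des mirror the calls in the walsender and walreceiver hunks below):

```rust
use serde::{Deserialize, Serialize};
use utils::bin_ser::BeSer; // neon helper trait providing `ser_into` / `des`

// Hypothetical stand-in for one of the newly serializable record structs.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct ExampleRecord {
    info: u8,
    xid: u32,
}

fn roundtrip() {
    let rec = ExampleRecord { info: 0x10, xid: 42 };

    // Serialize into a byte buffer, as the walsender does for
    // `InterpretedWalRecord` before framing it onto the wire...
    let mut buf = Vec::new();
    rec.ser_into(&mut buf).expect("serialization failed");

    // ...and decode it back, as the walreceiver does with `::des`.
    let decoded = ExampleRecord::des(&buf).expect("deserialization failed");
    assert_eq!(rec, decoded);
}
```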
```diff
@@ -13,3 +13,4 @@ rand.workspace = true
 tokio = { workspace = true, features = ["io-util"] }
 thiserror.workspace = true
 serde.workspace = true
+# wal_decoder.workspace = true
```
```diff
@@ -562,6 +562,7 @@ pub enum BeMessage<'a> {
         options: &'a [&'a str],
     },
     KeepAlive(WalSndKeepAlive),
+    InterpretedWalRecord(InterpretedWalRecordBody<'a>),
 }

 /// Common shorthands.
@@ -665,6 +666,12 @@ pub struct XLogDataBody<'a> {
     pub data: &'a [u8],
 }

+#[derive(Debug)]
+pub struct InterpretedWalRecordBody<'a> {
+    pub wal_end: u64,
+    pub data: &'a [u8],
+}
+
 #[derive(Debug)]
 pub struct WalSndKeepAlive {
     pub wal_end: u64, // current end of WAL on the server
@@ -996,6 +1003,15 @@ impl BeMessage<'_> {
                     Ok(())
                 })?
             }
+
+            BeMessage::InterpretedWalRecord(rec) => {
+                buf.put_u8(b'd'); // arbitrary?
+                write_body(buf, |buf| {
+                    buf.put_u8(b'0');
+                    buf.put_u64(rec.wal_end);
+                    buf.put_slice(rec.data);
+                });
+            }
         }
         Ok(())
     }
```
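As the writer above shows, the new message is framed as a CopyData ('d') packet whose body carries a b'0' marker byte, a big-endian u64 wal_end, and then the serialized record. A receiving side could peel that body apart as follows (a sketch using the bytes crate; parse_interpreted_frame is illustrative and not part of the change):

```rust
use bytes::{Buf, Bytes};

/// Split an InterpretedWalRecord copy-data body into (wal_end, payload),
/// mirroring the writer above: one b'0' marker byte, then a u64 written
/// big-endian by `put_u64`, then the serialized record bytes.
fn parse_interpreted_frame(mut body: Bytes) -> anyhow::Result<(u64, Bytes)> {
    anyhow::ensure!(body.remaining() > 9, "interpreted record frame too short");

    let marker = body.get_u8();
    anyhow::ensure!(marker == b'0', "unexpected marker byte: {marker}");

    let wal_end = body.get_u64();
    // Whatever remains is the serialized InterpretedWalRecord.
    Ok((wal_end, body))
}
```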
```diff
@@ -7,29 +7,65 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};

 use crate::id::TenantTimelineId;

+/// Protocol used for safekeeper recovery. This sends raw Postgres WAL.
+pub const POSTGRES_PROTO_VERSION: u8 = 0;
+/// Protocol used for safekeeper to pageserver communication.
+/// This sends interpreted WAL records for the pageserver to ingest
+/// and is shard-aware.
+pub const PAGESERVER_SAFEKEEPER_PROTO_VERSION: u8 = 1;
+
+pub struct ConnectionConfigArgs<'a> {
+    pub protocol_version: u8,
+
+    pub ttid: TenantTimelineId,
+    pub shard_number: Option<u8>,
+    pub shard_count: Option<u8>,
+    pub shard_stripe_size: Option<u32>,
+
+    pub listen_pg_addr_str: &'a str,
+
+    pub auth_token: Option<&'a str>,
+    pub availability_zone: Option<&'a str>,
+}
+
+impl<'a> ConnectionConfigArgs<'a> {
+    fn options(&'a self) -> Vec<String> {
+        let mut options = vec![
+            "-c".to_owned(),
+            format!("timeline_id={}", self.ttid.timeline_id),
+            format!("tenant_id={}", self.ttid.tenant_id),
+            format!("protocol_version={}", self.protocol_version),
+        ];
+
+        if self.shard_number.is_some() {
+            assert!(self.shard_count.is_some());
+            assert!(self.shard_stripe_size.is_some());
+
+            options.push(format!("shard_count={}", self.shard_count.unwrap()));
+            options.push(format!("shard_number={}", self.shard_number.unwrap()));
+            options.push(format!(
+                "shard_stripe_size={}",
+                self.shard_stripe_size.unwrap()
+            ));
+        }
+
+        options
+    }
+}
+
 /// Create client config for fetching WAL from safekeeper on particular timeline.
 /// listen_pg_addr_str is in form host:\[port\].
 pub fn wal_stream_connection_config(
-    TenantTimelineId {
-        tenant_id,
-        timeline_id,
-    }: TenantTimelineId,
-    listen_pg_addr_str: &str,
-    auth_token: Option<&str>,
-    availability_zone: Option<&str>,
+    args: ConnectionConfigArgs,
 ) -> anyhow::Result<PgConnectionConfig> {
     let (host, port) =
-        parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
+        parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
     let port = port.unwrap_or(5432);
     let mut connstr = PgConnectionConfig::new_host_port(host, port)
-        .extend_options([
-            "-c".to_owned(),
-            format!("timeline_id={}", timeline_id),
-            format!("tenant_id={}", tenant_id),
-        ])
-        .set_password(auth_token.map(|s| s.to_owned()));
+        .extend_options(args.options())
+        .set_password(args.auth_token.map(|s| s.to_owned()));

-    if let Some(availability_zone) = availability_zone {
+    if let Some(availability_zone) = args.availability_zone {
         connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
     }
```
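Callers now pass a single ConnectionConfigArgs value instead of four positional parameters; when requesting the shard-aware protocol, the three shard fields must be set together, which options() enforces with asserts. A usage sketch with made-up shard and address values, modeled on the connection-manager change later in this compare:

```rust
use utils::id::TenantTimelineId;
use utils::postgres_client::{
    wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION,
};

fn example_config(ttid: TenantTimelineId) -> anyhow::Result<()> {
    let args = ConnectionConfigArgs {
        protocol_version: PAGESERVER_SAFEKEEPER_PROTO_VERSION,
        ttid,
        // All three shard fields must be Some together (options() asserts it).
        shard_number: Some(0),
        shard_count: Some(8),
        shard_stripe_size: Some(32768),
        listen_pg_addr_str: "safekeeper-0.local:5454", // hypothetical address
        auth_token: None,
        availability_zone: None,
    };

    // The resulting config carries timeline_id, tenant_id, protocol_version
    // and the shard_* keys as startup options, which the safekeeper handler
    // parses on the other end.
    let _config = wal_stream_connection_config(args)?;
    Ok(())
}
```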
```diff
@@ -32,16 +32,19 @@ use postgres_ffi::walrecord::{
     XlSmgrTruncate, XlXactParsedRecord,
 };
 use postgres_ffi::{Oid, TransactionId};
+use serde::{Deserialize, Serialize};
 use utils::lsn::Lsn;

 use crate::serialized_batch::SerializedValueBatch;

+#[derive(Serialize, Deserialize)]
 pub enum FlushUncommittedRecords {
     Yes,
     No,
 }

 /// An interpreted Postgres WAL record, ready to be handled by the pageserver
+#[derive(Serialize, Deserialize)]
 pub struct InterpretedWalRecord {
     /// Optional metadata record - may cause writes to metadata keys
     /// in the storage engine
@@ -62,6 +65,7 @@ pub struct InterpretedWalRecord {

 /// The interpreted part of the Postgres WAL record which requires metadata
 /// writes to the underlying storage engine.
+#[derive(Serialize, Deserialize)]
 pub enum MetadataRecord {
     Heapam(HeapamRecord),
     Neonrmgr(NeonrmgrRecord),
@@ -77,10 +81,12 @@ pub enum MetadataRecord {
     Replorigin(ReploriginRecord),
 }

+#[derive(Serialize, Deserialize)]
 pub enum HeapamRecord {
     ClearVmBits(ClearVmBits),
 }

+#[derive(Serialize, Deserialize)]
 pub struct ClearVmBits {
     pub new_heap_blkno: Option<u32>,
     pub old_heap_blkno: Option<u32>,
@@ -88,24 +94,29 @@ pub struct ClearVmBits {
     pub flags: u8,
 }

+#[derive(Serialize, Deserialize)]
 pub enum NeonrmgrRecord {
     ClearVmBits(ClearVmBits),
 }

+#[derive(Serialize, Deserialize)]
 pub enum SmgrRecord {
     Create(SmgrCreate),
     Truncate(XlSmgrTruncate),
 }

+#[derive(Serialize, Deserialize)]
 pub struct SmgrCreate {
     pub rel: RelTag,
 }

+#[derive(Serialize, Deserialize)]
 pub enum DbaseRecord {
     Create(DbaseCreate),
     Drop(DbaseDrop),
 }

+#[derive(Serialize, Deserialize)]
 pub struct DbaseCreate {
     pub db_id: Oid,
     pub tablespace_id: Oid,
@@ -113,27 +124,32 @@ pub struct DbaseCreate {
     pub src_tablespace_id: Oid,
 }

+#[derive(Serialize, Deserialize)]
 pub struct DbaseDrop {
     pub db_id: Oid,
     pub tablespace_ids: Vec<Oid>,
 }

+#[derive(Serialize, Deserialize)]
 pub enum ClogRecord {
     ZeroPage(ClogZeroPage),
     Truncate(ClogTruncate),
 }

+#[derive(Serialize, Deserialize)]
 pub struct ClogZeroPage {
     pub segno: u32,
     pub rpageno: u32,
 }

+#[derive(Serialize, Deserialize)]
 pub struct ClogTruncate {
     pub pageno: u32,
     pub oldest_xid: TransactionId,
     pub oldest_xid_db: Oid,
 }

+#[derive(Serialize, Deserialize)]
 pub enum XactRecord {
     Commit(XactCommon),
     Abort(XactCommon),
@@ -142,6 +158,7 @@ pub enum XactRecord {
     Prepare(XactPrepare),
 }

+#[derive(Serialize, Deserialize)]
 pub struct XactCommon {
     pub parsed: XlXactParsedRecord,
     pub origin_id: u16,
@@ -150,61 +167,73 @@ pub struct XactCommon {
     pub lsn: Lsn,
 }

+#[derive(Serialize, Deserialize)]
 pub struct XactPrepare {
     pub xl_xid: TransactionId,
     pub data: Bytes,
 }

+#[derive(Serialize, Deserialize)]
 pub enum MultiXactRecord {
     ZeroPage(MultiXactZeroPage),
     Create(XlMultiXactCreate),
     Truncate(XlMultiXactTruncate),
 }

+#[derive(Serialize, Deserialize)]
 pub struct MultiXactZeroPage {
     pub slru_kind: SlruKind,
     pub segno: u32,
     pub rpageno: u32,
 }

+#[derive(Serialize, Deserialize)]
 pub enum RelmapRecord {
     Update(RelmapUpdate),
 }

+#[derive(Serialize, Deserialize)]
 pub struct RelmapUpdate {
     pub update: XlRelmapUpdate,
     pub buf: Bytes,
 }

+#[derive(Serialize, Deserialize)]
 pub enum XlogRecord {
     Raw(RawXlogRecord),
 }

+#[derive(Serialize, Deserialize)]
 pub struct RawXlogRecord {
     pub info: u8,
     pub lsn: Lsn,
     pub buf: Bytes,
 }

+#[derive(Serialize, Deserialize)]
 pub enum LogicalMessageRecord {
     Put(PutLogicalMessage),
     #[cfg(feature = "testing")]
     Failpoint,
 }

+#[derive(Serialize, Deserialize)]
 pub struct PutLogicalMessage {
     pub path: String,
     pub buf: Bytes,
 }

+#[derive(Serialize, Deserialize)]
 pub enum StandbyRecord {
     RunningXacts(StandbyRunningXacts),
 }

+#[derive(Serialize, Deserialize)]
 pub struct StandbyRunningXacts {
     pub oldest_running_xid: TransactionId,
 }

+#[derive(Serialize, Deserialize)]
 pub enum ReploriginRecord {
     Set(XlReploriginSet),
     Drop(XlReploriginDrop),
```
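Since every variant of this hierarchy now derives Serialize and Deserialize, a complete MetadataRecord can travel inside an InterpretedWalRecord. A small consumption sketch over the variants visible in the hunks above (describe is illustrative only):

```rust
use wal_decoder::models::{HeapamRecord, MetadataRecord, NeonrmgrRecord, ReploriginRecord};

// Illustrative only: name the kind of metadata write a record implies.
fn describe(rec: &MetadataRecord) -> &'static str {
    match rec {
        MetadataRecord::Heapam(HeapamRecord::ClearVmBits(_)) => "clear VM bits (heapam)",
        MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(_)) => "clear VM bits (neon rmgr)",
        MetadataRecord::Replorigin(ReploriginRecord::Set(_)) => "set replication origin",
        MetadataRecord::Replorigin(ReploriginRecord::Drop(_)) => "drop replication origin",
        _ => "other metadata record",
    }
}
```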
```diff
@@ -16,6 +16,7 @@ use pageserver_api::shard::ShardIdentity;
 use pageserver_api::{key::CompactKey, value::Value};
 use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
 use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
+use serde::{Deserialize, Serialize};
 use utils::bin_ser::BeSer;
 use utils::lsn::Lsn;

@@ -29,6 +30,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 /// relation sizes. In the case of "observed" values, we only need to know
 /// the key and LSN, so two types of metadata are supported to save on network
 /// bandwidth.
+#[derive(Serialize, Deserialize, Debug)]
 pub enum ValueMeta {
     Serialized(SerializedValueMeta),
     Observed(ObservedValueMeta),
@@ -75,6 +77,7 @@ impl PartialEq for OrderedValueMeta {
 impl Eq for OrderedValueMeta {}

 /// Metadata for a [`Value`] serialized into the batch.
+#[derive(Serialize, Deserialize, Debug)]
 pub struct SerializedValueMeta {
     pub key: CompactKey,
     pub lsn: Lsn,
@@ -86,12 +89,14 @@ pub struct SerializedValueMeta {
 }

 /// Metadata for a [`Value`] observed by the batch
+#[derive(Serialize, Deserialize, Debug)]
 pub struct ObservedValueMeta {
     pub key: CompactKey,
     pub lsn: Lsn,
 }

 /// Batch of serialized [`Value`]s.
+#[derive(Serialize, Deserialize)]
 pub struct SerializedValueBatch {
     /// [`Value`]s serialized in EphemeralFile's native format,
     /// ready for disk write by the pageserver
```
```diff
@@ -1164,12 +1164,19 @@ impl<'a> DatadirModification<'a> {
             .get_rel_exists(rel, Version::Modified(self), ctx)
             .await?
         {
+            tracing::debug!("Creating relation {rel:?} at lsn {}", self.get_lsn());
+
             // create it with 0 size initially, the logic below will extend it
             self.put_rel_creation(rel, 0, ctx)
                 .await
                 .context("Relation Error")?;
             Ok(0)
         } else {
+            tracing::debug!(
+                "Skipping relation {rel:?} creation at lsn {}",
+                self.get_lsn()
+            );
+
             self.tline
                 .get_rel_size(rel, Version::Modified(self), ctx)
                 .await
@@ -1210,6 +1217,8 @@
         shard: &ShardIdentity,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
+        tracing::debug!("Ingesting batch with metadata: {:?}", batch.metadata);
+
         let mut gaps_at_lsns = Vec::default();

         for meta in batch.metadata.iter() {
```
```diff
@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
 use utils::backoff::{
     exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
 };
-use utils::postgres_client::wal_stream_connection_config;
+use utils::postgres_client::{
+    wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION, POSTGRES_PROTO_VERSION,
+};
 use utils::{
     id::{NodeId, TenantTimelineId},
     lsn::Lsn,
@@ -984,15 +986,29 @@ impl ConnectionManagerState {
         if info.safekeeper_connstr.is_empty() {
             return None; // no connection string, ignore sk
         }
-        match wal_stream_connection_config(
-            self.id,
-            info.safekeeper_connstr.as_ref(),
-            match &self.conf.auth_token {
-                None => None,
-                Some(x) => Some(x),
-            },
-            self.conf.availability_zone.as_deref(),
-        ) {
+
+        let shard_identity = self.timeline.get_shard_identity();
+        let connection_conf_args = ConnectionConfigArgs {
+            protocol_version: PAGESERVER_SAFEKEEPER_PROTO_VERSION,
+            ttid: self.id,
+            shard_number: Some(shard_identity.number.0),
+            shard_count: Some(shard_identity.count.0),
+            shard_stripe_size: Some(shard_identity.stripe_size.0),
+            listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
+            auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
+            availability_zone: self.conf.availability_zone.as_deref()
+        };
+        // let connection_conf_args = ConnectionConfigArgs {
+        //     protocol_version: POSTGRES_PROTO_VERSION,
+        //     ttid: self.id,
+        //     shard_number: None,
+        //     shard_count: None,
+        //     shard_stripe_size: None,
+        //     listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
+        //     auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
+        //     availability_zone: self.conf.availability_zone.as_deref()
+        // };
+        match wal_stream_connection_config(connection_conf_args) {
             Ok(connstr) => Some((*sk_id, info, connstr)),
             Err(e) => {
                 error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
```
```diff
@@ -36,7 +36,7 @@ use crate::{
 use postgres_backend::is_expected_io_error;
 use postgres_connection::PgConnectionConfig;
 use postgres_ffi::waldecoder::WalStreamDecoder;
-use utils::{id::NodeId, lsn::Lsn};
+use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
 use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};

 /// Status of the connection.
@@ -278,6 +278,7 @@ pub(super) async fn handle_walreceiver_connection(
         // fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
         match &replication_message {
             ReplicationMessage::XLogData(xlog_data) => {
+                // TODO(vlad) Is this crap needed?
                 connection_status.latest_connection_update = now;
                 connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
                 connection_status.streaming_lsn = Some(Lsn::from(
@@ -299,6 +300,24 @@
         }

         let status_update = match replication_message {
+            ReplicationMessage::RawInterpretedWalRecord(raw) => {
+                connection_status.latest_connection_update = now;
+                connection_status.latest_wal_update = now;
+                connection_status.commit_lsn = Some(Lsn::from(raw.wal_end()));
+
+                let interpreted = InterpretedWalRecord::des(raw.data()).unwrap();
+                let end_lsn = interpreted.end_lsn;
+
+                let mut modification = timeline.begin_modification(end_lsn);
+                walingest
+                    .ingest_record(interpreted, &mut modification, &ctx)
+                    .await
+                    .with_context(|| format!("could not ingest record at {}", end_lsn))?;
+                modification.commit(&ctx).await?;
+
+                Some(end_lsn)
+            }
+
             ReplicationMessage::XLogData(xlog_data) => {
                 // Pass the WAL data to the decoder, and see if we can decode
                 // more records as a result.
```
```diff
@@ -316,6 +316,7 @@ impl ConnCfg {
         let client_config = client_config.with_no_client_auth();

         let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
+        // TODO(vlad): que?
         let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
             &mut mk_tls,
             host,
```
```diff
@@ -28,6 +28,7 @@ hyper0.workspace = true
 futures.workspace = true
 once_cell.workspace = true
 parking_lot.workspace = true
+pageserver_api.workspace = true
 postgres.workspace = true
 postgres-protocol.workspace = true
 rand.workspace = true
@@ -57,6 +58,7 @@ sd-notify.workspace = true
 storage_broker.workspace = true
 tokio-stream.workspace = true
 utils.workspace = true
+wal_decoder.workspace = true

 workspace_hack.workspace = true
```
```diff
@@ -2,11 +2,14 @@
 //! protocol commands.

 use anyhow::Context;
+use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
 use std::future::Future;
 use std::str::{self, FromStr};
 use std::sync::Arc;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tracing::{debug, info, info_span, Instrument};
+use utils::postgres_client::PAGESERVER_SAFEKEEPER_PROTO_VERSION;
+use utils::shard::{ShardCount, ShardNumber};

 use crate::auth::check_permission;
 use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
@@ -35,6 +38,8 @@ pub struct SafekeeperPostgresHandler {
     pub tenant_id: Option<TenantId>,
     pub timeline_id: Option<TimelineId>,
     pub ttid: TenantTimelineId,
+    pub shard: Option<ShardIdentity>,
+    pub protocol_version: Option<u8>,
     /// Unique connection id is logged in spans for observability.
     pub conn_id: ConnectionId,
     /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
@@ -107,11 +112,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
     ) -> Result<(), QueryError> {
         if let FeStartupPacket::StartupMessage { params, .. } = sm {
             if let Some(options) = params.options_raw() {
+                let mut shard_count: Option<u8> = None;
+                let mut shard_number: Option<u8> = None;
+                let mut shard_stripe_size: Option<u32> = None;
+
                 for opt in options {
                     // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
                     // remove these after the PR gets deployed:
                     // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
                     match opt.split_once('=') {
+                        Some(("protocol_version", value)) => {
+                            self.protocol_version =
+                                Some(value.parse::<u8>().with_context(|| {
+                                    format!("Failed to parse {value} as protocol_version")
+                                })?);
+                        }
                         Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
                             self.tenant_id = Some(value.parse().with_context(|| {
                                 format!("Failed to parse {value} as tenant id")
@@ -127,9 +142,44 @@
                                 metrics.set_client_az(client_az)
                             }
                         }
+                        Some(("shard_count", value)) => {
+                            shard_count = Some(value.parse::<u8>().with_context(|| {
+                                format!("Failed to parse {value} as shard count")
+                            })?);
+                        }
+                        Some(("shard_number", value)) => {
+                            shard_number = Some(value.parse::<u8>().with_context(|| {
+                                format!("Failed to parse {value} as shard number")
+                            })?);
+                        }
+                        Some(("shard_stripe_size", value)) => {
+                            shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
+                                format!("Failed to parse {value} as shard stripe size")
+                            })?);
+                        }
                         _ => continue,
                     }
                 }

+                if self.protocol_version == Some(PAGESERVER_SAFEKEEPER_PROTO_VERSION) {
+                    match (shard_count, shard_number, shard_stripe_size) {
+                        (Some(count), Some(number), Some(stripe_size)) => {
+                            self.shard = Some(
+                                ShardIdentity::new(
+                                    ShardNumber(number),
+                                    ShardCount(count),
+                                    ShardStripeSize(stripe_size),
+                                )
+                                .with_context(|| "Failed to create shard identity")?,
+                            );
+                        }
+                        _ => {
+                            return Err(QueryError::Other(anyhow::anyhow!(
+                                "Shard params were not specified"
+                            )));
+                        }
+                    }
+                }
             }

             if let Some(app_name) = params.get("application_name") {
@@ -150,6 +200,11 @@
                 tracing::field::debug(self.appname.clone()),
             );

+            if let Some(shard) = self.shard.as_ref() {
+                tracing::Span::current()
+                    .record("shard", tracing::field::display(shard.shard_slug()));
+            }
+
             Ok(())
         } else {
             Err(QueryError::Other(anyhow::anyhow!(
@@ -258,6 +313,8 @@ impl SafekeeperPostgresHandler {
             tenant_id: None,
             timeline_id: None,
             ttid: TenantTimelineId::empty(),
+            shard: None,
+            protocol_version: None,
             conn_id,
             claims: None,
             auth,
```
```diff
@@ -17,6 +17,7 @@ use tokio::{
 use tokio_postgres::replication::ReplicationStream;
 use tokio_postgres::types::PgLsn;
 use tracing::*;
+use utils::postgres_client::{ConnectionConfigArgs, POSTGRES_PROTO_VERSION};
 use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};

 use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
@@ -325,7 +326,17 @@ async fn recovery_stream(
     conf: &SafeKeeperConf,
 ) -> anyhow::Result<String> {
     // TODO: pass auth token
-    let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
+    let connection_conf_args = ConnectionConfigArgs {
+        protocol_version: POSTGRES_PROTO_VERSION,
+        ttid: tli.ttid,
+        shard_number: None,
+        shard_count: None,
+        shard_stripe_size: None,
+        listen_pg_addr_str: &donor.pg_connstr,
+        auth_token: None,
+        availability_zone: None,
+    };
+    let cfg = wal_stream_connection_config(connection_conf_args)?;
     let mut cfg = cfg.to_tokio_postgres_config();
     // It will make safekeeper give out not committed WAL (up to flush_lsn).
     cfg.application_name(&format!("safekeeper_{}", conf.my_id));
```
```diff
@@ -11,17 +11,21 @@ use crate::wal_storage::WalReader;
 use crate::GlobalTimelines;
 use anyhow::{bail, Context as AnyhowContext};
 use bytes::Bytes;
+use pageserver_api::shard::ShardIdentity;
 use parking_lot::Mutex;
 use postgres_backend::PostgresBackend;
 use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
 use postgres_ffi::get_current_timestamp;
+use postgres_ffi::waldecoder::WalStreamDecoder;
 use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
-use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
+use pq_proto::{BeMessage, InterpretedWalRecordBody, WalSndKeepAlive, XLogDataBody};
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncWrite};
 use utils::failpoint_support;
 use utils::id::TenantTimelineId;
 use utils::pageserver_feedback::PageserverFeedback;
+use utils::postgres_client::{PAGESERVER_SAFEKEEPER_PROTO_VERSION, POSTGRES_PROTO_VERSION};
+use wal_decoder::models::InterpretedWalRecord;

 use std::cmp::{max, min};
 use std::net::SocketAddr;
@@ -377,6 +381,10 @@ impl Drop for WalSenderGuard {
 }

 impl SafekeeperPostgresHandler {
+    pub fn protocol_version(&self) -> u8 {
+        self.protocol_version.unwrap_or(POSTGRES_PROTO_VERSION)
+    }
+
     /// Wrapper around handle_start_replication_guts handling result. Error is
     /// handled here while we're still in walsender ttid span; with API
     /// extension, this can probably be moved into postgres_backend.
@@ -412,6 +420,7 @@
         let appname = self.appname.clone();

         // Use a guard object to remove our entry from the timeline when we are done.
+        // TODO(vlad): maybe thread shard stuff into here
        let ws_guard = Arc::new(tli.get_walsenders().register(
             self.ttid,
             *pgb.get_peer_addr(),
@@ -475,9 +484,10 @@
             tli,
         };

+        let protocol_version = self.protocol_version();
         let res = tokio::select! {
             // todo: add read|write .context to these errors
-            r = sender.run() => r,
+            r = sender.run(protocol_version, self.shard.as_ref()) => r,
             r = reply_reader.run() => r,
         };

@@ -560,7 +570,35 @@
     ///
     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     /// convenience.
-    async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
+    /// TODO(vlad): add a run variant which accumulates a full wall record
+    /// and interprets it.
+    async fn run(
+        &mut self,
+        protocol_version: u8,
+        shard: Option<&ShardIdentity>,
+    ) -> Result<(), CopyStreamHandlerEnd> {
+        match protocol_version {
+            POSTGRES_PROTO_VERSION => self.run_wal_sender().await,
+            PAGESERVER_SAFEKEEPER_PROTO_VERSION => {
+                self.run_interpreted_record_sender(shard.unwrap()).await
+            }
+            // TODO: make the proto version an enum
+            _ => unreachable!(),
+        }
+    }
+
+    async fn run_interpreted_record_sender(
+        &mut self,
+        shard: &ShardIdentity,
+    ) -> Result<(), CopyStreamHandlerEnd> {
+        let mut last_logged_at = std::time::Instant::now();
+        let mut interpreted_records = 0;
+        let mut interpreted_bytes = 0;
+        let mut useful_bytes = 0;
+
+        let pg_version = self.tli.tli.get_state().await.1.server.pg_version / 10000;
+        let mut wal_decoder = WalStreamDecoder::new(self.start_pos, pg_version);
+
         loop {
             // Wait for the next portion if it is not there yet, or just
             // update our end of WAL available for sending value, we
@@ -601,6 +639,141 @@
             };
             let send_buf = &send_buf[..send_size];

+            wal_decoder.feed_bytes(send_buf);
+
+            // How fast or slow is this. Write a little benchmark
+            // to see how quiclky we can decode 1GiB of WAL.
+            // If this is slow, then we have a problem since it bottlenecks
+            // the whole afair. SK can send about 60-70MiB of raw WAL and
+            // about 13-17MiB of useful interpreted WAL per second (these
+            // number are for one shard).
+            while let Some((record_end_lsn, recdata)) = wal_decoder
+                .poll_decode()
+                .with_context(|| "Failed to decode WAL")?
+            {
+                assert!(record_end_lsn.is_aligned());
+
+                // Deserialize and interpret WAL record
+                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                    recdata,
+                    shard,
+                    record_end_lsn,
+                    pg_version,
+                )
+                .with_context(|| "Failed to interpret WAL")?;
+
+                let useful_size = interpreted.batch.buffer_size();
+
+                let mut buf = Vec::new();
+                interpreted
+                    .ser_into(&mut buf)
+                    .with_context(|| "Failed to serialize interpreted WAL")?;
+
+                let size = buf.len();
+
+                self.pgb
+                    .write_message(&BeMessage::InterpretedWalRecord(InterpretedWalRecordBody {
+                        wal_end: self.end_pos.0,
+                        data: buf.as_slice(),
+                    }))
+                    .await?;
+
+                interpreted_records += 1;
+                interpreted_bytes += size;
+                useful_bytes += useful_size;
+            }
+
+            // and send it
+            // self.pgb
+            //     .write_message(&BeMessage::XLogData(XLogDataBody {
+            //         wal_start: self.start_pos.0,
+            //         wal_end: self.end_pos.0,
+            //         timestamp: get_current_timestamp(),
+            //         data: send_buf,
+            //     }))
+            //     .await?;
+
+            // if let Some(appname) = &self.appname {
+            //     if appname == "replica" {
+            //         failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
+            //     }
+            // }
+            // trace!(
+            //     "sent {} bytes of WAL {}-{}",
+            //     send_size,
+            //     self.start_pos,
+            //     self.start_pos + send_size as u64
+            // );
+
+            self.start_pos += send_size as u64;
+
+            let elapsed = last_logged_at.elapsed();
+            if elapsed >= Duration::from_secs(5) {
+                let records_rate = interpreted_records / elapsed.as_millis() * 1000;
+                let bytes_rate = interpreted_bytes / elapsed.as_millis() as usize * 1000;
+                let useful_bytes_rate = useful_bytes / elapsed.as_millis() as usize * 1000;
+                tracing::info!(
+                    "Shard {} sender rate: rps={} bps={} ubps={}",
+                    shard.number.0,
+                    records_rate,
+                    bytes_rate,
+                    useful_bytes_rate
+                );
+
+                last_logged_at = std::time::Instant::now();
+                interpreted_records = 0;
+                interpreted_bytes = 0;
+                useful_bytes = 0;
+            }
+        }
+    }
+
+    async fn run_wal_sender(&mut self) -> Result<(), CopyStreamHandlerEnd> {
+        let mut useful_bytes = 0;
+        let mut last_logged_at = std::time::Instant::now();
+
+        loop {
+            // Wait for the next portion if it is not there yet, or just
+            // update our end of WAL available for sending value, we
+            // communicate it to the receiver.
+            self.wait_wal().await?;
+            assert!(
+                self.end_pos > self.start_pos,
+                "nothing to send after waiting for WAL"
+            );
+
+            // try to send as much as available, capped by MAX_SEND_SIZE
+            let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
+            // if we went behind available WAL, back off
+            if chunk_end_pos >= self.end_pos {
+                chunk_end_pos = self.end_pos;
+            } else {
+                // If sending not up to end pos, round down to page boundary to
+                // avoid breaking WAL record not at page boundary, as protocol
+                // demands. See walsender.c (XLogSendPhysical).
+                chunk_end_pos = chunk_end_pos
+                    .checked_sub(chunk_end_pos.block_offset())
+                    .unwrap();
+            }
+            let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
+            let send_buf = &mut self.send_buf[..send_size];
+            let send_size: usize;
+            {
+                // If uncommitted part is being pulled, check that the term is
+                // still the expected one.
+                let _term_guard = if let Some(t) = self.term {
+                    Some(self.tli.acquire_term(t).await?)
+                } else {
+                    None
+                };
+                // Read WAL into buffer. send_size can be additionally capped to
+                // segment boundary here.
+                send_size = self.wal_reader.read(send_buf).await?
+            };
+            let send_buf = &send_buf[..send_size];
+
+            useful_bytes += send_buf.len();
+
             // and send it
             self.pgb
                 .write_message(&BeMessage::XLogData(XLogDataBody {
@@ -623,6 +796,18 @@
                 self.start_pos + send_size as u64
             );
             self.start_pos += send_size as u64;
+
+            let elapsed = last_logged_at.elapsed();
+            if elapsed >= Duration::from_secs(5) {
+                let useful_bytes_rate = useful_bytes / elapsed.as_millis() as usize * 1000;
+                tracing::info!(
+                    "Sender rate: ubps={}",
+                    useful_bytes_rate
+                );
+
+                last_logged_at = std::time::Instant::now();
+                useful_bytes = 0;
+            }
         }
     }
```
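Stripped of transport, failpoints, and rate logging, the new sender path reduces to: feed each raw WAL chunk into the WalStreamDecoder, interpret every complete record for the target shard, and serialize the result for framing. A condensed sketch of that inner loop, reusing the calls from the hunk above (the standalone helper function itself is hypothetical):

```rust
use anyhow::Context;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;

/// Hypothetical helper: turn one chunk of raw WAL into serialized
/// interpreted records, ready to be framed as InterpretedWalRecordBody.
fn interpret_chunk(
    decoder: &mut WalStreamDecoder,
    chunk: &[u8],
    shard: &ShardIdentity,
    pg_version: u32,
) -> anyhow::Result<Vec<(Lsn, Vec<u8>)>> {
    decoder.feed_bytes(chunk);

    let mut out = Vec::new();
    // poll_decode yields each complete WAL record together with its end LSN.
    while let Some((record_end_lsn, recdata)) =
        decoder.poll_decode().context("Failed to decode WAL")?
    {
        // Shard-aware interpretation: only the parts relevant to `shard`
        // survive into the interpreted record.
        let interpreted = InterpretedWalRecord::from_bytes_filtered(
            recdata,
            shard,
            record_end_lsn,
            pg_version,
        )
        .context("Failed to interpret WAL")?;

        let mut buf = Vec::new();
        interpreted
            .ser_into(&mut buf)
            .context("Failed to serialize interpreted WAL")?;
        out.push((record_end_lsn, buf));
    }
    Ok(out)
}
```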