diff --git a/Cargo.lock b/Cargo.lock index fe4be36d72..8299cbe7d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -872,7 +872,7 @@ dependencies = [ "httpdate", "itoa", "pin-project", - "socket2 0.4.0", + "socket2", "tokio", "tower-service", "tracing", @@ -1089,7 +1089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d" dependencies = [ "libc", - "socket2 0.4.0", + "socket2", ] [[package]] @@ -1209,6 +1209,7 @@ dependencies = [ "log", "postgres", "postgres-protocol", + "postgres-types", "rand 0.8.3", "regex", "rust-s3", @@ -1333,8 +1334,8 @@ dependencies = [ [[package]] name = "postgres" -version = "0.19.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.19.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "bytes", "fallible-iterator", @@ -1346,8 +1347,8 @@ dependencies = [ [[package]] name = "postgres-protocol" -version = "0.6.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.6.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "base64", "byteorder", @@ -1363,8 +1364,8 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.2.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "bytes", "fallible-iterator", @@ -1882,17 +1883,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" -[[package]] -name = "socket2" -version = "0.3.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.4.0" @@ -2086,8 +2076,8 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.7.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "async-trait", "byteorder", @@ -2098,10 +2088,10 @@ dependencies = [ "parking_lot", "percent-encoding", "phf", - "pin-project", + "pin-project-lite", "postgres-protocol", "postgres-types", - "socket2 0.3.19", + "socket2", "tokio", "tokio-util", ] diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 4699a4da40..c4ec2e9b33 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -8,8 +8,8 @@ edition = "2018" [dependencies] rand = "0.8.3" -postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } serde = "" serde_derive = "" diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index ad7913cc5c..51f9d0c773 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -9,8 +9,8 @@ edition = "2018" [dependencies] lazy_static = "1.4.0" rand = "0.8.3" -postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } pageserver = { path = "../pageserver" } walkeeper = { path = "../walkeeper" } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 177cfe4b24..5da883d81a 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -29,9 +29,10 @@ daemonize = "0.4.1" rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } -tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } anyhow = "1.0" crc32c = "0.6.0" walkdir = "2" diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 7ce3e9727f..76471084f7 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + // // WAL receiver // @@ -13,11 +15,14 @@ use tokio_stream::StreamExt; use crate::page_cache; use crate::page_cache::BufferTag; -use crate::waldecoder::WalStreamDecoder; +use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use postgres_protocol::message::backend::ReplicationMessage; -use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode}; +use postgres_types::PgLsn; +use tokio_postgres::replication::ReplicationStream; + +use tokio_postgres::{Error, NoTls, SimpleQueryMessage, SimpleQueryRow}; // // This is the entry point for the WAL receiver thread. @@ -49,14 +54,10 @@ async fn walreceiver_main( wal_producer_connstr: &str, ) -> Result<(), Error> { // Connect to the database in replication mode. - debug!("connecting to {}...", wal_producer_connstr); - let (mut rclient, connection) = connect_replication( - wal_producer_connstr, - NoTls, - ReplicationMode::Physical, - ) - .await?; - debug!("connected!"); + info!("connecting to {:?}", wal_producer_connstr); + let connect_cfg = format!("{} replication=true", wal_producer_connstr); + let (rclient, connection) = tokio_postgres::connect(&connect_cfg, NoTls).await?; + info!("connected!"); // The connection object performs the actual communication with the database, // so spawn it off to run on its own. @@ -66,21 +67,21 @@ async fn walreceiver_main( } }); - let identify_system = rclient.identify_system().await?; - let end_of_wal = u64::from(identify_system.xlogpos()); + let identify = identify_system(&rclient).await?; + info!("{:?}", identify); + let end_of_wal = u64::from(identify.xlogpos); let mut caught_up = false; - let sysid: u64 = identify_system.systemid().parse().unwrap(); - let pcache = page_cache::get_pagecache(conf, sysid); + let pcache = page_cache::get_pagecache(conf, identify.systemid); // // Start streaming the WAL, from where we left off previously. // let mut startpoint = pcache.get_last_valid_lsn(); if startpoint == 0 { - // If we start here with identify_system.xlogpos() we will have race condition with + // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn - // smaller than identify_system.xlogpos(). + // smaller than identify.xlogpos. // // Current procedure for starting postgres will anyway be changed to something // different like having 'initdb' method on a pageserver (or importing some shared @@ -105,10 +106,17 @@ async fn walreceiver_main( (end_of_wal >> 32), (end_of_wal & 0xffffffff) ); - let startpoint = tokio_postgres::types::Lsn::from(startpoint); - let mut physical_stream = rclient - .start_physical_replication(None, startpoint, None) - .await?; + + let startpoint = PgLsn::from(startpoint); + let query = format!("START_REPLICATION PHYSICAL {}", startpoint); + let copy_stream = rclient + .copy_both_simple::(&query) + .await + .unwrap(); + + let physical_stream = ReplicationStream::new(copy_stream); + tokio::pin!(physical_stream); + let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); while let Some(replication_message) = physical_stream.next().await { @@ -132,8 +140,7 @@ async fn walreceiver_main( loop { if let Some((lsn, recdata)) = waldecoder.poll_decode() { - let decoded = - crate::waldecoder::decode_wal_record(startlsn, recdata.clone()); + let decoded = decode_wal_record(startlsn, recdata.clone()); // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifies. (The actual WAL record is kept in @@ -192,3 +199,47 @@ async fn walreceiver_main( } return Ok(()); } + +/// Data returned from the postgres `IDENTIFY_SYSTEM` command +/// +/// See the [postgres docs] for more details. +/// +/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html +#[derive(Debug)] +pub struct IdentifySystem { + systemid: u64, + timeline: u32, + xlogpos: PgLsn, + dbname: Option, +} + +/// Run the postgres `IDENTIFY_SYSTEM` command +pub async fn identify_system(client: &tokio_postgres::Client) -> Result { + let query_str = "IDENTIFY_SYSTEM"; + let response = client.simple_query(query_str).await?; + + // get(N) from row, then parse it as some destination type. + fn get_parse(row: &SimpleQueryRow, idx: usize) -> Option + where + T: FromStr, + { + let val = row.get(idx)?; + val.parse::().ok() + } + + // FIXME: turn unwrap() into errors. + // All of the tokio_postgres::Error builders are private, so we + // can't create them here. We'll just have to create our own error type. + + if let SimpleQueryMessage::Row(first_row) = response.get(0).unwrap() { + Ok(IdentifySystem { + systemid: get_parse(first_row, 0).unwrap(), + timeline: get_parse(first_row, 1).unwrap(), + xlogpos: get_parse(first_row, 2).unwrap(), + dbname: get_parse(first_row, 3), + }) + } else { + // FIXME: return an error + panic!("identify_system returned non-row response"); + } +} diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 76dcd12582..e025a26d9b 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -29,9 +29,9 @@ daemonize = "0.4.1" rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } -tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } anyhow = "1.0" crc32c = "0.6.0"