adopt new tokio-postgres:replication branch

This PR has evolved a lot; jump to the newer version. This should make
it easier to handle keepalive messages.
This commit is contained in:
Eric Seppanen
2021-04-15 10:43:23 -07:00
parent d2c3ad162a
commit e8032f26e6
6 changed files with 97 additions and 55 deletions

36
Cargo.lock generated
View File

@@ -872,7 +872,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project",
"socket2 0.4.0",
"socket2",
"tokio",
"tower-service",
"tracing",
@@ -1089,7 +1089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d"
dependencies = [
"libc",
"socket2 0.4.0",
"socket2",
]
[[package]]
@@ -1209,6 +1209,7 @@ dependencies = [
"log",
"postgres",
"postgres-protocol",
"postgres-types",
"rand 0.8.3",
"regex",
"rust-s3",
@@ -1333,8 +1334,8 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.19.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"bytes",
"fallible-iterator",
@@ -1346,8 +1347,8 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.6.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"base64",
"byteorder",
@@ -1363,8 +1364,8 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.2.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"bytes",
"fallible-iterator",
@@ -1882,17 +1883,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if 1.0.0",
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.4.0"
@@ -2086,8 +2076,8 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.0"
source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02"
version = "0.7.1"
source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094"
dependencies = [
"async-trait",
"byteorder",
@@ -2098,10 +2088,10 @@ dependencies = [
"parking_lot",
"percent-encoding",
"phf",
"pin-project",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2 0.3.19",
"socket2",
"tokio",
"tokio-util",
]

View File

@@ -8,8 +8,8 @@ edition = "2018"
[dependencies]
rand = "0.8.3"
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
serde = ""
serde_derive = ""

View File

@@ -9,8 +9,8 @@ edition = "2018"
[dependencies]
lazy_static = "1.4.0"
rand = "0.8.3"
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }

View File

@@ -29,9 +29,10 @@ daemonize = "0.4.1"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
anyhow = "1.0"
crc32c = "0.6.0"
walkdir = "2"

View File

@@ -1,3 +1,5 @@
use std::str::FromStr;
//
// WAL receiver
//
@@ -13,11 +15,14 @@ use tokio_stream::StreamExt;
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::waldecoder::WalStreamDecoder;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use postgres_protocol::message::backend::ReplicationMessage;
use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
use postgres_types::PgLsn;
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::{Error, NoTls, SimpleQueryMessage, SimpleQueryRow};
//
// This is the entry point for the WAL receiver thread.
@@ -49,14 +54,10 @@ async fn walreceiver_main(
wal_producer_connstr: &str,
) -> Result<(), Error> {
// Connect to the database in replication mode.
debug!("connecting to {}...", wal_producer_connstr);
let (mut rclient, connection) = connect_replication(
wal_producer_connstr,
NoTls,
ReplicationMode::Physical,
)
.await?;
debug!("connected!");
info!("connecting to {:?}", wal_producer_connstr);
let connect_cfg = format!("{} replication=true", wal_producer_connstr);
let (rclient, connection) = tokio_postgres::connect(&connect_cfg, NoTls).await?;
info!("connected!");
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
@@ -66,21 +67,21 @@ async fn walreceiver_main(
}
});
let identify_system = rclient.identify_system().await?;
let end_of_wal = u64::from(identify_system.xlogpos());
let identify = identify_system(&rclient).await?;
info!("{:?}", identify);
let end_of_wal = u64::from(identify.xlogpos);
let mut caught_up = false;
let sysid: u64 = identify_system.systemid().parse().unwrap();
let pcache = page_cache::get_pagecache(conf, sysid);
let pcache = page_cache::get_pagecache(conf, identify.systemid);
//
// Start streaming the WAL, from where we left off previously.
//
let mut startpoint = pcache.get_last_valid_lsn();
if startpoint == 0 {
// If we start here with identify_system.xlogpos() we will have race condition with
// If we start here with identify.xlogpos we will have race condition with
// postgres start: insert into postgres may request page that was modified with lsn
// smaller than identify_system.xlogpos().
// smaller than identify.xlogpos.
//
// Current procedure for starting postgres will anyway be changed to something
// different like having 'initdb' method on a pageserver (or importing some shared
@@ -105,10 +106,17 @@ async fn walreceiver_main(
(end_of_wal >> 32),
(end_of_wal & 0xffffffff)
);
let startpoint = tokio_postgres::types::Lsn::from(startpoint);
let mut physical_stream = rclient
.start_physical_replication(None, startpoint, None)
.await?;
let startpoint = PgLsn::from(startpoint);
let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
let copy_stream = rclient
.copy_both_simple::<bytes::Bytes>(&query)
.await
.unwrap();
let physical_stream = ReplicationStream::new(copy_stream);
tokio::pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
while let Some(replication_message) = physical_stream.next().await {
@@ -132,8 +140,7 @@ async fn walreceiver_main(
loop {
if let Some((lsn, recdata)) = waldecoder.poll_decode() {
let decoded =
crate::waldecoder::decode_wal_record(startlsn, recdata.clone());
let decoded = decode_wal_record(startlsn, recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
@@ -192,3 +199,47 @@ async fn walreceiver_main(
}
return Ok(());
}
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
///
/// See the [postgres docs] for more details.
///
/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
#[derive(Debug)]
pub struct IdentifySystem {
systemid: u64,
timeline: u32,
xlogpos: PgLsn,
dbname: Option<String>,
}
/// Run the postgres `IDENTIFY_SYSTEM` command
pub async fn identify_system(client: &tokio_postgres::Client) -> Result<IdentifySystem, Error> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str).await?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Option<T>
where
T: FromStr,
{
let val = row.get(idx)?;
val.parse::<T>().ok()
}
// FIXME: turn unwrap() into errors.
// All of the tokio_postgres::Error builders are private, so we
// can't create them here. We'll just have to create our own error type.
if let SimpleQueryMessage::Row(first_row) = response.get(0).unwrap() {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0).unwrap(),
timeline: get_parse(first_row, 1).unwrap(),
xlogpos: get_parse(first_row, 2).unwrap(),
dbname: get_parse(first_row, 3),
})
} else {
// FIXME: return an error
panic!("identify_system returned non-row response");
}
}

View File

@@ -29,9 +29,9 @@ daemonize = "0.4.1"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
anyhow = "1.0"
crc32c = "0.6.0"