From d25656797c9eda5c68c8ade65eb9b512e99624ab Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sun, 2 May 2021 23:28:26 -0700 Subject: [PATCH] switch pageserver to blocking postgres interface --- control_plane/src/compute.rs | 2 +- pageserver/src/walreceiver.rs | 56 +++++++++-------------------------- 2 files changed, 15 insertions(+), 43 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 0d36b21427..1ce3050b01 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -419,7 +419,7 @@ impl PostgresNode { } } - pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { + pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { let connstring = format!( "host={} port={} dbname={} user={}", self.address.ip(), diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index d9ee05c739..046c42534c 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -14,6 +14,9 @@ use crate::ZTimelineId; use anyhow::Error; use lazy_static::lazy_static; use log::*; +use postgres::fallible_iterator::FallibleIterator; +use postgres::replication::ReplicationIter; +use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; use postgres_ffi::xlog_utils::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; @@ -27,10 +30,6 @@ use std::sync::Mutex; use std::thread; use std::thread::sleep; use std::time::{Duration, SystemTime}; -use tokio::runtime::Runtime; -use tokio_postgres::replication::ReplicationStream; -use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; -use tokio_stream::StreamExt; use zenith_utils::lsn::Lsn; // @@ -95,16 +94,6 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { timelineid ); - // We need a tokio runtime to call the rust-postgres copy_both function. - // Most functions in the rust-postgres driver have a blocking wrapper, - // but copy_both does not (TODO: the copy_both support is still work-in-progress - // as of this writing. Check later if that has changed, or implement the - // wrapper ourselves in rust-postgres) - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - // // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, // and start streaming WAL from it. If the connection is lost, keep retrying. @@ -113,7 +102,7 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { // Look up the current WAL producer address let wal_producer_connstr = get_wal_producer_connstr(timelineid); - let res = walreceiver_main(&runtime, conf, timelineid, &wal_producer_connstr); + let res = walreceiver_main(conf, timelineid, &wal_producer_connstr); if let Err(e) = res { info!( @@ -126,7 +115,6 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { } fn walreceiver_main( - runtime: &Runtime, _conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str, @@ -135,18 +123,10 @@ fn walreceiver_main( info!("connecting to {:?}", wal_producer_connstr); let connect_cfg = format!("{} replication=true", wal_producer_connstr); - let (rclient, connection) = runtime.block_on(tokio_postgres::connect(&connect_cfg, NoTls))?; + let mut rclient = Client::connect(&connect_cfg, NoTls)?; info!("connected!"); - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own. - runtime.spawn(async move { - if let Err(e) = connection.await { - error!("connection error: {}", e); - } - }); - - let identify = identify_system(runtime, &rclient)?; + let identify = identify_system(&mut rclient)?; info!("{:?}", identify); let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; @@ -188,15 +168,13 @@ fn walreceiver_main( let query = format!("START_REPLICATION PHYSICAL {}", startpoint); - let copy_stream = runtime.block_on(rclient.copy_both_simple::(&query))?; - - let physical_stream = ReplicationStream::new(copy_stream); - tokio::pin!(physical_stream); + let copy_stream = rclient.copy_both_simple(&query)?; + let mut physical_stream = ReplicationIter::new(copy_stream); let mut waldecoder = WalStreamDecoder::new(startpoint); - while let Some(replication_message) = runtime.block_on(physical_stream.next()) { - match replication_message? { + while let Some(replication_message) = physical_stream.next()? { + match replication_message { ReplicationMessage::XLogData(xlog_data) => { // Pass the WAL data to the decoder, and see if we can decode // more records as a result. @@ -258,11 +236,8 @@ fn walreceiver_main( let ts = SystemTime::now(); const NO_REPLY: u8 = 0u8; - runtime.block_on( - physical_stream - .as_mut() - .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY), - )?; + physical_stream + .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; } } _ => (), @@ -291,12 +266,9 @@ pub struct IdentifySystem { pub struct IdentifyError; /// Run the postgres `IDENTIFY_SYSTEM` command -pub fn identify_system( - runtime: &Runtime, - client: &tokio_postgres::Client, -) -> Result { +pub fn identify_system(client: &mut Client) -> Result { let query_str = "IDENTIFY_SYSTEM"; - let response = runtime.block_on(client.simple_query(query_str))?; + let response = client.simple_query(query_str)?; // get(N) from row, then parse it as some destination type. fn get_parse(row: &SimpleQueryRow, idx: usize) -> Result