switch pageserver to blocking postgres interface

This commit is contained in:
Eric Seppanen
2021-05-02 23:28:26 -07:00
parent 6c825dcbaa
commit d25656797c
2 changed files with 15 additions and 43 deletions

View File

@@ -419,7 +419,7 @@ impl PostgresNode {
}
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),

View File

@@ -14,6 +14,9 @@ use crate::ZTimelineId;
use anyhow::Error;
use lazy_static::lazy_static;
use log::*;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::xlog_utils::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
@@ -27,10 +30,6 @@ use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::time::{Duration, SystemTime};
use tokio::runtime::Runtime;
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
use zenith_utils::lsn::Lsn;
//
@@ -95,16 +94,6 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
timelineid
);
// We need a tokio runtime to call the rust-postgres copy_both function.
// Most functions in the rust-postgres driver have a blocking wrapper,
// but copy_both does not (TODO: the copy_both support is still work-in-progress
// as of this writing. Check later if that has changed, or implement the
// wrapper ourselves in rust-postgres)
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
//
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it. If the connection is lost, keep retrying.
@@ -113,7 +102,7 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
let res = walreceiver_main(&runtime, conf, timelineid, &wal_producer_connstr);
let res = walreceiver_main(conf, timelineid, &wal_producer_connstr);
if let Err(e) = res {
info!(
@@ -126,7 +115,6 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
}
fn walreceiver_main(
runtime: &Runtime,
_conf: &PageServerConf,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
@@ -135,18 +123,10 @@ fn walreceiver_main(
info!("connecting to {:?}", wal_producer_connstr);
let connect_cfg = format!("{} replication=true", wal_producer_connstr);
let (rclient, connection) = runtime.block_on(tokio_postgres::connect(&connect_cfg, NoTls))?;
let mut rclient = Client::connect(&connect_cfg, NoTls)?;
info!("connected!");
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
runtime.spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
let identify = identify_system(runtime, &rclient)?;
let identify = identify_system(&mut rclient)?;
info!("{:?}", identify);
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
@@ -188,15 +168,13 @@ fn walreceiver_main(
let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
let copy_stream = runtime.block_on(rclient.copy_both_simple::<bytes::Bytes>(&query))?;
let physical_stream = ReplicationStream::new(copy_stream);
tokio::pin!(physical_stream);
let copy_stream = rclient.copy_both_simple(&query)?;
let mut physical_stream = ReplicationIter::new(copy_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
while let Some(replication_message) = runtime.block_on(physical_stream.next()) {
match replication_message? {
while let Some(replication_message) = physical_stream.next()? {
match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
@@ -258,11 +236,8 @@ fn walreceiver_main(
let ts = SystemTime::now();
const NO_REPLY: u8 = 0u8;
runtime.block_on(
physical_stream
.as_mut()
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY),
)?;
physical_stream
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
}
}
_ => (),
@@ -291,12 +266,9 @@ pub struct IdentifySystem {
pub struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
pub fn identify_system(
runtime: &Runtime,
client: &tokio_postgres::Client,
) -> Result<IdentifySystem, Error> {
pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
let query_str = "IDENTIFY_SYSTEM";
let response = runtime.block_on(client.simple_query(query_str))?;
let response = client.simple_query(query_str)?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>