From bd606ab37a0ecd620c086df46f62d78045761716 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Mon, 5 Apr 2021 12:32:51 +0300 Subject: [PATCH] Start pageserver walreceiver from predefined "-inf" lsn. If we start walreceiver with identify_system.xlogpos() we will have a race condition with postgres start: postgres may request a page that was modified with an lsn smaller than identify_system.xlogpos(). The current procedure for starting postgres will be changed anyway to something different, like having an 'initdb' method on a pageserver (or importing some shared empty database snapshot), so for now I just use the start of the first segment, which seems to be a valid record and is strictly before the first lsn records. --- src/walreceiver.rs | 15 +++++++++++---- tests/test_pageserver.rs | 14 -------------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/walreceiver.rs b/src/walreceiver.rs index ad4c82fc14..9005aad43b 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -71,9 +71,16 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - // let mut startpoint = pcache.get_last_valid_lsn(); if startpoint == 0 { - // FIXME: Or should we just error out? - pcache.init_valid_lsn(u64::from(_identify_system.xlogpos())); - startpoint = u64::from(_identify_system.xlogpos()); + // If we start here with _identify_system.xlogpos() we will have race condition with + // postgres start: insert into postgres may request page that was modified with lsn + // smaller than _identify_system.xlogpos(). + // + // Current procedure for starting postgres will anyway be changed to something + // different like having 'initdb' method on a pageserver (or importing some shared + // empty database snapshot), so for now I just put start of first segment which + // seems to be a valid record. + pcache.init_valid_lsn(0x_1_000_000_u64); + startpoint = u64::from(0x_1_000_000_u64); } else { // There might be some padding after the last full record, skip it. 
// @@ -90,7 +97,7 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - .start_physical_replication(None, startpoint, None) .await?; let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); - + while let Some(replication_message) = physical_stream.next().await { match replication_message? { ReplicationMessage::XLogData(xlog_data) => { diff --git a/tests/test_pageserver.rs b/tests/test_pageserver.rs index 78e3889941..2a68f9fe84 100644 --- a/tests/test_pageserver.rs +++ b/tests/test_pageserver.rs @@ -1,6 +1,3 @@ -use std::thread::sleep; -use std::time::Duration; - use pageserver::control_plane::ComputeControlPlane; use pageserver::control_plane::StorageControlPlane; @@ -19,9 +16,6 @@ fn test_redo_cases() { let node = compute_cplane.new_node(); node.start(&storage_cplane); - println!("await pageserver connection..."); - sleep(Duration::from_secs(3)); - // check basic work with table node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'"); @@ -55,9 +49,6 @@ fn test_regress() { let node = compute_cplane.new_node(); node.start(&storage_cplane); - println!("await pageserver connection..."); - sleep(Duration::from_secs(3)); - pageserver::control_plane::regress_check(&node); } @@ -74,11 +65,6 @@ fn test_pageserver_multitenancy() { node1.start(&storage_cplane); node2.start(&storage_cplane); - // XXX: add some extension func to postgres to check walsender conn - // XXX: or better just drop that - println!("await pageserver connection..."); - sleep(Duration::from_secs(3)); - // check node1 node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");