mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Start pageserver walreceiver from predefined "-inf" lsn.
If we start walreceiver with identify_system.xlogpos() we will have race condition with postgres start: postgres may request page that was modified with lsn smaller than identify_system.xlogpos(). Current procedure for starting postgres will anyway be changed to something different like having 'initdb' method on a pageserver (or importing some shared empty database snapshot), so for now I just put start of first segment which seems to be a valid record and is strictly before first lsn records.
This commit is contained in:
@@ -71,9 +71,16 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
|
||||
//
|
||||
let mut startpoint = pcache.get_last_valid_lsn();
|
||||
if startpoint == 0 {
|
||||
// FIXME: Or should we just error out?
|
||||
pcache.init_valid_lsn(u64::from(_identify_system.xlogpos()));
|
||||
startpoint = u64::from(_identify_system.xlogpos());
|
||||
// If we start here with _identify_system.xlogpos() we will have race condition with
|
||||
// postgres start: insert into postgres may request page that was modified with lsn
|
||||
// smaller than _identify_system.xlogpos().
|
||||
//
|
||||
// Current procedure for starting postgres will anyway be changed to something
|
||||
// different like having 'initdb' method on a pageserver (or importing some shared
|
||||
// empty database snapshot), so for now I just put start of first segment which
|
||||
// seems to be a valid record.
|
||||
pcache.init_valid_lsn(0x_1_000_000_u64);
|
||||
startpoint = u64::from(0x_1_000_000_u64);
|
||||
} else {
|
||||
// There might be some padding after the last full record, skip it.
|
||||
//
|
||||
@@ -90,7 +97,7 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
|
||||
.start_physical_replication(None, startpoint, None)
|
||||
.await?;
|
||||
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
|
||||
|
||||
|
||||
while let Some(replication_message) = physical_stream.next().await {
|
||||
match replication_message? {
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
use pageserver::control_plane::ComputeControlPlane;
|
||||
use pageserver::control_plane::StorageControlPlane;
|
||||
|
||||
@@ -19,9 +16,6 @@ fn test_redo_cases() {
|
||||
let node = compute_cplane.new_node();
|
||||
node.start(&storage_cplane);
|
||||
|
||||
println!("await pageserver connection...");
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
// check basic work with table
|
||||
node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
|
||||
node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
|
||||
@@ -55,9 +49,6 @@ fn test_regress() {
|
||||
let node = compute_cplane.new_node();
|
||||
node.start(&storage_cplane);
|
||||
|
||||
println!("await pageserver connection...");
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
pageserver::control_plane::regress_check(&node);
|
||||
}
|
||||
|
||||
@@ -74,11 +65,6 @@ fn test_pageserver_multitenancy() {
|
||||
node1.start(&storage_cplane);
|
||||
node2.start(&storage_cplane);
|
||||
|
||||
// XXX: add some extension func to postgres to check walsender conn
|
||||
// XXX: or better just drop that
|
||||
println!("await pageserver connection...");
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
// check node1
|
||||
node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
|
||||
node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
|
||||
|
||||
Reference in New Issue
Block a user