mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
Start pageserver walreceiver from predefined "-inf" lsn.
If we start walreceiver with identify_system.xlogpos() we will have race condition with postgres start: postgres may request page that was modified with lsn smaller than identify_system.xlogpos(). Current procedure for starting postgres will anyway be changed to something different like having 'initdb' method on a pageserver (or importing some shared empty database snapshot), so for now I just put start of first segment which seems to be a valid record and is strictly before first lsn records.
This commit is contained in:
@@ -71,9 +71,16 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
|
|||||||
//
|
//
|
||||||
let mut startpoint = pcache.get_last_valid_lsn();
|
let mut startpoint = pcache.get_last_valid_lsn();
|
||||||
if startpoint == 0 {
|
if startpoint == 0 {
|
||||||
// FIXME: Or should we just error out?
|
// If we start here with _identify_system.xlogpos() we will have race condition with
|
||||||
pcache.init_valid_lsn(u64::from(_identify_system.xlogpos()));
|
// postgres start: insert into postgres may request page that was modified with lsn
|
||||||
startpoint = u64::from(_identify_system.xlogpos());
|
// smaller than _identify_system.xlogpos().
|
||||||
|
//
|
||||||
|
// Current procedure for starting postgres will anyway be changed to something
|
||||||
|
// different like having 'initdb' method on a pageserver (or importing some shared
|
||||||
|
// empty database snapshot), so for now I just put start of first segment which
|
||||||
|
// seems to be a valid record.
|
||||||
|
pcache.init_valid_lsn(0x_1_000_000_u64);
|
||||||
|
startpoint = u64::from(0x_1_000_000_u64);
|
||||||
} else {
|
} else {
|
||||||
// There might be some padding after the last full record, skip it.
|
// There might be some padding after the last full record, skip it.
|
||||||
//
|
//
|
||||||
@@ -90,7 +97,7 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
|
|||||||
.start_physical_replication(None, startpoint, None)
|
.start_physical_replication(None, startpoint, None)
|
||||||
.await?;
|
.await?;
|
||||||
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
|
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
|
||||||
|
|
||||||
while let Some(replication_message) = physical_stream.next().await {
|
while let Some(replication_message) = physical_stream.next().await {
|
||||||
match replication_message? {
|
match replication_message? {
|
||||||
ReplicationMessage::XLogData(xlog_data) => {
|
ReplicationMessage::XLogData(xlog_data) => {
|
||||||
|
|||||||
@@ -1,6 +1,3 @@
|
|||||||
use std::thread::sleep;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use pageserver::control_plane::ComputeControlPlane;
|
use pageserver::control_plane::ComputeControlPlane;
|
||||||
use pageserver::control_plane::StorageControlPlane;
|
use pageserver::control_plane::StorageControlPlane;
|
||||||
|
|
||||||
@@ -19,9 +16,6 @@ fn test_redo_cases() {
|
|||||||
let node = compute_cplane.new_node();
|
let node = compute_cplane.new_node();
|
||||||
node.start(&storage_cplane);
|
node.start(&storage_cplane);
|
||||||
|
|
||||||
println!("await pageserver connection...");
|
|
||||||
sleep(Duration::from_secs(3));
|
|
||||||
|
|
||||||
// check basic work with table
|
// check basic work with table
|
||||||
node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
|
node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
|
||||||
node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
|
node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
|
||||||
@@ -55,9 +49,6 @@ fn test_regress() {
|
|||||||
let node = compute_cplane.new_node();
|
let node = compute_cplane.new_node();
|
||||||
node.start(&storage_cplane);
|
node.start(&storage_cplane);
|
||||||
|
|
||||||
println!("await pageserver connection...");
|
|
||||||
sleep(Duration::from_secs(3));
|
|
||||||
|
|
||||||
pageserver::control_plane::regress_check(&node);
|
pageserver::control_plane::regress_check(&node);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,11 +65,6 @@ fn test_pageserver_multitenancy() {
|
|||||||
node1.start(&storage_cplane);
|
node1.start(&storage_cplane);
|
||||||
node2.start(&storage_cplane);
|
node2.start(&storage_cplane);
|
||||||
|
|
||||||
// XXX: add some extension func to postgres to check walsender conn
|
|
||||||
// XXX: or better just drop that
|
|
||||||
println!("await pageserver connection...");
|
|
||||||
sleep(Duration::from_secs(3));
|
|
||||||
|
|
||||||
// check node1
|
// check node1
|
||||||
node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
|
node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
|
||||||
node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
|
node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
|
||||||
|
|||||||
Reference in New Issue
Block a user