Remove leftover references to safekeeper_proxy.

We don't use it anymore. The WAL proposer is now a background worker that
runs as part of the primary Postgres server.
Heikki Linnakangas
2021-06-01 18:50:24 +03:00
parent 558a2214bc
commit fc01fae9b4
7 changed files with 46 additions and 77 deletions

View File

@@ -121,11 +121,6 @@ Depends on the modified 'postgres' binary for WAL redo.
PostgreSQL source tree, with the modifications needed for Zenith.
-`/vendor/postgres/src/bin/safekeeper`:
-Extension (safekeeper_proxy) that runs in the compute node, and connects to the WAL safekeepers
-and streams the WAL
`/test_runner`:
Integration tests, written in Python using the `pytest` framework.

View File

@@ -135,7 +135,7 @@ impl ComputeControlPlane {
node.append_conf(
"postgresql.conf",
"synchronous_standby_names = 'safekeeper_proxy'\n",
"synchronous_standby_names = 'walproposer'\n",
)
.unwrap();
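
With safekeeper_proxy gone, the in-process walproposer is the synchronous standby the primary waits for: a commit only returns once the proposer has collected acknowledgments from a quorum of safekeepers (see the protocol document further down). As a rough illustration only (not code from this commit; the connection string is a placeholder), a test could confirm the setting with the `postgres` crate already used by the test code in this commit:

```rust
// Hedged sketch: verify that the compute node reports 'walproposer' as its
// synchronous standby after the control plane has written postgresql.conf.
use postgres::{Client, NoTls};

fn check_sync_standby(conn_str: &str) -> Result<(), postgres::Error> {
    let mut client = Client::connect(conn_str, NoTls)?;
    let row = client.query_one("SHOW synchronous_standby_names", &[])?;
    let value: String = row.get(0);
    assert_eq!(value, "walproposer");
    Ok(())
}
```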

View File

@@ -3,7 +3,7 @@ use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use std::collections::BTreeMap;
use std::convert::TryInto;
-use std::fs::{self, File, OpenOptions};
+use std::fs::{self, File};
use std::io::Read;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
@@ -164,7 +164,6 @@ impl Drop for TestStorageControlPlane {
pub trait PostgresNodeExt {
fn pg_regress(&self) -> ExitStatus;
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus;
-fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode;
fn open_psql(&self, db: &str) -> postgres::Client;
fn dump_log_file(&self);
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row>;
@@ -247,28 +246,6 @@ impl PostgresNodeExt for PostgresNode {
pg_bench_run
}
-fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
-let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
-match Command::new(proxy_path.as_path())
-.args(&["--ztimelineid", &self.timelineid.to_string()])
-.args(&["-s", wal_acceptors])
-.args(&["-h", &self.address.ip().to_string()])
-.args(&["-p", &self.address.port().to_string()])
-.arg("-v")
-.stderr(
-OpenOptions::new()
-.create(true)
-.append(true)
-.open(self.pgdata().join("safekeeper_proxy.log"))
-.unwrap(),
-)
-.spawn()
-{
-Ok(child) => WalProposerNode { pid: child.id() },
-Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
-}
-}
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.pageserver_data_dir().join("pageserver.log")) {
let mut buffer = String::new();

View File

@@ -12,13 +12,13 @@ safekeeper yet.
The primary connects to the WAL safekeeper, so it works in a "push"
fashion. That's different from how streaming replication usually
works, where the replica initiates the connection. To do that, there
is a component called "safekeeper_proxy". The safekeeper_proxy runs on
the same host as the primary Postgres server and connects to it to do
streaming replication. It also connects to the WAL safekeeper, and
forwards all the WAL. (PostgreSQL's archive_commands works in the
is a component called the "WAL proposer". The WAL proposer is a
background worker that runs in the primary Postgres server. It
connects to the WAL safekeeper, and
sends all the WAL. (PostgreSQL's archive_commands works in the
"push" style, but it operates on a WAL segment granularity. If
-PostgreSQL had a push style API for streaming, we wouldn't need the
-proxy).
+PostgreSQL had a push style API for streaming, the WAL proposer could be
+implemented using it.)
The Page Server connects to the WAL safekeeper, using the same
streaming replication protocol that's used between Postgres primary

View File

@@ -1,15 +1,15 @@
-# Proxy-safekeeper communication consensus protocol.
+# WAL proposer-safekeeper communication consensus protocol.
## General requirements and architecture
There is single stateless master and several safekeepers. Number of safekeepers is determined by redundancy level.
To minimize number of changes in Postgres core, we are using standard streaming replication from master (through WAL sender).
-This replication stream is initiated by `safekeeper_proxy` which receives data from the master and broadcasts it to safekeepers.
+This replication stream is initiated by the WAL proposer process that runs in the PostgreSQL server, which broadcasts the WAL generated by PostgreSQL to safekeepers.
To provide durability we use synchronous replication at master (response to the commit statement is sent to the client
-only when acknowledged by WAL receiver). `safekeeper_proxy` sends this acknowledgment only when LSN of commit record is confirmed by quorum of safekeepers.
+only when acknowledged by WAL receiver). WAL proposer sends this acknowledgment only when LSN of commit record is confirmed by quorum of safekeepers.
-`Safekeeper_proxy` tries to establish connections with safekeepers.
-At any moment of time each safekeeper can serve exactly once proxy, but it can accept new connections.
+The WAL proposer tries to establish connections with safekeepers.
+At any moment of time each safekeeper can serve exactly one proposer, but it can accept new connections.
Any of safekeepers can be used as WAL server, producing replication stream. So both `Pagers` and `Replicas`
(read-only computation nodes) can connect to safekeeper to receive WAL stream. Safekeepers is streaming WAL until
@@ -30,13 +30,13 @@ then accepts proposed nodeId and persists this choice in the local control file.
5. If quorum of safekeepers approve proposed nodeId, then server assumes that handshake is successfully completed and switch to recovery stage.
## Recovery
-Proxy computes max(`restartLSN`) and max(`flushLSN`) from quorum of attached safekeepers.
+Proposer computes max(`restartLSN`) and max(`flushLSN`) from quorum of attached safekeepers.
`RestartLSN` - is position in WAL which is known to be delivered to all safekeepers.
In other words: `restartLSN` can be also considered as cut-off horizon (all preceding WAL segments can be removed).
`FlushLSN` is position flushed by safekeeper to the local persistent storage.
If max(`restartLSN`) != max(`flushLSN`), then recovery has to be performed.
-Proxy creates replication channel with most advanced safekeeper (safekeeper with the largest `flushLSN`).
+Proposer creates replication channel with most advanced safekeeper (safekeeper with the largest `flushLSN`).
Then it downloads all WAL messages between max(`restartLSN`)..max(`flushLSN`).
Messages are inserted in L1-list (ordered by LSN). Then we locate position of each safekeeper in this list according
to their `flushLSN`s. Safekeepers that are not yet connected (out of quorum) should start from the beginning of the list
@@ -49,11 +49,11 @@ to avoid loose of committed data.
Calculated max(`flushLSN`) is called `VCL` (Volume Complete LSN). As far as it is chosen among quorum, there may be some other offline safekeeper with larger
`VCL`. Once it becomes online, we need to overwrite its WAL beyond `VCL`. To support it, each safekeeper maintains
`epoch` number. `Epoch` plays almost the same role as `term`, but algorithm of `epoch` bumping is different.
-`VCL` and new epoch are received by safekeeper from proxy during voting.
+`VCL` and new epoch are received by safekeeper from proposer during voting.
But safekeeper doesn't switch to new epoch immediately after voting.
Instead of it, safekeepers waits record with LSN > Max(`flushLSN`,`VCL`) is received.
It means that we restore all records from old generation and switch to new generation.
-When proxy calculates max(`FlushLSN`), it first compares `Epoch`. So actually we compare (`Epoch`,`FlushLSN`) pairs.
+When proposer calculates max(`FlushLSN`), it first compares `Epoch`. So actually we compare (`Epoch`,`FlushLSN`) pairs.
Let's looks at the examples. Consider that we have three safekeepers: S1, S2, S3. Si(N) means that i-th safekeeper has epoch=N.
Ri(x) - WAL record for resource X with LSN=i. Assume that we have the following state:
@@ -64,7 +64,7 @@ S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```
-Proxy choose quorum (S1,S2). VCL for them is 2. We download S2 to proxy and schedule its write to S1.
+Proposer chooses quorum (S1,S2). VCL for them is 2. We download S2's WAL to the proposer and schedule its write to S1.
After receiving record R5 the picture can be:
```
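
The leader for recovery is the safekeeper with the largest (`Epoch`, `FlushLSN`) pair, as described above. A small sketch of that comparison (not code from this commit; the struct and field names are illustrative, and plain integers stand in for LSNs), mirroring the S1/S2 quorum from the example:

```rust
// Sketch: pick the most advanced safekeeper by comparing (epoch, flush_lsn).
// The derived ordering compares `epoch` first, then `flush_lsn`, which is the
// pair comparison described in the text.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SafekeeperPosition {
    epoch: u64,
    flush_lsn: u64,
}

fn most_advanced(quorum: &[SafekeeperPosition]) -> Option<SafekeeperPosition> {
    quorum.iter().copied().max()
}

fn main() {
    // Mirrors the example above: quorum (S1, S2), both still in epoch 1.
    let quorum = [
        SafekeeperPosition { epoch: 1, flush_lsn: 1 }, // S1: R1(a)
        SafekeeperPosition { epoch: 1, flush_lsn: 2 }, // S2: R1(a), R2(b)
    ];
    let leader = most_advanced(&quorum).unwrap();
    assert_eq!(leader.flush_lsn, 2); // VCL for this quorum is 2
}
```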
@@ -119,29 +119,26 @@ S3(1): R1(a),R2(b),R3(c),R4(d)
```
## Main loop
-Once recovery is completed, proxy switches to normal processing loop: it receives WAL stream from master and appends WAL
+Once recovery is completed, proposer switches to normal processing loop: it receives WAL stream from Postgres and appends WAL
messages to the list. At the same time it tries to push messages to safekeepers. Each safekeeper is associated
with some element in message list and once it acknowledged receiving of the message, position is moved forward.
Each queue element contains acknowledgment mask, which bits corresponds to safekeepers.
Once all safekeepers acknowledged receiving of this message (by setting correspondent bit),
then element can be removed from queue and `restartLSN` is advanced forward.
-Proxy maintains `restartLSN` and `commitLSN` based on the responses received by safekeepers.
+Proposer maintains `restartLSN` and `commitLSN` based on the responses received from safekeepers.
`RestartLSN` equals to the LSN of head message in the list. `CommitLSN` is `flushLSN[nSafekeepers-Quorum]` element
in ordered array with `flushLSN`s of safekeepers. `CommitLSN` and `RestartLSN` are included in requests
-sent from proxy to safekeepers and stored in safekeepers control file.
+sent from proposer to safekeepers and stored in safekeepers control file.
To avoid overhead of extra fsync, this control file is not fsynced on each request. Flushing this file is performed
periodically, which means that `restartLSN`/`commitLSN` stored by safekeeper may be slightly deteriorated.
It is not critical because may only cause redundant processing of some WAL record.
And `FlushLSN` is recalculated after node restart by scanning local WAL files.
## Fault tolerance
-Once `safekeeper_proxy` looses connection to safekeeper it tries to reestablish this connection using the same nodeId.
-If `safekeeper_proxy` looses connection with master, it is terminated. Right now safekeeper is standalone process,
-which can be launched at any node, but it can be also spawned as master's background worker, so that it is automatically
-restarted in case of Postgres instance restart.
+If the WAL proposer process loses its connection to a safekeeper, it tries to reestablish this connection using the same nodeId.
-Restart of `safekeeper_proxy` initiates new round of voting and switching new epoch.
+Restart of PostgreSQL initiates a new round of voting and a switch to a new epoch.
## Limitations
Right now message queue is maintained in main memory and is not spilled to the disk.
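
The `commitLSN` rule in the "Main loop" section above ("`flushLSN[nSafekeepers-Quorum]` element in ordered array") has a compact reading: sort the reported flush positions in ascending order, then the element at index nSafekeepers - Quorum has been flushed by at least Quorum safekeepers. A hedged sketch of that rule (not code from this commit; names and types are illustrative):

```rust
// Sketch of the quorum commit rule from the "Main loop" section.
fn commit_lsn(mut flush_lsns: Vec<u64>, quorum: usize) -> Option<u64> {
    let n = flush_lsns.len();
    if quorum == 0 || quorum > n {
        return None;
    }
    flush_lsns.sort_unstable();
    // At least `quorum` safekeepers have flushed up to this position.
    Some(flush_lsns[n - quorum])
}

fn main() {
    // Three safekeepers with quorum 2: two of them have reached LSN 20,
    // so the commit LSN is 20 even though the third lags behind at 10.
    assert_eq!(commit_lsn(vec![10, 30, 20], 2), Some(20));
}
```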
@@ -161,7 +158,7 @@ It is assumed that in case of loosing local data by some safekeepers, it should
## Algorithm
```python
-process SafekeeperProxy(safekeepers,server,curr_epoch,restart_lsn=0,message_queue={},feedbacks={})
+process WalProposer(safekeepers,server,curr_epoch,restart_lsn=0,message_queue={},feedbacks={})
function do_recovery(epoch,restart_lsn,VCL)
leader = i:safekeepers[i].state.epoch=epoch and safekeepers[i].state.flushLsn=VCL
wal_stream = safekeepers[leader].start_replication(restart_lsn,VCL)
@@ -241,31 +238,31 @@ end process
process safekeeper(gateway,state)
function handshake()
-proxy = gateway.accept()
-server_info = proxy.read()
-proxy.write(state)
-proposal = proxy.read()
+proposer = gateway.accept()
+server_info = proposer.read()
+proposer.write(state)
+proposal = proposer.read()
if proposal.nodeId < state.nodeId
-proxy.write(rejected)
+proposer.write(rejected)
return null
else
state.nodeId = proposal.nodeId
state.proposed_epoch = proposal.epoch
state.VCL = proposal.VCL
write_control_file(state)
-proxy.write(accepted)
-return proxy
+proposer.write(accepted)
+return proposer
end function
state = read_control_file()
state.flushLsn = locate_end_of_wal()
for ever
-proxy = handshake()
-if not proxy
+proposer = handshake()
+if not proposer
continue
for ever
-req = proxy.read()
+req = proposer.read()
if req.nodeId != state.nodeId
break
save_wal_file(req.data)
@@ -276,7 +273,7 @@ process safekeeper(gateway,state)
state.flushLsn = req.endPos
save_control_file(state)
resp = Response(state.epoch,req.endPos)
-proxy.write(resp)
+proposer.write(resp)
notify_wal_sender(Min(req.commitLsn,req.endPos))
end process
```
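
The proposer-side bookkeeping described in the "Main loop" section (a queue of WAL messages, each carrying an acknowledgment mask, with `restartLSN` advancing as the head becomes fully acknowledged) could look roughly like the following sketch. This is not code from this commit; the names are illustrative and it assumes fewer than 64 safekeepers so a `u64` can serve as the bit mask.

```rust
use std::collections::VecDeque;

// One queued WAL message with a bit mask of safekeepers that confirmed it.
struct QueuedMessage {
    end_lsn: u64,
    acks: u64,
}

struct ProposerQueue {
    messages: VecDeque<QueuedMessage>,
    n_safekeepers: u32,
    restart_lsn: u64,
}

impl ProposerQueue {
    /// Record that `safekeeper` has acknowledged everything up to `acked_lsn`,
    /// then drop fully-acknowledged messages from the head of the queue and
    /// advance `restart_lsn` accordingly.
    fn ack(&mut self, safekeeper: u32, acked_lsn: u64) {
        for msg in self.messages.iter_mut() {
            if msg.end_lsn <= acked_lsn {
                msg.acks |= 1 << safekeeper;
            }
        }
        let all = (1u64 << self.n_safekeepers) - 1;
        while self.messages.front().map_or(false, |m| m.acks == all) {
            let head = self.messages.pop_front().unwrap();
            self.restart_lsn = head.end_lsn;
        }
    }
}
```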

View File

@@ -40,7 +40,7 @@ pub struct NodeId {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ServerInfo {
-/// proxy-safekeeper protocol version
+/// proposer-safekeeper protocol version
pub protocol_version: u32,
/// Postgres server version
pub pg_version: u32,
@@ -53,7 +53,7 @@ pub struct ServerInfo {
pub wal_seg_size: u32,
}
-/// Vote request sent from proxy to safekeepers
+/// Vote request sent from proposer to safekeepers
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct RequestVote {
node_id: NodeId,
@@ -89,7 +89,7 @@ impl SafeKeeperInfo {
format_version: SK_FORMAT_VERSION,
epoch: 0,
server: ServerInfo {
-protocol_version: SK_PROTOCOL_VERSION, /* proxy-safekeeper protocol version */
+protocol_version: SK_PROTOCOL_VERSION, /* proposer-safekeeper protocol version */
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
node_id: NodeId {
term: 0,
@@ -108,7 +108,7 @@ impl SafeKeeperInfo {
}
}
-/// Request with WAL message sent from proxy to safekeeper.
+/// Request with WAL message sent from proposer to safekeeper.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SafeKeeperRequest {
/// Sender's node identifier (looks like we do not need it for TCP streaming connection)
@@ -117,13 +117,13 @@ struct SafeKeeperRequest {
begin_lsn: Lsn,
/// end position of message in WAL
end_lsn: Lsn,
-/// restart LSN position (minimal LSN which may be needed by proxy to perform recovery)
+/// restart LSN position (minimal LSN which may be needed by proposer to perform recovery)
restart_lsn: Lsn,
/// LSN committed by quorum of safekeepers
commit_lsn: Lsn,
}
-/// Report safekeeper state to proxy
+/// Report safekeeper state to proposer
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SafeKeeperResponse {
epoch: u64,
@@ -246,14 +246,14 @@ impl ReceiveWalConn {
my_info.flush_lsn = flush_lsn;
my_info.server.timeline = timeline;
-/* Report my identifier to proxy */
+/* Report my identifier to proposer */
my_info.ser_into(&mut self.stream_out)?;
/* Wait for vote request */
let prop = self.read_req::<RequestVote>()?;
/* This is Paxos check which should ensure that only one master can perform commits */
if prop.node_id < my_info.server.node_id {
-/* Send my node-id to inform proxy that it's candidate was rejected */
+/* Send my node-id to inform proposer that it's candidate was rejected */
my_info.server.node_id.ser_into(&mut self.stream_out)?;
bail!(
"Reject connection attempt with term {} because my term is {}",
@@ -269,7 +269,7 @@ impl ReceiveWalConn {
let mut flushed_restart_lsn = Lsn(0);
let wal_seg_size = server_info.wal_seg_size as usize;
-/* Acknowledge the proposed candidate by returning it to the proxy */
+/* Acknowledge the proposed candidate by returning it to the proposer */
prop.node_id.ser_into(&mut self.stream_out)?;
if self.conf.pageserver_addr.is_some() {
@@ -323,7 +323,7 @@ impl ReceiveWalConn {
/*
* Epoch switch happen when written WAL record cross the boundary.
* The boundary is maximum of last WAL position at this node (FlushLSN) and global
-* maximum (vcl) determined by safekeeper_proxy during handshake.
+* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that node completes recovery and start writing in the WAL new data.
*/
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
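
The epoch-switch condition in the comment above can be read as a standalone predicate. A trivial sketch (not code from this commit; the parameter names are illustrative):

```rust
// Sketch: the safekeeper adopts the proposed epoch only after it has written a
// record that crosses max(its FlushLSN at handshake time, the proposer's VCL),
// i.e. once all records from the old generation have been restored.
fn should_switch_epoch(my_epoch: u64, proposed_epoch: u64, flush_lsn: u64, vcl: u64, end_pos: u64) -> bool {
    my_epoch < proposed_epoch && end_pos > flush_lsn.max(vcl)
}
```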