# WAL proposer-safekeeper communication consensus protocol.
## General requirements and architecture
There is a single stateless master and several safekeepers. The number of safekeepers is determined by the redundancy level.
To minimize the number of changes in the Postgres core, we use standard streaming replication from the master (through the WAL sender).
This replication stream is initiated by the WAL proposer process, which runs in the PostgreSQL server and broadcasts the WAL generated by PostgreSQL to the safekeepers.
To provide durability we use synchronous replication at the master (the response to a commit statement is sent to the client
only when it is acknowledged by the WAL receiver). The WAL proposer sends this acknowledgment only when the LSN of the commit record is confirmed by a quorum of safekeepers, as sketched below.
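
A minimal sketch of this durability rule, assuming `quorum_confirmed_lsn` is the highest LSN already confirmed by a quorum of safekeepers (the names here are illustrative, not the actual implementation):

```python
def can_ack_commit(commit_record_lsn: int, quorum_confirmed_lsn: int) -> bool:
    """Acknowledge a commit to the client only once its WAL record
    has been confirmed by a quorum of safekeepers."""
    return commit_record_lsn <= quorum_confirmed_lsn
```
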
The WAL proposer tries to establish connections with the safekeepers.
At any moment of time each safekeeper can serve exactly one proposer, but it can accept new connections.

Any of the safekeepers can be used as a WAL server, producing a replication stream. So both `Pagers` and `Replicas`
(read-only computation nodes) can connect to a safekeeper to receive the WAL stream. A safekeeper streams WAL until
it reaches min(`commitLSN`,`flushLSN`). Then replication is suspended until new data arrives from the master.

## Handshake

The goal of the handshake is to collect a quorum (to be able to perform recovery)
and to avoid split-brain caused by the simultaneous presence of an old and a new master.
The handshake consists of the following steps:

1. Broadcast information about the server to all safekeepers (WAL segment size, system_id, ...).
2. Receive responses with information about the safekeepers.
3. Once a quorum of handshake responses has been received, propose a new `NodeId(max(term)+1, server.uuid)`
   to all of them.
4. On receiving the proposed nodeId, a safekeeper compares it with its locally stored nodeId; if the proposed one is greater or equal,
   the safekeeper accepts it and persists this choice in its local control file (see the sketch after this list).
5. If a quorum of safekeepers approves the proposed nodeId, the server assumes that the handshake has completed successfully and switches to the recovery stage.
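
A minimal sketch of the safekeeper-side acceptance rule (step 4), assuming a `NodeId` ordered first by term and then by UUID; the names and types here are illustrative, not the actual implementation:

```python
from dataclasses import dataclass

@dataclass(order=True)
class NodeId:
    term: int   # proposer term, bumped to max(term)+1 on every handshake
    uuid: str   # identity of the proposing server

@dataclass
class SafekeeperState:
    node_id: NodeId

def handle_proposal(state: SafekeeperState, proposed: NodeId, persist) -> bool:
    """Accept the proposed nodeId iff it is greater than or equal to the stored one."""
    if proposed < state.node_id:
        return False          # reject: the proposal comes from an outdated proposer
    state.node_id = proposed  # adopt the new proposer
    persist(state)            # persist the choice in the control file before acknowledging
    return True

# Example: a proposal with a higher term wins over the stored nodeId.
state = SafekeeperState(NodeId(term=3, uuid="old-proposer"))
assert handle_proposal(state, NodeId(term=4, uuid="new-proposer"), persist=lambda s: None)
```
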
## Recovery

The proposer computes max(`restartLSN`) and max(`flushLSN`) over a quorum of attached safekeepers.
`RestartLSN` is the position in the WAL which is known to be delivered to all safekeepers.
In other words, `restartLSN` can also be considered a cut-off horizon (all preceding WAL segments can be removed).
`FlushLSN` is the position flushed by a safekeeper to its local persistent storage.

If max(`restartLSN`) != max(`flushLSN`), then recovery has to be performed.
The proposer creates a replication channel with the most advanced safekeeper (the safekeeper with the largest `flushLSN`).
Then it downloads all WAL messages in the range max(`restartLSN`)..max(`flushLSN`).
The messages are inserted into an L1-list (ordered by LSN). Then we locate the position of each safekeeper in this list according
to its `flushLSN`. Safekeepers that are not yet connected (out of quorum) should start from the beginning of the list
(corresponding to `restartLSN`), as sketched below.
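
A minimal sketch of how a safekeeper is positioned in the downloaded message list, assuming the list is ordered by LSN and each message carries its begin/end LSNs; the names are illustrative, not the actual implementation:

```python
def recovery_start_index(messages, flush_lsn):
    """A safekeeper resumes from the first message it has not yet flushed."""
    for idx, (begin_lsn, end_lsn) in enumerate(messages):
        if end_lsn > flush_lsn:
            return idx
    return len(messages)   # already up to date

# Messages covering LSNs up to 4: a safekeeper that has flushed up to LSN 2 resumes at index 2,
# while a not-yet-connected safekeeper (at restartLSN = 0) starts from the beginning of the list.
msgs = [(0, 1), (1, 2), (2, 3), (3, 4)]
assert recovery_start_index(msgs, flush_lsn=2) == 2
assert recovery_start_index(msgs, flush_lsn=0) == 0
```
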
We need to choose max(`flushLSN`) because the voting quorum may be different from the quorum that committed the last message.
So we do not know whether the records up to max(`flushLSN`) were committed by a quorum or not, and we have to consider them committed
to avoid losing committed data.

The calculated max(`flushLSN`) is called `VCL` (Volume Complete LSN). As it is chosen among a quorum, there may be some other offline safekeeper with a larger
`flushLSN`. Once it comes back online, we need to overwrite its WAL beyond `VCL`. To support this, each safekeeper maintains an
`epoch` number. The `epoch` plays almost the same role as the `term`, but the algorithm for bumping the `epoch` is different.
The `VCL` and the new epoch are received by a safekeeper from the proposer during voting.
But the safekeeper doesn't switch to the new epoch immediately after voting.
Instead, the safekeeper waits until a record with LSN > max(`flushLSN`,`VCL`) is received.
This means that all records of the old generation have been restored and we can switch to the new generation.
When the proposer calculates max(`flushLSN`), it first compares the `epoch`, so we actually compare (`epoch`,`flushLSN`) pairs, as illustrated below.
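
A minimal sketch of this comparison, assuming each voting safekeeper reports its `epoch` and `flushLSN`; the names are illustrative, not the actual implementation:

```python
from dataclasses import dataclass

@dataclass
class VoteResponse:   # illustrative; not the real message layout
    epoch: int
    flush_lsn: int

def choose_vcl(quorum_responses):
    """Compare (epoch, flushLsn) pairs and take the maximum as (epoch, VCL)."""
    epoch, vcl = max((r.epoch, r.flush_lsn) for r in quorum_responses)
    return epoch, vcl

# S1 and S2 from the first example below: both on epoch 1, S2 has flushed up to LSN 2, so VCL = 2.
assert choose_vcl([VoteResponse(1, 1), VoteResponse(1, 2)]) == (1, 2)
```
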
Let's look at some examples. Consider that we have three safekeepers: S1, S2, S3. Si(N) means that the i-th safekeeper has epoch=N.
Ri(x) is the WAL record for resource x with LSN=i. Assume that we have the following state:

```
S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

The proposer chooses the quorum (S1,S2). The VCL for them is 2. We download the missing WAL from S2 to the proposer and schedule its write to S1.
After receiving a new record R3(e), the picture can be:

```
S1(2): R1(a),R2(b),R3(e)
S2(2): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

Now if the server crashes or is restarted, we perform a new round of voting, and
it doesn't matter which quorum we choose: (S1,S2), (S2,S3), ...
In any case VCL=3, because S3 has a smaller epoch.
R3(c) will be overwritten with R3(e):

```
S1(3): R1(a),R2(b),R3(e)
S2(3): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(e),R4(d)
```

The epoch of S3 will be adjusted once it overwrites R4:

```
S1(3): R1(a),R2(b),R3(e),R4(f)
S2(3): R1(a),R2(b),R3(e),R4(f)
S3(3): R1(a),R2(b),R3(e),R4(f)
```

A crash can also happen before the epoch has been bumped. Let's return to the initial state:

```
S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

Assume that we start recovery:

```
S1(1): R1(a),R2(b)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

and then a crash happens. During voting we choose the quorum (S2,S3).
Now both of them belong to the same epoch, and S3 is the most advanced among them.
So VCL is set to 4 and we recover S1 and S2 from S3:

```
S1(1): R1(a),R2(b),R3(c),R4(d)
S2(1): R1(a),R2(b),R3(c),R4(d)
S3(1): R1(a),R2(b),R3(c),R4(d)
```

## Main loop

Once recovery is completed, the proposer switches to the normal processing loop: it receives the WAL stream from Postgres and appends WAL
messages to the list. At the same time it tries to push messages to the safekeepers. Each safekeeper is associated
with some element in the message list, and once it has acknowledged receiving the message, its position is moved forward.
Each queue element contains an acknowledgment mask whose bits correspond to the safekeepers.
Once all safekeepers have acknowledged receiving a message (by setting the corresponding bit),
the element can be removed from the queue and `restartLSN` is advanced, as sketched below.
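
A minimal sketch of this bookkeeping, assuming an in-memory deque of WAL messages; the field and function names are illustrative, not the actual implementation:

```python
from collections import deque
from dataclasses import dataclass, field

@dataclass
class QueueEntry:
    begin_lsn: int
    end_lsn: int
    acked: set = field(default_factory=set)   # indexes of safekeepers that acknowledged this message

def on_ack(queue, restart_lsn, entry, safekeeper, n_safekeepers):
    """Record an acknowledgment and advance restartLSN past fully acknowledged messages."""
    entry.acked.add(safekeeper)
    while queue and len(queue[0].acked) == n_safekeepers:
        restart_lsn = queue.popleft().end_lsn
    return restart_lsn

# Example with two safekeepers: the head entry is removed only after both have acknowledged it.
q = deque([QueueEntry(begin_lsn=0, end_lsn=10)])
lsn = on_ack(q, 0, q[0], safekeeper=0, n_safekeepers=2)    # still 0: only one ack so far
lsn = on_ack(q, lsn, q[0], safekeeper=1, n_safekeepers=2)  # advances to 10 and pops the entry
assert lsn == 10 and not q
```
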
The proposer maintains `restartLSN` and `commitLSN` based on the responses received from the safekeepers.
`RestartLSN` equals the LSN of the head message in the list. `CommitLSN` is the `flushLSN[nSafekeepers-Quorum]` element
of the ordered array of the safekeepers' `flushLSN`s (see the sketch below). `CommitLSN` and `RestartLSN` are included in the requests
sent from the proposer to the safekeepers and are stored in the safekeepers' control files.
To avoid the overhead of an extra fsync, this control file is not fsynced on each request. Flushing of this file is performed
periodically, which means that the `restartLSN`/`commitLSN` stored by a safekeeper may lag slightly behind.
This is not critical, because it may only cause redundant processing of some WAL records.
And `FlushLSN` is recalculated after a node restart by scanning the local WAL files.
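
A minimal sketch of the `commitLSN` calculation, assuming `flush_lsns` holds the latest `flushLSN` reported by each safekeeper; the names are illustrative, not the actual implementation:

```python
def get_commit_lsn(flush_lsns, quorum):
    """LSN acknowledged by at least `quorum` safekeepers: take the
    (nSafekeepers - quorum)-th element of the ascending-sorted flushLSNs."""
    return sorted(flush_lsns)[len(flush_lsns) - quorum]

# With three safekeepers and quorum = 2, commitLSN is the second-largest flushLSN:
# at least two safekeepers (those that reported 30 and 40) have flushed up to LSN 30.
assert get_commit_lsn([10, 40, 30], quorum=2) == 30
```
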
## Fault tolerance

If the WAL proposer process loses its connection to a safekeeper, it tries to reestablish this connection using the same nodeId.

A restart of PostgreSQL initiates a new round of voting and a switch to a new epoch.
## Limitations

Right now the message queue is maintained in main memory and is not spilled to disk.
This can cause memory overflow in the presence of lagging safekeepers.
It is assumed that if some safekeepers lose their local data, they will be recovered using some external mechanism.
## Glossary

* `CommitLSN`: position in the WAL confirmed by a quorum of safekeepers.
* `RestartLSN`: position in the WAL confirmed by all safekeepers.
* `FlushLSN`: part of the WAL persisted to disk by a safekeeper.
* `NodeId`: pair (term, UUID).
* `Pager`: Zenith component restoring pages from the WAL stream.
* `Replica`: read-only computation node.
* `VCL`: the largest LSN for which we can guarantee availability of all prior records.
## Algorithm

The protocol is summarized by the following pseudocode (one process for the WAL proposer and one for each safekeeper):

```python
process WalProposer(safekeepers, server, curr_epoch, restart_lsn=0, message_queue={}, feedbacks={})

    # Recovery: download WAL from the most advanced safekeeper and schedule
    # its delivery to the safekeepers that are lagging behind.
    function do_recovery(epoch, restart_lsn, VCL)
        leader = i : safekeepers[i].state.epoch = epoch and safekeepers[i].state.flushLsn = VCL
        wal_stream = safekeepers[leader].start_replication(restart_lsn, VCL)
        do
            message = wal_stream.read()
            message_queue.append(message)
        while message.beginLsn < VCL

        # Position each safekeeper in the message queue according to its flushLsn.
        for i in 1..safekeepers.size()
            for message in message_queue
                if message.endLsn < safekeepers[i].state.flushLsn
                    message.delivered += i
                else
                    send_message(i, message)
                    break
    end function

    function send_message(i, msg)
        msg.restartLsn = restart_lsn
        msg.commitLsn = get_commit_lsn()
        safekeepers[i].send(msg, response_handler)
    end function

    function do_broadcast(message)
        for i in 1..safekeepers.size()
            if not safekeepers[i].sending()
                send_message(i, message)
    end function

    # CommitLSN: the LSN confirmed by at least `quorum` safekeepers.
    function get_commit_lsn()
        sorted_feedbacks = feedbacks.sort()
        return sorted_feedbacks[safekeepers.size() - quorum]
    end function

    function response_handler(i, message, response)
        # A safekeeper still in the old epoch is only credited up to VCL.
        feedbacks[i] = if response.epoch = curr_epoch then response.flushLsn else VCL
        server.write(get_commit_lsn())   # report the new commitLSN back to the server

        message.delivered += i
        next_message = message_queue.next(message)
        if next_message
            send_message(i, next_message)

        # Advance restartLsn past messages acknowledged by all safekeepers.
        while message_queue.head.delivered.size() = safekeepers.size()
            if restart_lsn < message_queue.head.beginLsn
                restart_lsn = message_queue.head.endLsn
            message_queue.pop_head()
    end function

    # Handshake: exchange server information and collect safekeeper states.
    server_info = server.read()
    safekeepers.write(server_info)
    safekeepers.state = safekeepers.read()

    # Voting: propose a new nodeId; VCL is the maximum over (epoch, flushLsn) pairs.
    next_term = max(safekeepers.state.nodeId.term) + 1
    restart_lsn = max(safekeepers.state.restartLsn)
    epoch, VCL = max(safekeepers.state.epoch, safekeepers.state.flushLsn)
    curr_epoch = epoch + 1

    proposal = Proposal(NodeId(next_term, server.id), curr_epoch, VCL)
    safekeepers.send(proposal)
    responses = safekeepers.read()
    if any responses.is_rejected()
        exit()

    for i in 1..safekeepers.size()
        feedbacks[i] = if epoch = safekeepers[i].state.epoch then safekeepers[i].state.flushLsn else restart_lsn

    if restart_lsn != VCL
        do_recovery(epoch, restart_lsn, VCL)

    # Main loop: receive WAL from Postgres and broadcast it to the safekeepers.
    wal_stream = server.start_replication(VCL)
    for ever
        message = wal_stream.read()
        message_queue.append(message)
        do_broadcast(message)
end process


process safekeeper(gateway, state)

    function handshake()
        proposer = gateway.accept()
        server_info = proposer.read()
        proposer.write(state)
        proposal = proposer.read()
        if proposal.nodeId < state.nodeId
            proposer.write(rejected)
            return null
        else
            state.nodeId = proposal.nodeId
            state.proposed_epoch = proposal.epoch
            state.VCL = proposal.VCL
            write_control_file(state)
            proposer.write(accepted)
            return proposer
    end function

    state = read_control_file()
    state.flushLsn = locate_end_of_wal()

    for ever
        proposer = handshake()
        if not proposer
            continue
        for ever
            req = proposer.read()
            if req.nodeId != state.nodeId   # a different proposer has been accepted since
                break
            save_wal_file(req.data)
            state.restartLsn = req.restartLsn
            # Switch to the new epoch once all WAL of the old generation has been restored.
            if state.epoch < state.proposed_epoch and req.endLsn > max(state.flushLsn, state.VCL)
                state.epoch = state.proposed_epoch
            if req.endLsn > state.flushLsn
                state.flushLsn = req.endLsn
            save_control_file(state)
            resp = Response(state.epoch, req.endLsn)
            proposer.write(resp)
            notify_wal_sender(min(req.commitLsn, req.endLsn))
end process
```