# WAL proposer-safekeeper communication consensus protocol
## General requirements and architecture
There is a single stateless master and several safekeepers. The number of safekeepers is determined by the redundancy level.

To minimize the number of changes in the Postgres core, we use standard streaming replication from the master (through the WAL sender).
This replication stream is initiated by the WAL proposer process that runs in the PostgreSQL server and broadcasts the WAL generated by PostgreSQL to the safekeepers.
To provide durability we use synchronous replication at the master (the response to a commit statement is sent to the client
only when it has been acknowledged by the WAL receiver). The WAL proposer sends this acknowledgment only when the LSN of the commit record is confirmed by a quorum of safekeepers.

The WAL proposer tries to establish connections with the safekeepers.
At any moment each safekeeper can serve exactly one proposer, but it can accept new connections.

Any safekeeper can be used as a WAL server producing a replication stream, so both `Pagers` and `Replicas`
(read-only computation nodes) can connect to a safekeeper to receive the WAL stream. A safekeeper streams WAL until
it reaches min(`commitLSN`, `flushLSN`); then replication is suspended until new data arrives from the master.
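
As an illustration, here is a minimal sketch of that streaming limit (the function and parameter names are hypothetical, not part of the actual safekeeper code):

```python
def stream_limit(commit_lsn: int, flush_lsn: int) -> int:
    """A safekeeper streams WAL to pagers and replicas only up to
    min(commitLSN, flushLSN): anything beyond commitLSN is not yet confirmed
    by a quorum, and anything beyond flushLSN is not yet on local disk."""
    return min(commit_lsn, flush_lsn)


# Example: commitLSN = 0x2000 (quorum-confirmed), flushLSN = 0x1800 (locally flushed).
# The safekeeper streams up to 0x1800 and then suspends until new data arrives.
assert stream_limit(0x2000, 0x1800) == 0x1800
```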
## Handshake
The goal of the handshake is to collect a quorum (to be able to perform recovery)
and to avoid split-brain caused by the simultaneous presence of an old and a new master.
The handshake consists of the following steps:

1. Broadcast information about the server (WAL segment size, system_id, ...) to all safekeepers.
2. Receive responses with information about the safekeepers.
3. Once a quorum of handshake responses has been received, propose a new `NodeId(max(term)+1, server.uuid)`
   to all of them.
4. On receiving the proposed nodeId, a safekeeper compares it with its locally stored nodeId; if the proposed one is greater than or equal,
   the safekeeper accepts it and persists this choice in its local control file (see the sketch below).
5. If a quorum of safekeepers approves the proposed nodeId, the server assumes that the handshake has completed successfully and switches to the recovery stage.

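
A minimal sketch of the nodeId comparison in step 4, assuming a `NodeId` ordered by term first with the UUID as a tie-breaker (the Python names here are illustrative, not the real implementation):

```python
from dataclasses import dataclass
from uuid import UUID


@dataclass(order=True, frozen=True)
class NodeId:
    term: int   # compared first
    uuid: UUID  # tie-breaker between proposers that picked the same term


def handle_proposal(stored: NodeId, proposed: NodeId) -> bool:
    """Safekeeper side of step 4: accept the proposed nodeId if it is
    greater than or equal to the locally stored one."""
    if proposed >= stored:
        # The choice is persisted to the local control file before replying.
        return True   # reply "accepted"
    return False      # reply "rejected"; the proposer gives up


# Example: a proposer with a higher term always wins, regardless of UUID.
old = NodeId(3, UUID(int=7))
new = NodeId(4, UUID(int=1))
assert handle_proposal(old, new)
```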
## Recovery
The proposer computes max(`restartLSN`) and max(`flushLSN`) over the quorum of attached safekeepers.
`RestartLSN` is the position in the WAL that is known to be delivered to all safekeepers.
In other words, `restartLSN` can also be considered a cut-off horizon (all preceding WAL segments can be removed).
`FlushLSN` is the position flushed by a safekeeper to its local persistent storage.

If max(`restartLSN`) != max(`flushLSN`), then recovery has to be performed.
The proposer opens a replication channel to the most advanced safekeeper (the safekeeper with the largest `flushLSN`).
Then it downloads all WAL messages between max(`restartLSN`)..max(`flushLSN`).
Messages are inserted into a list ordered by LSN. Then we locate the position of each safekeeper in this list according
to its `flushLSN`. Safekeepers that are not yet connected (outside the quorum) should start from the beginning of the list
(corresponding to `restartLSN`).
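
For illustration, here is a sketch (with hypothetical structure and field names) of how each safekeeper's starting position in the downloaded list could be derived from its `flushLSN`:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WalMessage:
    begin_lsn: int
    end_lsn: int


def start_index(messages: list[WalMessage], flush_lsn: Optional[int]) -> int:
    """Index of the first message a safekeeper still needs. `messages` is
    ordered by LSN and starts at restartLSN; flush_lsn is None for a
    safekeeper that did not take part in voting, so it starts from the head."""
    if flush_lsn is None:
        return 0
    for i, msg in enumerate(messages):
        if msg.end_lsn > flush_lsn:  # first message extending beyond what is flushed
            return i
    return len(messages)  # this safekeeper already has everything up to VCL
```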
We need to choose max(`flushLSN`) because the voting quorum may differ from the quorum that committed the last message.
So we do not know whether the records up to max(`flushLSN`) were committed by a quorum or not, and we have to consider them committed
to avoid losing committed data.
The calculated max(`flushLSN`) is called `VCL` (Volume Complete LSN). Since it is chosen among a quorum, there may be some other offline safekeeper
whose `flushLSN` is larger than the `VCL`. Once it comes back online, we need to overwrite its WAL beyond `VCL`. To support this, each safekeeper maintains
an `epoch` number. `Epoch` plays almost the same role as `term`, but the algorithm for bumping the `epoch` is different.
The `VCL` and the new epoch are received by a safekeeper from the proposer during voting.
But a safekeeper doesn't switch to the new epoch immediately after voting.
Instead, the safekeeper waits until a record with LSN > max(`flushLSN`, `VCL`) is received.
This means that we have restored all records from the old generation and can switch to the new generation.
When the proposer calculates max(`flushLSN`), it first compares `epoch`, so we actually compare (`epoch`, `flushLSN`) pairs.
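
To make these two rules concrete, here is a minimal sketch (illustrative names, using plain dicts) of how the proposer picks the most advanced safekeeper by comparing (`epoch`, `flushLSN`) pairs, and of the condition under which a safekeeper bumps its epoch:

```python
def most_advanced(states: list[dict]) -> dict:
    """Proposer side: 'max(flushLSN)' really means the maximum of the
    (epoch, flushLsn) pair; the winner's flushLsn becomes the VCL."""
    return max(states, key=lambda s: (s["epoch"], s["flush_lsn"]))


def maybe_bump_epoch(state: dict, record_end_lsn: int) -> None:
    """Safekeeper side: switch to the proposed epoch only after receiving a
    record that lies beyond everything written in the old generation,
    i.e. beyond max(flushLsn, VCL)."""
    if state["epoch"] < state["proposed_epoch"] and \
            record_end_lsn > max(state["flush_lsn"], state["vcl"]):
        state["epoch"] = state["proposed_epoch"]
```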
Let's look at an example. Suppose we have three safekeepers: S1, S2, S3. Si(N) means that the i-th safekeeper has epoch=N.
Ri(x) is a WAL record for resource x with LSN=i. Assume that we have the following state:
```
S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```
The proposer chooses the quorum (S1,S2). The VCL for them is 2. We download the missing WAL from S2 to the proposer and schedule its write to S1.
After receiving a new record R3(e) from the new master, the picture can be:
```
S1(2): R1(a),R2(b),R3(e)
S2(2): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```
Now, if the server crashes or is restarted, we perform a new round of voting, and
it doesn't matter which quorum we choose: (S1,S2), (S2,S3), ...
In any case VCL=3, because S3 has a smaller epoch.
R3(c) will be overwritten with R3(e):
```
S1(3): R1(a),R2(b),R3(e)
S2(3): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(e),R4(d)
```
The epoch of S3 will be adjusted once it overwrites R4:
```
S1(3): R1(a),R2(b),R3(e),R4(f)
S2(3): R1(a),R2(b),R3(e),R4(f)
S3(3): R1(a),R2(b),R3(e),R4(f)
```
A crash can also happen before the epoch has been bumped. Let's go back to the initial state:
```
S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```
Assume that we start recovery:
```
S1(1): R1(a),R2(b)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```
and then a crash happens. During voting we choose a quorum that includes S3, for example (S2,S3).
Now they all belong to the same epoch and S3 is the most advanced among them.
So VCL is set to 4 and we recover S1 and S2 from S3:
```
S1(1): R1(a),R2(b),R3(c),R4(d)
S2(1): R1(a),R2(b),R3(c),R4(d)
S3(1): R1(a),R2(b),R3(c),R4(d)
```
## Main loop
Once recovery is completed, the proposer switches to the normal processing loop: it receives the WAL stream from Postgres and appends WAL
messages to the list. At the same time it tries to push messages to the safekeepers. Each safekeeper is associated
with some element in the message list, and once it has acknowledged receiving the message, its position is moved forward.
Each queue element contains an acknowledgment mask whose bits correspond to safekeepers.
Once all safekeepers have acknowledged receiving a message (by setting the corresponding bit),
the element can be removed from the queue and `restartLSN` is advanced.

The proposer maintains `restartLSN` and `commitLSN` based on the responses received from the safekeepers.
`RestartLSN` equals the LSN of the head message in the list. `CommitLSN` is the `flushLSN[nSafekeepers-Quorum]` element
of the ordered array of the safekeepers' `flushLSN`s. `CommitLSN` and `RestartLSN` are included in the requests
sent from the proposer to the safekeepers and stored in each safekeeper's control file.
To avoid the overhead of an extra fsync, this control file is not fsynced on each request. Flushing this file is performed
periodically, which means that the `restartLSN`/`commitLSN` stored by a safekeeper may lag slightly behind.
This is not critical because it may only cause redundant processing of some WAL records.
`FlushLSN` is recalculated after a node restart by scanning the local WAL files.
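
A sketch of these two bookkeeping rules, assuming `feedbacks` holds the latest `flushLSN` reported by each safekeeper and each queue element tracks the set of safekeepers that acknowledged it (`delivered`, `end_lsn` and the other names are illustrative):

```python
def commit_lsn(feedbacks: list[int], quorum: int) -> int:
    """Quorum-confirmed position: with the flushLSNs sorted in ascending order,
    at least `quorum` safekeepers have flushed everything up to
    sorted[n - quorum]."""
    ordered = sorted(feedbacks)
    return ordered[len(feedbacks) - quorum]


def advance_restart_lsn(queue: list, restart_lsn: int, n_safekeepers: int) -> int:
    """Pop head messages acknowledged by *all* safekeepers; restartLSN moves
    to the end of the last fully acknowledged message."""
    while queue and len(queue[0].delivered) == n_safekeepers:
        restart_lsn = queue[0].end_lsn
        queue.pop(0)
    return restart_lsn


# Example: 3 safekeepers, quorum of 2, reported flushLSNs 0x10, 0x30, 0x20.
# commitLSN is 0x20: the two most advanced safekeepers have flushed at least that far.
assert commit_lsn([0x10, 0x30, 0x20], quorum=2) == 0x20
```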
## Fault tolerance
If the WAL proposer process loses its connection to a safekeeper, it tries to reestablish the connection using the same nodeId.

A restart of PostgreSQL initiates a new round of voting and a switch to a new epoch.
## Limitations
Right now the message queue is maintained in main memory and is not spilled to disk.
This can cause memory overflow if there are lagging safekeepers.
It is assumed that if some safekeepers lose their local data, they are recovered using some external mechanism.
## Glossary
* `CommitLSN`: position in the WAL confirmed by a quorum of safekeepers.
* `RestartLSN`: position in the WAL confirmed by all safekeepers.
* `FlushLSN`: position in the WAL persisted to disk by a safekeeper.
* `NodeId`: pair (term, UUID).
* `Pager`: the Neon component that restores pages from the WAL stream.
* `Replica`: read-only computation node.
* `VCL`: the largest LSN for which we can guarantee availability of all prior records.
## Algorithm
```python
process WalProposer(safekeepers,server,curr_epoch,restart_lsn=0,message_queue={},feedbacks={})

    # Recovery: download the WAL between restart_lsn and VCL from the most
    # advanced safekeeper and position every safekeeper in the message queue.
    function do_recovery(epoch,restart_lsn,VCL)
        leader = i:safekeepers[i].state.epoch=epoch and safekeepers[i].state.flushLsn=VCL
        wal_stream = safekeepers[leader].start_replication(restart_lsn,VCL)
        do
            message = wal_stream.read()
            message_queue.append(message)
        while message.startPos < VCL

        for i in 1..safekeepers.size()
            for message in message_queue
                if message.endLsn < safekeepers[i].state.flushLsn
                    message.delivered += i       # this safekeeper already has the record
                else
                    send_message(i, message)     # start pushing from the first missing record
                    break
    end function

    function send_message(i,msg)
        msg.restartLsn = restart_lsn
        msg.commitLsn = get_commit_lsn()
        safekeepers[i].send(msg, response_handler)
    end function

    function do_broadcast(message)
        for i in 1..safekeepers.size()
            if not safekeepers[i].sending()
                send_message(i, message)
    end function

    # commitLSN: the position confirmed by a quorum of safekeepers.
    function get_commit_lsn()
        sorted_feedbacks = feedbacks.sort()
        return sorted_feedbacks[safekeepers.size() - quorum]
    end function

    function response_handler(i,message,response)
        feedbacks[i] = if response.epoch=curr_epoch then response.flushLsn else VCL
        server.write(get_commit_lsn())           # acknowledge commits to Postgres

        message.delivered += i
        next_message = message_queue.next(message)
        if next_message
            send_message(i, next_message)

        # Advance restartLSN once every safekeeper has acknowledged the head message.
        while message_queue.head.delivered.size() = safekeepers.size()
            if restart_lsn < message_queue.head.beginLsn
                restart_lsn = message_queue.head.endLsn
            message_queue.pop_head()
    end function

    # Handshake: exchange server info and propose NodeId(max(term)+1, server.id).
    server_info = server.read()

    safekeepers.write(server_info)
    safekeepers.state = safekeepers.read()
    next_term = max(safekeepers.state.nodeId.term)+1
    restart_lsn = max(safekeepers.state.restartLsn)
    epoch,VCL = max(safekeepers.state.epoch,safekeepers.state.flushLsn)   # compare (epoch,flushLsn) pairs
    curr_epoch = epoch + 1

    proposal = Proposal(NodeId(next_term,server.id),curr_epoch,VCL)
    safekeepers.send(proposal)
    responses = safekeepers.read()
    if any responses.is_rejected()
        exit()

    for i in 1..safekeepers.size()
        feedbacks[i].flushLsn = if epoch=safekeepers[i].state.epoch then safekeepers[i].state.flushLsn else restart_lsn

    if restart_lsn != VCL
        do_recovery(epoch,restart_lsn,VCL)

    # Main loop: receive WAL from Postgres and broadcast it to the safekeepers.
    wal_stream = server.start_replication(VCL)
    for ever
        message = wal_stream.read()
        message_queue.append(message)
        do_broadcast(message)
end process


process safekeeper(gateway,state)

    function handshake()
        proposer = gateway.accept()
        server_info = proposer.read()
        proposer.write(state)
        proposal = proposer.read()
        if proposal.nodeId < state.nodeId
            proposer.write(rejected)
            return null
        else
            state.nodeId = proposal.nodeId
            state.proposed_epoch = proposal.epoch
            state.VCL = proposal.VCL
            write_control_file(state)            # persist the vote before replying
            proposer.write(accepted)
            return proposer
    end function

    state = read_control_file()
    state.flushLsn = locate_end_of_wal()         # recalculated by scanning local WAL files

    for ever
        proposer = handshake()
        if not proposer
            continue
        for ever
            req = proposer.read()
            if req.nodeId != state.nodeId        # a newer proposer has taken over
                break
            save_wal_file(req.data)
            state.restartLsn = req.restartLsn
            # Switch to the new epoch only after the old generation is fully overwritten.
            if state.epoch < state.proposed_epoch and req.endPos > max(state.flushLsn,state.VCL)
                state.epoch = state.proposed_epoch
            if req.endPos > state.flushLsn
                state.flushLsn = req.endPos
            save_control_file(state)
            resp = Response(state.epoch,req.endPos)
            proposer.write(resp)
            notify_wal_sender(Min(req.commitLsn,req.endPos))
end process
```