# WAL proposer-safekeeper communication consensus protocol

## General requirements and architecture

There is a single stateless master and several safekeepers. The number of safekeepers is determined by the redundancy level.
To minimize the number of changes in the Postgres core, we use standard streaming replication from the master (through the WAL sender).
This replication stream is initiated by the WAL proposer process that runs in the PostgreSQL server and broadcasts the WAL generated by PostgreSQL to the safekeepers.
To provide durability we use synchronous replication at the master (the response to a commit statement is sent to the client
only when it is acknowledged by the WAL receiver). The WAL proposer sends this acknowledgment only when the LSN of the commit record is confirmed by a quorum of safekeepers.

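As an illustration (a minimal sketch, not code from the repository; the function and its arguments are invented), the acknowledgment rule can be expressed as:

```python
# Sketch: the proposer confirms a commit to the client only once a quorum of
# safekeepers has flushed the commit record's LSN.
def quorum_acknowledged(commit_lsn: int, flush_lsns: list[int], quorum: int) -> bool:
    return sum(1 for lsn in flush_lsns if lsn >= commit_lsn) >= quorum

# Three safekeepers, quorum of two:
assert quorum_acknowledged(0x2000, [0x1000, 0x2000, 0x2500], quorum=2)
assert not quorum_acknowledged(0x3000, [0x1000, 0x2000, 0x2500], quorum=2)
```
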
The WAL proposer tries to establish connections with the safekeepers.
At any moment in time each safekeeper can serve exactly one proposer, but it can accept new connections.

Any of the safekeepers can be used as a WAL server, producing a replication stream. So both `Pagers` and `Replicas`
(read-only compute nodes) can connect to a safekeeper to receive the WAL stream. A safekeeper streams WAL until
it reaches min(`commitLSN`,`flushLSN`). Then replication is suspended until new data arrives from the master.

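The streaming limit can be sketched like this (illustrative only; representing records as `(begin_lsn, end_lsn)` pairs is an assumption, not the safekeeper's real data layout):

```python
# Sketch: a safekeeper streams only the records below min(commitLSN, flushLSN);
# everything past that horizon waits until new data (or a higher commitLSN) arrives.
def records_to_stream(records: list[tuple[int, int]], commit_lsn: int, flush_lsn: int):
    horizon = min(commit_lsn, flush_lsn)
    return [r for r in records if r[1] <= horizon]

# Flushed up to LSN 30, but only LSN 20 is confirmed by a quorum:
assert records_to_stream([(0, 10), (10, 20), (20, 30)], commit_lsn=20, flush_lsn=30) == [(0, 10), (10, 20)]
```
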
## Handshake
The goal of the handshake is to collect a quorum (to be able to perform recovery)
and to avoid split-brain caused by the simultaneous presence of an old and a new master.
The handshake procedure consists of the following steps (a sketch of the voting rule is given after the list):

1. Broadcast information about the server to all safekeepers (WAL segment size, system_id, ...).
2. Receive responses with information about the safekeepers.
3. Once a quorum of handshake responses is received, propose a new `NodeId(max(term)+1, server.uuid)`
   to all of them.
4. On receiving the proposed nodeId, a safekeeper compares it with its locally stored nodeId; if the proposed one is greater or equal,
   it accepts the proposed nodeId and persists this choice in its local control file.
5. If a quorum of safekeepers approves the proposed nodeId, the server assumes that the handshake has completed successfully and switches to the recovery stage.

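The acceptance rule in steps 3-5 can be sketched as follows (illustrative Python; `NodeId`, `safekeeper_vote`, and `handshake_succeeded` are invented names, not the actual API):

```python
# Sketch of the handshake voting rule.
from dataclasses import dataclass
from uuid import UUID

@dataclass(order=True, frozen=True)
class NodeId:
    term: int      # compared first
    uuid: UUID     # tie-breaker

def safekeeper_vote(stored: NodeId, proposed: NodeId) -> bool:
    """Step 4: accept a proposal that is greater than or equal to the stored NodeId
    (the choice would also be persisted in the local control file)."""
    return proposed >= stored

def handshake_succeeded(votes: list[bool], quorum: int) -> bool:
    """Step 5: the proposer switches to recovery once a quorum has accepted."""
    return sum(votes) >= quorum

# Proposer's view with three safekeepers and quorum = 2:
stored = NodeId(term=4, uuid=UUID(int=7))
proposed = NodeId(term=5, uuid=UUID(int=1))    # term = max(term) + 1
assert handshake_succeeded([safekeeper_vote(stored, proposed) for _ in range(3)], quorum=2)
```
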
## Recovery
The proposer computes max(`restartLSN`) and max(`flushLSN`) over a quorum of attached safekeepers.
`RestartLSN` is the position in the WAL which is known to be delivered to all safekeepers.
In other words, `restartLSN` can also be considered a cut-off horizon (all preceding WAL segments can be removed).
`FlushLSN` is the position flushed by a safekeeper to its local persistent storage.

If max(`restartLSN`) != max(`flushLSN`), then recovery has to be performed.
The proposer creates a replication channel with the most advanced safekeeper (the one with the largest `flushLSN`).
Then it downloads all WAL messages in the range max(`restartLSN`)..max(`flushLSN`).
The messages are inserted into an L1-list (ordered by LSN). Then we locate the position of each safekeeper in this list according
to its `flushLSN`. Safekeepers that are not yet connected (outside the quorum) start from the beginning of the list
(corresponding to `restartLSN`).

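This positioning step can be sketched like so (illustrative only; the `(begin_lsn, end_lsn)` representation and the function name are assumptions):

```python
# Sketch: given the ordered message list covering max(restartLSN)..max(flushLSN),
# find the first message a safekeeper still needs, based on its flushLSN.
def first_undelivered(messages: list[tuple[int, int]], flush_lsn: int, restart_lsn: int) -> int:
    start = max(flush_lsn, restart_lsn)    # not-yet-connected safekeepers effectively start at restart_lsn
    for i, (_begin, end) in enumerate(messages):
        if end > start:
            return i
    return len(messages)                   # already has everything

# Records (0,10), (10,20), (20,30); a safekeeper that flushed up to LSN 20 resumes at index 2:
assert first_undelivered([(0, 10), (10, 20), (20, 30)], flush_lsn=20, restart_lsn=0) == 2
```
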
We need to choose max(`flushLSN`) because the voting quorum may be different from the quorum that committed the last message.
So we do not know whether the records up to max(`flushLSN`) were committed by a quorum or not, and we have to consider them committed
to avoid losing committed data.

The calculated max(`flushLSN`) is called the `VCL` (Volume Complete LSN). Since it is chosen among a quorum, there may be some other offline safekeeper with a larger
`flushLSN`. Once it becomes online, we need to overwrite its WAL beyond the `VCL`. To support this, each safekeeper maintains an
`epoch` number. The `epoch` plays almost the same role as the `term`, but the algorithm for bumping the `epoch` is different.
The `VCL` and the new epoch are received by a safekeeper from the proposer during voting,
but the safekeeper doesn't switch to the new epoch immediately after voting.
Instead, it waits until a record with LSN > max(`flushLSN`,`VCL`) is received.
This means that we first restore all records from the old generation and only then switch to the new generation.
When the proposer calculates max(`FlushLSN`), it compares `Epoch` first, so we actually compare (`Epoch`,`FlushLSN`) pairs.

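In code, picking the most advanced safekeeper is just a lexicographic maximum over (`Epoch`,`FlushLSN`) pairs, roughly like this sketch (illustrative, not the actual implementation):

```python
# Sketch: the voting quorum reports (epoch, flushLSN) pairs; Python compares
# tuples lexicographically, so the epoch is compared first and flushLSN second.
def compute_vcl(quorum_states: list[tuple[int, int]]) -> tuple[int, int]:
    epoch, vcl = max(quorum_states)
    return epoch, vcl

# Quorum (S2,S3) from the walkthrough below: S2 is on epoch 2 with flushLSN 3,
# S3 is on epoch 1 with flushLSN 4; the pair maximum gives VCL = 3, not 4.
assert compute_vcl([(2, 3), (1, 4)]) == (2, 3)
```
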
Let's look at some examples. Consider three safekeepers: S1, S2, S3. Si(N) means that the i-th safekeeper has epoch=N.
Ri(x) is the WAL record for resource x with LSN=i. Assume that we start from the following state:

```
S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

The proposer chooses the quorum (S1,S2). The VCL for them is 2. We download the missing WAL from S2 to the proposer and schedule its write to S1.
After receiving the new record R3(e) the picture can be:

```
S1(2): R1(a),R2(b),R3(e)
S2(2): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

Now if the server crashes or restarts, we perform a new round of voting, and it
doesn't matter which quorum we choose: (S1,S2), (S2,S3), ...
In any case VCL=3, because S3 has a smaller epoch.
R3(c) will be overwritten with R3(e):

```
S1(3): R1(a),R2(b),R3(e)
S2(3): R1(a),R2(b),R3(e)
S3(1): R1(a),R2(b),R3(e),R4(d)
```

The epoch of S3 will be adjusted once it overwrites R4:

```
S1(3): R1(a),R2(b),R3(e),R4(f)
S2(3): R1(a),R2(b),R3(e),R4(f)
S3(3): R1(a),R2(b),R3(e),R4(f)
```

A crash can also happen before the epoch is bumped. Let's return to the initial position:

```
S1(1): R1(a)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

Assume that we start recovery:

```
S1(1): R1(a),R2(b)
S2(1): R1(a),R2(b)
S3(1): R1(a),R2(b),R3(c),R4(d) - offline
```

and then a crash happens. During voting we choose the quorum (S2,S3).
Now they belong to the same epoch and S3 is the most advanced among them.
So VCL is set to 4 and we recover S1 and S2 from S3:

```
S1(1): R1(a),R2(b),R3(c),R4(d)
S2(1): R1(a),R2(b),R3(c),R4(d)
S3(1): R1(a),R2(b),R3(c),R4(d)
```

## Main loop
Once recovery is completed, the proposer switches to the normal processing loop: it receives the WAL stream from Postgres and appends WAL
messages to the list. At the same time it tries to push messages to the safekeepers. Each safekeeper is associated
with some element in the message list, and once it acknowledges receiving the message, its position is moved forward.
Each queue element contains an acknowledgment mask whose bits correspond to safekeepers.
Once all safekeepers have acknowledged receiving a message (by setting the corresponding bit),
the element can be removed from the queue and `restartLSN` is advanced.

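A sketch of that bookkeeping (illustrative only; the real queue is described by the pseudocode in the Algorithm section below):

```python
# Sketch: each queue element carries an acknowledgment mask; when the head element
# is acknowledged by every safekeeper it is popped and restartLSN advances.
from dataclasses import dataclass

@dataclass
class QueueElement:
    begin_lsn: int
    end_lsn: int
    acked_mask: int = 0                # bit i is set once safekeeper i acknowledged

def acknowledge(queue: list[QueueElement], elem_idx: int, safekeeper_idx: int,
                n_safekeepers: int, restart_lsn: int) -> int:
    queue[elem_idx].acked_mask |= 1 << safekeeper_idx
    all_acked = (1 << n_safekeepers) - 1
    while queue and queue[0].acked_mask == all_acked:
        restart_lsn = max(restart_lsn, queue[0].end_lsn)
        queue.pop(0)
    return restart_lsn

# Two safekeepers; once both acknowledge the head element, restartLSN moves to 10:
q = [QueueElement(0, 10), QueueElement(10, 20)]
r = acknowledge(q, 0, safekeeper_idx=0, n_safekeepers=2, restart_lsn=0)
r = acknowledge(q, 0, safekeeper_idx=1, n_safekeepers=2, restart_lsn=r)
assert r == 10 and len(q) == 1
```
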
The proposer maintains `restartLSN` and `commitLSN` based on the responses received from the safekeepers.
`RestartLSN` equals the LSN of the head message in the list. `CommitLSN` is the `flushLSN[nSafekeepers-Quorum]` element
of the sorted array of the safekeepers' `flushLSN`s. `CommitLSN` and `RestartLSN` are included in the requests
sent from the proposer to the safekeepers and stored in each safekeeper's control file.
To avoid the overhead of an extra fsync, this control file is not fsynced on each request. It is flushed
periodically, which means that the `restartLSN`/`commitLSN` stored by a safekeeper may be slightly stale.
This is not critical because it may only cause redundant processing of some WAL records,
and `FlushLSN` is recalculated after a node restart by scanning the local WAL files.

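For example, the `flushLSN[nSafekeepers-Quorum]` rule above boils down to the following (a worked sketch, not code from the repository):

```python
# Sketch: commitLSN is the (nSafekeepers - quorum)-th element of the sorted flushLSNs,
# i.e. the largest LSN that at least `quorum` safekeepers have already flushed.
def commit_lsn(flush_lsns: list[int], quorum: int) -> int:
    return sorted(flush_lsns)[len(flush_lsns) - quorum]

# Three safekeepers with flushLSNs 30, 10, 20 and quorum 2: sorted -> [10, 20, 30],
# index 1 -> 20, and indeed two safekeepers (those at 20 and 30) have flushed LSN 20.
assert commit_lsn([30, 10, 20], quorum=2) == 20
```
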
## Fault tolerance
If the WAL proposer process loses its connection to a safekeeper, it tries to reestablish this connection using the same nodeId.

A restart of PostgreSQL initiates a new round of voting and a switch to a new epoch.

## Limitations
Right now the message queue is maintained in main memory and is not spilled to disk.
This can cause memory overflow in the presence of lagging safekeepers.
It is assumed that if some safekeepers lose their local data, it will be recovered using some external mechanism.

## Glossary
* `CommitLSN`: position in the WAL confirmed by a quorum of safekeepers.
* `RestartLSN`: position in the WAL confirmed by all safekeepers.
* `FlushLSN`: position up to which the WAL has been persisted to disk by a safekeeper.
* `NodeID`: a (term, UUID) pair.
* `Pager`: the Zenith component that restores pages from the WAL stream.
* `Replica`: a read-only compute node.
* `VCL`: the largest LSN for which we can guarantee availability of all prior records.

## Algorithm

```python
process WalProposer(safekeepers,server,curr_epoch,restart_lsn=0,message_queue={},feedbacks={})
    function do_recovery(epoch,restart_lsn,VCL)
        leader = i: safekeepers[i].state.epoch=epoch and safekeepers[i].state.flushLsn=VCL
        wal_stream = safekeepers[leader].start_replication(restart_lsn,VCL)
        do
            message = wal_stream.read()
            message_queue.append(message)
        while message.startPos < VCL

        for i in 1..safekeepers.size()
            for message in message_queue
                if message.endLsn < safekeepers[i].state.flushLsn
                    message.delivered += i
                else
                    send_message(i, message)
                    break
    end function

    function send_message(i,msg)
        msg.restartLsn = restart_lsn
        msg.commitLsn = get_commit_lsn()
        safekeepers[i].send(msg, response_handler)
    end function

    function do_broadcast(message)
        for i in 1..safekeepers.size()
            if not safekeepers[i].sending()
                send_message(i, message)
    end function

    function get_commit_lsn()
        sorted_feedbacks = feedbacks.sort()
        return sorted_feedbacks[safekeepers.size() - quorum]
    end function

    function response_handler(i,message,response)
        feedbacks[i] = if response.epoch=curr_epoch then response.flushLsn else VCL
        server.write(get_commit_lsn())

        message.delivered += i
        next_message = message_queue.next(message)
        if next_message
            send_message(i, next_message)

        while message_queue.head.delivered.size() = safekeepers.size()
            if restart_lsn < message_queue.head.beginLsn
                restart_lsn = message_queue.head.endLsn
            message_queue.pop_head()
    end function

    server_info = server.read()

    safekeepers.write(server_info)
    safekeepers.state = safekeepers.read()
    next_term = max(safekeepers.state.nodeId.term)+1
    restart_lsn = max(safekeepers.state.restartLsn)
    epoch,VCL = max(safekeepers.state.epoch,safekeepers.state.flushLsn)
    curr_epoch = epoch + 1

    proposal = Proposal(NodeId(next_term,server.id),curr_epoch,VCL)
    safekeepers.send(proposal)
    responses = safekeepers.read()
    if any responses.is_rejected()
        exit()

    for i in 1..safekeepers.size()
        feedbacks[i].flushLsn = if epoch=safekeepers[i].state.epoch then safekeepers[i].state.flushLsn else restart_lsn

    if restart_lsn != VCL
        do_recovery(epoch,restart_lsn,VCL)

    wal_stream = server.start_replication(VCL)
    for ever
        message = wal_stream.read()
        message_queue.append(message)
        do_broadcast(message)
end process

process safekeeper(gateway,state)
    function handshake()
        proposer = gateway.accept()
        server_info = proposer.read()
        proposer.write(state)
        proposal = proposer.read()
        if proposal.nodeId < state.nodeId
            proposer.write(rejected)
            return null
        else
            state.nodeId = proposal.nodeId
            state.proposed_epoch = proposal.epoch
            state.VCL = proposal.VCL
            write_control_file(state)
            proposer.write(accepted)
            return proposer
    end function

    state = read_control_file()
    state.flushLsn = locate_end_of_wal()

    for ever
        proposer = handshake()
        if not proposer
            continue
        for ever
            req = proposer.read()
            if req.nodeId != state.nodeId
                break
            save_wal_file(req.data)
            state.restartLsn = req.restartLsn
            if state.epoch < state.proposed_epoch and req.endPos > max(state.flushLsn,state.VCL)
                state.epoch = state.proposed_epoch
            if req.endPos > state.flushLsn
                state.flushLsn = req.endPos
            save_control_file(state)
            resp = Response(state.epoch,req.endPos)
            proposer.write(resp)
            notify_wal_sender(min(req.commitLsn,req.endPos))
end process
```