Request disk consistent LSN through safekeeper

This commit is contained in:
Konstantin Knizhnik
2021-09-21 15:53:50 +03:00
parent ae2232641d
commit bf5d17cbaa
5 changed files with 211 additions and 136 deletions
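
In short: the pageserver answers WAL-sender keepalives with a standby status update ('r') whose write_lsn carries its disk_consistent_lsn; the safekeeper records this state per replica and folds all replica states into one; every AppendResponse then reports the folded minimum back to the walproposer. A condensed sketch of the fold, using simplified stand-ins for the Lsn and ReplicaState types defined in the diffs below:

// Simplified stand-ins for types in this commit; not the actual walkeeper code.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

#[derive(Clone, Copy)]
struct ReplicaState {
    disk_consistent_lsn: Lsn,
}

/// Fold per-replica states the way SharedState::get_replicas_state does:
/// start from the identity element Lsn(u64::MAX) and take the minimum, so
/// the reported LSN is only as high as the slowest attached pageserver.
fn combined_disk_consistent_lsn(replicas: &[Option<ReplicaState>]) -> Lsn {
    replicas
        .iter()
        .flatten()
        .map(|s| s.disk_consistent_lsn)
        .fold(Lsn(u64::MAX), Lsn::min)
}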

View File

@@ -1175,81 +1175,71 @@ impl LayeredTimeline {
);
}
loop {
// Do we have a layer open for writing already?
if let Some(layer) = layers.get_open(&seg) {
if layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
}
return Ok(layer);
// Do we have a layer open for writing already?
if let Some(layer) = layers.get_open(&seg) {
if layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
}
// No (writeable) layer for this relation yet. Create one.
//
// Is this a completely new relation? Or the first modification after branching?
//
let layer;
if let Some((prev_layer, _prev_lsn)) =
self.get_layer_for_read_locked(seg, lsn, &layers)?
{
// Create new entry after the previous one.
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn;
trace!(
"creating file for write for {} at branch point {}/{}",
seg,
self.timelineid,
start_lsn
);
} else {
start_lsn = prev_layer.get_end_lsn();
trace!(
"creating file for write for {} after previous layer {}/{}",
seg,
self.timelineid,
start_lsn
);
}
trace!(
"prev layer is at {}/{} - {}",
prev_layer.get_timeline_id(),
prev_layer.get_start_lsn(),
prev_layer.get_end_lsn()
);
layer = InMemoryLayer::create_successor_layer(
self.conf,
prev_layer,
self.timelineid,
self.tenantid,
start_lsn,
lsn,
)?;
} else {
// New relation.
trace!(
"creating layer for write for new rel {} at {}/{}",
seg,
self.timelineid,
lsn
);
layer = InMemoryLayer::create(
self.conf,
self.timelineid,
self.tenantid,
seg,
lsn,
lsn,
)?;
}
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
layers.insert_open(Arc::clone(&layer_rc));
return Ok(layer_rc);
return Ok(layer);
}
// No (writeable) layer for this relation yet. Create one.
//
// Is this a completely new relation? Or the first modification after branching?
//
let layer;
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read_locked(seg, lsn, &layers)? {
// Create new entry after the previous one.
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn;
trace!(
"creating file for write for {} at branch point {}/{}",
seg,
self.timelineid,
start_lsn
);
} else {
start_lsn = prev_layer.get_end_lsn();
trace!(
"creating file for write for {} after previous layer {}/{}",
seg,
self.timelineid,
start_lsn
);
}
trace!(
"prev layer is at {}/{} - {}",
prev_layer.get_timeline_id(),
prev_layer.get_start_lsn(),
prev_layer.get_end_lsn()
);
layer = InMemoryLayer::create_successor_layer(
self.conf,
prev_layer,
self.timelineid,
self.tenantid,
start_lsn,
lsn,
)?;
} else {
// New relation.
trace!(
"creating layer for write for new rel {} at {}/{}",
seg,
self.timelineid,
lsn
);
layer =
InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
}
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
layers.insert_open(Arc::clone(&layer_rc));
return Ok(layer_rc);
}
///
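
The hunk above appears to restructure LayeredTimeline::get_layer_for_write without changing what it computes (the comments and trace messages are identical on both sides): reuse the open in-memory layer for the segment if one exists, bailing if it starts in the future, otherwise create one, either as a successor of the previous read layer or fresh for a new relation. A self-contained sketch of that get-or-create shape, with Layers, InMemoryLayer, and Seg as simplified stand-ins for the pageserver types:

use std::collections::HashMap;
use std::sync::Arc;

#[derive(Clone, Copy, PartialEq, PartialOrd)]
struct Lsn(u64);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct Seg(u32);

struct InMemoryLayer {
    seg: Seg,
    start_lsn: Lsn,
}

#[derive(Default)]
struct Layers {
    open: HashMap<Seg, Arc<InMemoryLayer>>,
}

impl Layers {
    fn get_open(&self, seg: &Seg) -> Option<Arc<InMemoryLayer>> {
        self.open.get(seg).cloned()
    }
    fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
        self.open.insert(layer.seg, layer);
    }
}

/// Get-or-create: reuse an already-open layer unless it starts in the
/// future, otherwise create and register a new one starting at `lsn`.
fn get_layer_for_write(layers: &mut Layers, seg: Seg, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    if let Some(layer) = layers.get_open(&seg) {
        anyhow::ensure!(layer.start_lsn <= lsn, "unexpected open layer in the future");
        return Ok(layer);
    }
    let layer = Arc::new(InMemoryLayer { seg, start_lsn: lsn });
    layers.insert_open(Arc::clone(&layer));
    Ok(layer)
}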

View File

@@ -2,7 +2,7 @@
//! with the "START_REPLICATION" message.
use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools};
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use log::*;
@@ -20,7 +20,7 @@ use std::{str, thread};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage, XLogDataBody};
use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
use zenith_utils::sock_split::ReadStream;
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
@@ -32,7 +32,7 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
type FullTransactionId = u64;
/// Hot standby feedback received from replica
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct HotStandbyFeedback {
pub ts: TimestampTz,
pub xmin: FullTransactionId,
@@ -49,6 +49,16 @@ impl HotStandbyFeedback {
}
}
/// Standby status update
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // disk consistent LSN
pub flush_lsn: Lsn, // LSN committed by quorum
pub apply_lsn: Lsn, // not used
pub reply_ts: TimestampTz,
pub reply_requested: bool,
}
/// A network connection that's speaking the replication protocol.
pub struct ReplicationConn {
/// This is an `Option` because we will spawn a background thread that will
@@ -56,16 +66,15 @@ pub struct ReplicationConn {
stream_in: Option<ReadStream>,
}
// TODO: move this to crate::timeline when there's more users
// TODO: design a proper Timeline mock api
trait HsFeedbackSubscriber {
fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {}
/// Scope guard to unregister replication connection from timeline
struct ReplicationConnGuard {
replica: usize, // replica internal ID assigned by timeline
timeline: Arc<Timeline>,
}
impl HsFeedbackSubscriber for Arc<Timeline> {
#[inline(always)]
fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
Timeline::add_hs_feedback(self, feedback);
impl Drop for ReplicationConnGuard {
fn drop(&mut self) {
self.timeline.update_replica_state(self.replica, None);
}
}
@@ -79,10 +88,13 @@ impl ReplicationConn {
/// Handle incoming messages from the network.
/// This is spawned into the background by `handle_start_replication`.
fn background_thread(
mut stream_in: impl Read,
subscriber: impl HsFeedbackSubscriber,
) -> Result<()> {
fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
let mut state = ReplicaState::new();
let replica = timeline.add_replica();
let _guard = ReplicationConnGuard {
replica,
timeline: timeline.clone(),
};
// Wait for replica's feedback.
while let Some(msg) = FeMessage::read(&mut stream_in)? {
match &msg {
@@ -94,11 +106,16 @@ impl ReplicationConn {
match m.first().cloned() {
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
// Note: deserializing is on m[1..] because we skip the tag byte.
let feedback = HotStandbyFeedback::des(&m[1..])
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
subscriber.add_hs_feedback(feedback);
timeline.update_replica_state(replica, Some(state));
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
.context("failed to deserialize StandbyReply")?;
state.disk_consistent_lsn = reply.write_lsn;
timeline.update_replica_state(replica, Some(state));
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (),
_ => warn!("unexpected message {:?}", msg),
}
}
@@ -187,7 +204,7 @@ impl ReplicationConn {
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;
let mut end_pos: Lsn;
let mut end_pos = Lsn(0);
let mut wal_file: Option<File> = None;
loop {
@@ -202,7 +219,18 @@ impl ReplicationConn {
} else {
/* normal mode */
let timeline = swh.timeline.get();
end_pos = timeline.wait_for_lsn(start_pos);
if let Some(lsn) = timeline.wait_for_lsn(start_pos) {
end_pos = lsn
} else {
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
}
if end_pos == END_REPLICATION_MARKER {
break;
@@ -257,18 +285,3 @@ impl ReplicationConn {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
// A no-op impl for tests
impl HsFeedbackSubscriber for () {}
#[test]
fn test_replication_conn_background_thread_eof() {
// Test that background_thread recognizes EOF
let stream: &[u8] = &[];
ReplicationConn::background_thread(stream, ()).unwrap();
}
}
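
The StandbyReply::des call above decodes the body of the standby status update ('r') message. Assuming BeSer is a big-endian serde encoding, the body is three 8-byte LSNs, an 8-byte timestamp, and a 1-byte flag; a standalone sketch of that decoding, with Lsn as a stand-in newtype:

use std::convert::TryInto;

#[derive(Debug, Clone, Copy)]
struct Lsn(u64);

#[derive(Debug)]
struct StandbyReply {
    write_lsn: Lsn, // the pageserver's disk consistent LSN
    flush_lsn: Lsn,
    apply_lsn: Lsn,
    reply_ts: i64,
    reply_requested: bool,
}

/// Parse the body of an 'r' message; the caller has already stripped the
/// tag byte, as in StandbyReply::des(&m[1..]) above.
fn parse_standby_reply(m: &[u8]) -> Option<StandbyReply> {
    if m.len() < 33 {
        return None; // 3 * 8 (LSNs) + 8 (timestamp) + 1 (flag)
    }
    let u64_at = |i: usize| u64::from_be_bytes(m[i..i + 8].try_into().unwrap());
    Some(StandbyReply {
        write_lsn: Lsn(u64_at(0)),
        flush_lsn: Lsn(u64_at(8)),
        apply_lsn: Lsn(u64_at(16)),
        reply_ts: u64_at(24) as i64,
        reply_requested: m[32] != 0,
    })
}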

View File

@@ -188,6 +188,8 @@ pub struct AppendResponse {
// We report back which WAL we know to be committed, as this is
// an exit criterion for walproposer --sync mode
pub commit_lsn: Lsn,
// Min disk consistent LSN across pageservers (the portion of WAL applied and written to disk by pageservers)
pub disk_consistent_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback,
}
@@ -411,6 +413,7 @@ where
epoch: self.s.acceptor_state.epoch,
commit_lsn: Lsn(0),
flush_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
};
return Ok(AcceptorProposerMessage::AppendResponse(resp));
@@ -516,6 +519,7 @@ where
epoch: self.s.acceptor_state.epoch,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by caller code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
};
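
The new field defaults to Lsn(0) at handshake time and is filled in by the timeline code for normal append responses (see the next file). On the proposer side (not part of this commit; walproposer itself is C code), a natural consumer would take the minimum across safekeeper responses, mirroring the minimum each safekeeper already takes across its replicas. A hypothetical sketch:

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

struct AppendResponse {
    commit_lsn: Lsn,
    disk_consistent_lsn: Lsn,
}

/// Hypothetical proposer-side aggregation: WAL up to the returned LSN has
/// been applied and persisted by every pageserver any safekeeper knows of.
fn min_disk_consistent_lsn(resps: &[AppendResponse]) -> Option<Lsn> {
    resps.iter().map(|r| r.disk_consistent_lsn).min()
}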

View File

@@ -11,9 +11,9 @@ use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
@@ -25,6 +25,29 @@ use crate::WalAcceptorConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
const CONTROL_FILE_NAME: &str = "safekeeper.control";
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// Replica status: hot standby feedback + disk consistent LSN
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
/// combined disk_consistent_lsn of pageservers
pub disk_consistent_lsn: Lsn,
/// combined hot standby feedback from all replicas
pub hs_feedback: HotStandbyFeedback,
}
impl ReplicaState {
pub fn new() -> ReplicaState {
ReplicaState {
disk_consistent_lsn: Lsn(u64::MAX),
hs_feedback: HotStandbyFeedback {
ts: 0,
xmin: u64::MAX,
catalog_xmin: u64::MAX,
},
}
}
}
/// Shared state associated with database instance (tenant)
struct SharedState {
@@ -33,8 +56,8 @@ struct SharedState {
/// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about
commit_lsn: Lsn,
/// combined hot standby feedback from all replicas
hs_feedback: HotStandbyFeedback,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
}
// A named boolean.
@@ -45,6 +68,35 @@ pub enum CreateControlFile {
}
impl SharedState {
/// Get combined state of all alive replicas
pub fn get_replicas_state(&self) -> ReplicaState {
let mut acc = ReplicaState::new();
for replica in &self.replicas {
if let Some(state) = replica {
acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts);
acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin);
acc.hs_feedback.catalog_xmin =
min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
acc.disk_consistent_lsn =
Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
}
}
acc
}
/// Assign a new replica ID. We choose the first empty cell in the replicas
/// vector, or extend the vector if there are no free cells.
pub fn add_replica(&mut self) -> usize {
let len = self.replicas.len();
for i in 0..len {
if self.replicas[i].is_none() {
return i;
}
}
self.replicas.push(None);
len
}
/// Restore SharedState from control file. Locks the control file along the
/// way to prevent running more than one instance of safekeeper on the same
/// data dir.
@@ -74,21 +126,10 @@ impl SharedState {
Ok(Self {
commit_lsn: Lsn(0),
sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
hs_feedback: HotStandbyFeedback {
ts: 0,
xmin: u64::MAX,
catalog_xmin: u64::MAX,
},
replicas: Vec::new(),
})
}
/// Accumulate hot standby feedbacks from replicas
pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) {
self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin);
self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin);
self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts);
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
fn load_control_file(
@@ -178,20 +219,27 @@ impl Timeline {
}
}
/// Wait for an LSN to be committed.
/// Timed wait for an LSN to be committed.
///
/// Returns the last committed LSN, which will be at least
/// as high as the LSN waited for.
as high as the LSN waited for, or None if the timeout expired.
///
pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn {
pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
let mut shared_state = self.mutex.lock().unwrap();
loop {
let commit_lsn = shared_state.commit_lsn;
// This must be `>`, not `>=`.
if commit_lsn > lsn {
return commit_lsn;
return Some(commit_lsn);
}
shared_state = self.cond.wait(shared_state).unwrap();
let result = self
.cond
.wait_timeout(shared_state, POLL_STATE_TIMEOUT)
.unwrap();
if result.1.timed_out() {
return None;
}
shared_state = result.0
}
}
@@ -219,9 +267,11 @@ impl Timeline {
// commit_lsn if we are catching up safekeeper.
commit_lsn = shared_state.sk.commit_lsn;
// if this is AppendResponse, fill in proper hot standby feedback
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
resp.hs_feedback = shared_state.hs_feedback.clone();
let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
resp.disk_consistent_lsn = state.disk_consistent_lsn;
}
}
// Ping wal sender that new data might be available.
@@ -233,15 +283,14 @@ impl Timeline {
self.mutex.lock().unwrap().sk.s.clone()
}
// Accumulate hot standby feedbacks from replicas
pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
pub fn add_replica(&self) -> usize {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.add_hs_feedback(feedback);
shared_state.add_replica()
}
pub fn get_hs_feedback(&self) -> HotStandbyFeedback {
let shared_state = self.mutex.lock().unwrap();
shared_state.hs_feedback.clone()
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.replicas[id] = state;
}
pub fn get_end_of_wal(&self) -> (Lsn, u32) {
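
The reworked wait_for_lsn above turns an unbounded Condvar::wait loop into a timed wait, so the WAL sender can wake up after POLL_STATE_TIMEOUT and ping the pageserver with a keepalive. The pattern in a self-contained sketch (plain u64 in place of Lsn; spurious wakeups are handled by re-checking the condition on each iteration):

use std::sync::{Condvar, Mutex};
use std::time::Duration;

struct Shared {
    commit_lsn: u64,
}

struct Timeline {
    mutex: Mutex<Shared>,
    cond: Condvar,
}

impl Timeline {
    /// Return Some(commit_lsn) once it advances past `lsn`, or None if
    /// `timeout` expires first; the caller is expected to poll again, as
    /// the replication loop does after sending a keepalive.
    fn wait_for_lsn(&self, lsn: u64, timeout: Duration) -> Option<u64> {
        let mut shared = self.mutex.lock().unwrap();
        loop {
            if shared.commit_lsn > lsn {
                return Some(shared.commit_lsn);
            }
            let (guard, result) = self.cond.wait_timeout(shared, timeout).unwrap();
            if result.timed_out() {
                return None;
            }
            shared = guard;
        }
    }
}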

View File

@@ -357,6 +357,7 @@ pub enum BeMessage<'a> {
RowDescription(&'a [RowDescriptor<'a>]),
XLogData(XLogDataBody<'a>),
NoticeResponse(String),
KeepAlive(WalSndKeepAlive),
}
// One row description in a RowDescription packet.
@@ -408,6 +409,13 @@ pub struct XLogDataBody<'a> {
pub data: &'a [u8],
}
#[derive(Debug)]
pub struct WalSndKeepAlive {
pub sent_ptr: u64,
pub timestamp: i64,
pub request_reply: bool,
}
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
// single text column
@@ -720,6 +728,17 @@ impl<'a> BeMessage<'a> {
})
.unwrap();
}
BeMessage::KeepAlive(req) => {
buf.put_u8(b'k');
write_body(buf, |buf| {
buf.put_u64(req.sent_ptr);
buf.put_i64(req.timestamp);
buf.put_u8(if req.request_reply { 1u8 } else { 0u8 });
Ok::<_, io::Error>(())
})
.unwrap();
}
}
Ok(())
}
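
For reference, assuming write_body follows the usual Postgres wire convention (a 4-byte big-endian length that counts itself but not the tag byte), the serializer above produces 22 bytes per keepalive. A hypothetical standalone encoder with the same layout:

/// Hypothetical re-implementation of the 'k' (keepalive) encoding above:
/// tag byte, self-inclusive length, then sent_ptr, timestamp, and the
/// request_reply flag, all big-endian.
fn encode_keepalive(sent_ptr: u64, timestamp: i64, request_reply: bool) -> Vec<u8> {
    let mut buf = Vec::with_capacity(22);
    buf.push(b'k');
    let body_len: u32 = 4 + 8 + 8 + 1; // length field + u64 + i64 + flag
    buf.extend_from_slice(&body_len.to_be_bytes());
    buf.extend_from_slice(&sent_ptr.to_be_bytes());
    buf.extend_from_slice(&timestamp.to_be_bytes());
    buf.push(request_reply as u8);
    buf
}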