mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 04:22:56 +00:00
Make safekeeper namings more consistent with reality.
s/send_wal.rs/handler.rs s/SendWalHandler/SafekeeperPostgresHandler s/replication.rs/send_wal.rs
This commit is contained in:
165
walkeeper/src/handler.rs
Normal file
165
walkeeper/src/handler.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
//! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
|
||||
//! protocol commands.
|
||||
|
||||
use crate::json_ctrl::handle_json_ctrl;
|
||||
use crate::receive_wal::ReceiveWalConn;
|
||||
use crate::send_wal::ReplicationConn;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::xlog_utils::PG_TLI;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::postgres_backend;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use crate::timeline::CreateControlFile;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
/// Safekeeper handler of postgres commands
|
||||
pub struct SafekeeperPostgresHandler {
|
||||
pub conf: SafeKeeperConf,
|
||||
/// assigned application name
|
||||
pub appname: Option<String>,
|
||||
pub tenantid: Option<ZTenantId>,
|
||||
pub timelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
//sender to communicate with callmemaybe thread
|
||||
pub tx: UnboundedSender<CallmeEvent>,
|
||||
}
|
||||
|
||||
impl postgres_backend::Handler for SafekeeperPostgresHandler {
|
||||
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
|
||||
let ztimelineid = sm
|
||||
.params
|
||||
.get("ztimelineid")
|
||||
.ok_or_else(|| anyhow!("timelineid is required"))?;
|
||||
self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?);
|
||||
|
||||
let ztenantid = sm
|
||||
.params
|
||||
.get("ztenantid")
|
||||
.ok_or_else(|| anyhow!("tenantid is required"))?;
|
||||
self.tenantid = Some(ZTenantId::from_str(ztenantid)?);
|
||||
|
||||
if let Some(app_name) = sm.params.get("application_name") {
|
||||
self.appname = Some(app_name.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
||||
// START_WAL_PUSH is the only command that initializes the timeline in production.
|
||||
// There is also JSON_CTRL command, which should initialize the timeline for testing.
|
||||
if self.timeline.is_none() {
|
||||
if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL")
|
||||
{
|
||||
self.timeline.set(
|
||||
&self.conf,
|
||||
self.tenantid.unwrap(),
|
||||
self.timelineid.unwrap(),
|
||||
CreateControlFile::True,
|
||||
)?;
|
||||
} else {
|
||||
self.timeline.set(
|
||||
&self.conf,
|
||||
self.tenantid.unwrap(),
|
||||
self.timelineid.unwrap(),
|
||||
CreateControlFile::False,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
|
||||
self.handle_identify_system(pgb)?;
|
||||
} else if query_string.starts_with(b"START_REPLICATION") {
|
||||
ReplicationConn::new(pgb)
|
||||
.run(self, pgb, &query_string)
|
||||
.with_context(|| "failed to run ReplicationConn")?;
|
||||
} else if query_string.starts_with(b"START_WAL_PUSH") {
|
||||
// TODO: this repeats query decoding logic from page_service so it is probably
|
||||
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes
|
||||
let decoded_query_string = match query_string.last() {
|
||||
Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
|
||||
_ => std::str::from_utf8(&query_string)?,
|
||||
};
|
||||
let pageserver_connstr = decoded_query_string
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.map(|s| s.to_owned());
|
||||
ReceiveWalConn::new(pgb, pageserver_connstr)
|
||||
.run(self)
|
||||
.with_context(|| "failed to run ReceiveWalConn")?;
|
||||
} else if query_string.starts_with(b"JSON_CTRL") {
|
||||
handle_json_ctrl(self, pgb, &query_string)?;
|
||||
} else {
|
||||
bail!("Unexpected command {:?}", query_string);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SafekeeperPostgresHandler {
|
||||
pub fn new(conf: SafeKeeperConf, tx: UnboundedSender<CallmeEvent>) -> Self {
|
||||
SafekeeperPostgresHandler {
|
||||
conf,
|
||||
appname: None,
|
||||
tenantid: None,
|
||||
timelineid: None,
|
||||
timeline: None,
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||
let start_pos = self.timeline.get().get_end_of_wal();
|
||||
let lsn = start_pos.to_string();
|
||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli = PG_TLI.to_string();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
let sysid_bytes = sysid.as_bytes();
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor {
|
||||
name: b"systemid",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"timeline",
|
||||
typoid: INT4_OID,
|
||||
typlen: 4,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"xlogpos",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"dbname",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(sysid_bytes),
|
||||
Some(tli_bytes),
|
||||
Some(lsn_bytes),
|
||||
None,
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -12,12 +12,12 @@ use crc32c::crc32c_append;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
|
||||
};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::TimelineTools;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::xlog_utils;
|
||||
@@ -57,7 +57,7 @@ struct AppendResult {
|
||||
/// content, and then append it with specified term and lsn. This
|
||||
/// function is used to test safekeepers in different scenarios.
|
||||
pub fn handle_json_ctrl(
|
||||
swh: &mut SendWalHandler,
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
cmd: &Bytes,
|
||||
) -> Result<()> {
|
||||
@@ -71,16 +71,16 @@ pub fn handle_json_ctrl(
|
||||
info!("JSON_CTRL request: {:?}", append_request);
|
||||
|
||||
// need to init safekeeper state before AppendRequest
|
||||
prepare_safekeeper(swh)?;
|
||||
prepare_safekeeper(spg)?;
|
||||
|
||||
// if send_proposer_elected is true, we need to update local history
|
||||
if append_request.send_proposer_elected {
|
||||
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
|
||||
send_proposer_elected(spg, append_request.term, append_request.epoch_start_lsn)?;
|
||||
}
|
||||
|
||||
let inserted_wal = append_logical_message(swh, append_request)?;
|
||||
let inserted_wal = append_logical_message(spg, append_request)?;
|
||||
let response = AppendResult {
|
||||
state: swh.timeline.get().get_info(),
|
||||
state: spg.timeline.get().get_info(),
|
||||
inserted_wal,
|
||||
};
|
||||
let response_data = serde_json::to_vec(&response)?;
|
||||
@@ -98,28 +98,28 @@ pub fn handle_json_ctrl(
|
||||
|
||||
/// Prepare safekeeper to process append requests without crashes,
|
||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||
fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
|
||||
fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
|
||||
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
|
||||
protocol_version: 1, // current protocol
|
||||
pg_version: 0, // unknown
|
||||
proposer_id: [0u8; 16],
|
||||
system_id: 0,
|
||||
ztli: swh.timelineid.unwrap(),
|
||||
tenant_id: swh.tenantid.unwrap(),
|
||||
ztli: spg.timelineid.unwrap(),
|
||||
tenant_id: spg.tenantid.unwrap(),
|
||||
tli: 0,
|
||||
wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
|
||||
});
|
||||
|
||||
let response = swh.timeline.get().process_msg(&greeting_request)?;
|
||||
let response = spg.timeline.get().process_msg(&greeting_request)?;
|
||||
match response {
|
||||
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
|
||||
_ => anyhow::bail!("not GreetingResponse"),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
|
||||
fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
|
||||
// add new term to existing history
|
||||
let history = swh.timeline.get().get_info().acceptor_state.term_history;
|
||||
let history = spg.timeline.get().get_info().acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermSwitchEntry { term, lsn });
|
||||
@@ -131,7 +131,7 @@ fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Resu
|
||||
term_history: history,
|
||||
});
|
||||
|
||||
swh.timeline.get().process_msg(&proposer_elected_request)?;
|
||||
spg.timeline.get().process_msg(&proposer_elected_request)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -145,11 +145,11 @@ struct InsertedWAL {
|
||||
/// Extend local WAL with new LogicalMessage record. To do that,
|
||||
/// create AppendRequest with new WAL and pass it to safekeeper.
|
||||
fn append_logical_message(
|
||||
swh: &mut SendWalHandler,
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
msg: AppendLogicalMessage,
|
||||
) -> Result<InsertedWAL> {
|
||||
let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
|
||||
let sk_state = swh.timeline.get().get_info();
|
||||
let sk_state = spg.timeline.get().get_info();
|
||||
|
||||
let begin_lsn = msg.begin_lsn;
|
||||
let end_lsn = begin_lsn + wal_data.len() as u64;
|
||||
@@ -173,7 +173,7 @@ fn append_logical_message(
|
||||
wal_data: Bytes::from(wal_data),
|
||||
});
|
||||
|
||||
let response = swh.timeline.get().process_msg(&append_request)?;
|
||||
let response = spg.timeline.get().process_msg(&append_request)?;
|
||||
|
||||
let append_response = match response {
|
||||
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
||||
|
||||
@@ -5,10 +5,10 @@ use std::time::Duration;
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
|
||||
pub mod callmemaybe;
|
||||
pub mod handler;
|
||||
pub mod http;
|
||||
pub mod json_ctrl;
|
||||
pub mod receive_wal;
|
||||
pub mod replication;
|
||||
pub mod s3_offload;
|
||||
pub mod safekeeper;
|
||||
pub mod send_wal;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! Safekeeper communication endpoint to wal proposer (compute node).
|
||||
//! Safekeeper communication endpoint to WAL proposer (compute node).
|
||||
//! Gets messages from the network, passes them down to consensus module and
|
||||
//! sends replies back.
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::sync::Arc;
|
||||
use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::timeline::TimelineTools;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage};
|
||||
@@ -71,8 +71,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
/// Receive WAL from wal_proposer
|
||||
pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> {
|
||||
let _enter = info_span!("WAL acceptor", timeline = %swh.timelineid.unwrap()).entered();
|
||||
pub fn run(&mut self, spg: &mut SafekeeperPostgresHandler) -> Result<()> {
|
||||
let _enter = info_span!("WAL acceptor", timeline = %spg.timelineid.unwrap()).entered();
|
||||
|
||||
// Notify the libpq client that it's allowed to send `CopyData` messages
|
||||
self.pg_backend
|
||||
@@ -95,7 +95,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
// Incoming WAL stream resumed, so reset information about the timeline pause.
|
||||
swh.timeline.get().continue_streaming();
|
||||
spg.timeline.get().continue_streaming();
|
||||
|
||||
// if requested, ask pageserver to fetch wal from us
|
||||
// as long as this wal_stream is alive, callmemaybe thread
|
||||
@@ -105,10 +105,10 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver
|
||||
// we should use callmemaybe mechanism.
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
let tx_clone = swh.tx.clone();
|
||||
let timelineid = spg.timeline.get().timelineid;
|
||||
let tx_clone = spg.tx.clone();
|
||||
let pageserver_connstr = pageserver_connstr.to_owned();
|
||||
swh.tx
|
||||
spg.tx
|
||||
.send(CallmeEvent::Subscribe(
|
||||
tenant_id,
|
||||
timelineid,
|
||||
@@ -126,14 +126,14 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
tx: tx_clone,
|
||||
tenant_id,
|
||||
timelineid,
|
||||
timeline: Arc::clone(swh.timeline.get()),
|
||||
timeline: Arc::clone(spg.timeline.get()),
|
||||
})
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
loop {
|
||||
let reply = swh
|
||||
let reply = spg
|
||||
.timeline
|
||||
.get()
|
||||
.process_msg(&msg)
|
||||
|
||||
@@ -1,369 +0,0 @@
|
||||
//! This module implements the streaming side of replication protocol, starting
|
||||
//! with the "START_REPLICATION" message.
|
||||
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
|
||||
};
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::min;
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
use std::net::Shutdown;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use std::{str, thread};
|
||||
use tracing::*;
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
|
||||
use zenith_utils::sock_split::ReadStream;
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
// See: https://www.postgresql.org/docs/13/protocol-replication.html
|
||||
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
|
||||
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
|
||||
|
||||
type FullTransactionId = u64;
|
||||
|
||||
/// Hot standby feedback received from replica
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
pub struct HotStandbyFeedback {
|
||||
pub ts: TimestampTz,
|
||||
pub xmin: FullTransactionId,
|
||||
pub catalog_xmin: FullTransactionId,
|
||||
}
|
||||
|
||||
impl HotStandbyFeedback {
|
||||
pub fn empty() -> HotStandbyFeedback {
|
||||
HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: 0,
|
||||
catalog_xmin: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Standby status update
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StandbyReply {
|
||||
pub write_lsn: Lsn, // last lsn received by pageserver
|
||||
pub flush_lsn: Lsn, // not used
|
||||
pub apply_lsn: Lsn, // pageserver's disk consistent lSN
|
||||
pub reply_ts: TimestampTz,
|
||||
pub reply_requested: bool,
|
||||
}
|
||||
|
||||
/// A network connection that's speaking the replication protocol.
|
||||
pub struct ReplicationConn {
|
||||
/// This is an `Option` because we will spawn a background thread that will
|
||||
/// `take` it from us.
|
||||
stream_in: Option<ReadStream>,
|
||||
}
|
||||
|
||||
/// Scope guard to unregister replication connection from timeline
|
||||
struct ReplicationConnGuard {
|
||||
replica: usize, // replica internal ID assigned by timeline
|
||||
timeline: Arc<Timeline>,
|
||||
}
|
||||
|
||||
impl Drop for ReplicationConnGuard {
|
||||
fn drop(&mut self) {
|
||||
self.timeline.update_replica_state(self.replica, None);
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: Naming is a bit messy here.
|
||||
// This ReplicationStreamGuard lives as long as ReplicationConn
|
||||
// and current ReplicationConnGuard is tied to the background thread
|
||||
// that receives feedback.
|
||||
struct ReplicationStreamGuard {
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
tenant_id: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
}
|
||||
|
||||
impl Drop for ReplicationStreamGuard {
|
||||
fn drop(&mut self) {
|
||||
// the connection with pageserver is lost,
|
||||
// resume callback subscription
|
||||
debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}",
|
||||
self.tenant_id, self.timelineid);
|
||||
|
||||
self.tx
|
||||
.send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Resume request to callmemaybe thread {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplicationConn {
|
||||
/// Create a new `ReplicationConn`
|
||||
pub fn new(pgb: &mut PostgresBackend) -> Self {
|
||||
Self {
|
||||
stream_in: pgb.take_stream_in(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle incoming messages from the network.
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
fn background_thread(
|
||||
mut stream_in: ReadStream,
|
||||
timeline: Arc<Timeline>,
|
||||
replica_id: usize,
|
||||
) -> Result<()> {
|
||||
let mut state = ReplicaState::new();
|
||||
let _guard = ReplicationConnGuard {
|
||||
replica: replica_id,
|
||||
timeline: timeline.clone(),
|
||||
};
|
||||
// Wait for replica's feedback.
|
||||
while let Some(msg) = FeMessage::read(&mut stream_in)? {
|
||||
match &msg {
|
||||
FeMessage::CopyData(m) => {
|
||||
// There's two possible data messages that the client is supposed to send here:
|
||||
// `HotStandbyFeedback` and `StandbyStatusUpdate`.
|
||||
|
||||
match m.first().cloned() {
|
||||
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
|
||||
// Note: deserializing is on m[1..] because we skip the tag byte.
|
||||
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
|
||||
.context("failed to deserialize HotStandbyFeedback")?;
|
||||
timeline.update_replica_state(replica_id, Some(state));
|
||||
}
|
||||
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
|
||||
let reply = StandbyReply::des(&m[1..])
|
||||
.context("failed to deserialize StandbyReply")?;
|
||||
state.disk_consistent_lsn = reply.apply_lsn;
|
||||
state.last_received_lsn = reply.write_lsn;
|
||||
timeline.update_replica_state(replica_id, Some(state));
|
||||
}
|
||||
_ => warn!("unexpected message {:?}", msg),
|
||||
}
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
FeMessage::CopyFail => {
|
||||
// Shutdown the connection, because rust-postgres client cannot be dropped
|
||||
// when connection is alive.
|
||||
let _ = stream_in.shutdown(Shutdown::Both);
|
||||
return Err(anyhow!("Copy failed"));
|
||||
}
|
||||
_ => {
|
||||
// We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored.
|
||||
info!("unexpected message {:?}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function that parses a single LSN.
|
||||
fn parse_start(cmd: &[u8]) -> Result<Lsn> {
|
||||
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||
let caps = re.captures_iter(str::from_utf8(cmd)?);
|
||||
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
||||
let start_pos = lsns
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
|
||||
assert!(lsns.next().is_none());
|
||||
Ok(start_pos)
|
||||
}
|
||||
|
||||
/// Helper function for opening a wal file.
|
||||
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = File::open(&partial_path) {
|
||||
return Ok(opened_file);
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
File::open(&wal_file_path)
|
||||
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
error!("{}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle START_REPLICATION replication command
|
||||
///
|
||||
pub fn run(
|
||||
&mut self,
|
||||
swh: &mut SendWalHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
cmd: &Bytes,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("WAL sender", timeline = %swh.timelineid.unwrap()).entered();
|
||||
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(swh.timeline.get());
|
||||
let bg_stream_in = self.stream_in.take().unwrap();
|
||||
|
||||
let state = ReplicaState::new();
|
||||
// This replica_id is used below to check if it's time to stop replication.
|
||||
let replica_id = bg_timeline.add_replica(state);
|
||||
|
||||
// TODO: here we got two threads, one for writing WAL and one for receiving
|
||||
// feedback. If one of them fails, we should shutdown the other one too.
|
||||
let _ = thread::Builder::new()
|
||||
.name("HotStandbyFeedback thread".into())
|
||||
.spawn(move || {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
|
||||
error!("Replication background thread failed: {}", err);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut start_pos = Self::parse_start(cmd)?;
|
||||
|
||||
let mut wal_seg_size: usize;
|
||||
loop {
|
||||
wal_seg_size = swh.timeline.get().get_info().server.wal_seg_size as usize;
|
||||
if wal_seg_size == 0 {
|
||||
error!("Cannot start replication before connecting to wal_proposer");
|
||||
sleep(Duration::from_secs(1));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let wal_end = swh.timeline.get().get_end_of_wal();
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
// hang otherwise). That's because walproposer runs the consensus and
|
||||
// synchronizes safekeepers on the most advanced one.
|
||||
//
|
||||
// There is a small risk of this WAL getting concurrently garbaged if
|
||||
// another compute rises which collects majority and starts fixing log
|
||||
// on this safekeeper itself. That's ok as (old) proposer will never be
|
||||
// able to commit such WAL.
|
||||
let stop_pos: Option<Lsn> = if swh.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
Some(wal_end)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
|
||||
|
||||
// Don't spam pageserver with callmemaybe queries
|
||||
// when replication connection with pageserver is already established.
|
||||
let _guard = {
|
||||
if swh.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
None
|
||||
} else {
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
let tenant_id = swh.tenantid.unwrap();
|
||||
let tx_clone = swh.tx.clone();
|
||||
swh.tx
|
||||
.send(CallmeEvent::Pause(tenant_id, timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Pause request to callmemaybe thread {}", e);
|
||||
});
|
||||
|
||||
// create a guard to subscribe callback again, when this connection will exit
|
||||
Some(ReplicationStreamGuard {
|
||||
tx: tx_clone,
|
||||
tenant_id,
|
||||
timelineid,
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
// switch to copy
|
||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
let mut end_pos = Lsn(0);
|
||||
let mut wal_file: Option<File> = None;
|
||||
|
||||
loop {
|
||||
if let Some(stop_pos) = stop_pos {
|
||||
if start_pos >= stop_pos {
|
||||
break; /* recovery finished */
|
||||
}
|
||||
end_pos = stop_pos;
|
||||
} else {
|
||||
/* Wait until we have some data to stream */
|
||||
let lsn = swh.timeline.get().wait_for_lsn(start_pos);
|
||||
|
||||
if let Some(lsn) = lsn {
|
||||
end_pos = lsn;
|
||||
} else {
|
||||
// Is is time to end streaming to this replica?
|
||||
if swh.timeline.get().check_stop_streaming(replica_id) {
|
||||
// TODO create proper error type for this
|
||||
bail!("end streaming to {:?}", swh.appname);
|
||||
}
|
||||
|
||||
// timeout expired: request pageserver status
|
||||
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
sent_ptr: end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.context("Failed to send KeepAlive message")?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Take the `File` from `wal_file`, or open a new file.
|
||||
let mut file = match wal_file.take() {
|
||||
Some(file) => file,
|
||||
None => {
|
||||
// Open a new file.
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
|
||||
let timeline_id = swh.timeline.get().timelineid;
|
||||
let wal_file_path = swh.conf.timeline_dir(&timeline_id).join(wal_file_name);
|
||||
Self::open_wal_file(&wal_file_path)?
|
||||
}
|
||||
};
|
||||
|
||||
let xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
|
||||
|
||||
// How much to read and send in message? We cannot cross the WAL file
|
||||
// boundary, and we don't want send more than MAX_SEND_SIZE.
|
||||
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
let send_size = min(send_size, wal_seg_size - xlogoff);
|
||||
let send_size = min(send_size, MAX_SEND_SIZE);
|
||||
|
||||
// Read some data from the file.
|
||||
let mut file_buf = vec![0u8; send_size];
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))
|
||||
.and_then(|_| file.read_exact(&mut file_buf))
|
||||
.context("Failed to read data from WAL file")?;
|
||||
|
||||
// Write some data to the network socket.
|
||||
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
|
||||
wal_start: start_pos.0,
|
||||
wal_end: end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
data: &file_buf,
|
||||
}))
|
||||
.context("Failed to send XLogData")?;
|
||||
|
||||
start_pos += send_size as u64;
|
||||
|
||||
info!("sent WAL up to {}", start_pos);
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
if start_pos.segment_offset(wal_seg_size) != 0 {
|
||||
wal_file = Some(file);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,7 @@ use tracing::*;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
use crate::replication::HotStandbyFeedback;
|
||||
use crate::send_wal::HotStandbyFeedback;
|
||||
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
|
||||
use zenith_metrics::{
|
||||
register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec,
|
||||
|
||||
@@ -1,166 +1,369 @@
|
||||
//! Part of Safekeeper pretending to be Postgres, streaming xlog to
|
||||
//! pageserver/any other consumer.
|
||||
//!
|
||||
//! This module implements the streaming side of replication protocol, starting
|
||||
//! with the "START_REPLICATION" message.
|
||||
|
||||
use crate::json_ctrl::handle_json_ctrl;
|
||||
use crate::receive_wal::ReceiveWalConn;
|
||||
use crate::replication::ReplicationConn;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::xlog_utils::PG_TLI;
|
||||
use std::str::FromStr;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
|
||||
};
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::min;
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
use std::net::Shutdown;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::postgres_backend;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use std::{str, thread};
|
||||
use tracing::*;
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
|
||||
use zenith_utils::sock_split::ReadStream;
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use crate::timeline::CreateControlFile;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
/// Handler for streaming WAL from acceptor
|
||||
pub struct SendWalHandler {
|
||||
pub conf: SafeKeeperConf,
|
||||
/// assigned application name
|
||||
pub appname: Option<String>,
|
||||
pub tenantid: Option<ZTenantId>,
|
||||
pub timelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
//sender to communicate with callmemaybe thread
|
||||
pub tx: UnboundedSender<CallmeEvent>,
|
||||
// See: https://www.postgresql.org/docs/13/protocol-replication.html
|
||||
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
|
||||
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
|
||||
|
||||
type FullTransactionId = u64;
|
||||
|
||||
/// Hot standby feedback received from replica
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
pub struct HotStandbyFeedback {
|
||||
pub ts: TimestampTz,
|
||||
pub xmin: FullTransactionId,
|
||||
pub catalog_xmin: FullTransactionId,
|
||||
}
|
||||
|
||||
impl postgres_backend::Handler for SendWalHandler {
|
||||
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
|
||||
let ztimelineid = sm
|
||||
.params
|
||||
.get("ztimelineid")
|
||||
.ok_or_else(|| anyhow!("timelineid is required"))?;
|
||||
self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?);
|
||||
|
||||
let ztenantid = sm
|
||||
.params
|
||||
.get("ztenantid")
|
||||
.ok_or_else(|| anyhow!("tenantid is required"))?;
|
||||
self.tenantid = Some(ZTenantId::from_str(ztenantid)?);
|
||||
|
||||
if let Some(app_name) = sm.params.get("application_name") {
|
||||
self.appname = Some(app_name.clone());
|
||||
impl HotStandbyFeedback {
|
||||
pub fn empty() -> HotStandbyFeedback {
|
||||
HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: 0,
|
||||
catalog_xmin: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
/// Standby status update
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StandbyReply {
|
||||
pub write_lsn: Lsn, // last lsn received by pageserver
|
||||
pub flush_lsn: Lsn, // not used
|
||||
pub apply_lsn: Lsn, // pageserver's disk consistent lSN
|
||||
pub reply_ts: TimestampTz,
|
||||
pub reply_requested: bool,
|
||||
}
|
||||
|
||||
/// A network connection that's speaking the replication protocol.
|
||||
pub struct ReplicationConn {
|
||||
/// This is an `Option` because we will spawn a background thread that will
|
||||
/// `take` it from us.
|
||||
stream_in: Option<ReadStream>,
|
||||
}
|
||||
|
||||
/// Scope guard to unregister replication connection from timeline
|
||||
struct ReplicationConnGuard {
|
||||
replica: usize, // replica internal ID assigned by timeline
|
||||
timeline: Arc<Timeline>,
|
||||
}
|
||||
|
||||
impl Drop for ReplicationConnGuard {
|
||||
fn drop(&mut self) {
|
||||
self.timeline.update_replica_state(self.replica, None);
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: Naming is a bit messy here.
|
||||
// This ReplicationStreamGuard lives as long as ReplicationConn
|
||||
// and current ReplicationConnGuard is tied to the background thread
|
||||
// that receives feedback.
|
||||
struct ReplicationStreamGuard {
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
tenant_id: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
}
|
||||
|
||||
impl Drop for ReplicationStreamGuard {
|
||||
fn drop(&mut self) {
|
||||
// the connection with pageserver is lost,
|
||||
// resume callback subscription
|
||||
debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}",
|
||||
self.tenant_id, self.timelineid);
|
||||
|
||||
self.tx
|
||||
.send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Resume request to callmemaybe thread {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplicationConn {
|
||||
/// Create a new `ReplicationConn`
|
||||
pub fn new(pgb: &mut PostgresBackend) -> Self {
|
||||
Self {
|
||||
stream_in: pgb.take_stream_in(),
|
||||
}
|
||||
}
|
||||
|
||||
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
||||
// START_WAL_PUSH is the only command that initializes the timeline in production.
|
||||
// There is also JSON_CTRL command, which should initialize the timeline for testing.
|
||||
if self.timeline.is_none() {
|
||||
if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL")
|
||||
{
|
||||
self.timeline.set(
|
||||
&self.conf,
|
||||
self.tenantid.unwrap(),
|
||||
self.timelineid.unwrap(),
|
||||
CreateControlFile::True,
|
||||
)?;
|
||||
} else {
|
||||
self.timeline.set(
|
||||
&self.conf,
|
||||
self.tenantid.unwrap(),
|
||||
self.timelineid.unwrap(),
|
||||
CreateControlFile::False,
|
||||
)?;
|
||||
/// Handle incoming messages from the network.
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
fn background_thread(
|
||||
mut stream_in: ReadStream,
|
||||
timeline: Arc<Timeline>,
|
||||
replica_id: usize,
|
||||
) -> Result<()> {
|
||||
let mut state = ReplicaState::new();
|
||||
let _guard = ReplicationConnGuard {
|
||||
replica: replica_id,
|
||||
timeline: timeline.clone(),
|
||||
};
|
||||
// Wait for replica's feedback.
|
||||
while let Some(msg) = FeMessage::read(&mut stream_in)? {
|
||||
match &msg {
|
||||
FeMessage::CopyData(m) => {
|
||||
// There's two possible data messages that the client is supposed to send here:
|
||||
// `HotStandbyFeedback` and `StandbyStatusUpdate`.
|
||||
|
||||
match m.first().cloned() {
|
||||
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
|
||||
// Note: deserializing is on m[1..] because we skip the tag byte.
|
||||
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
|
||||
.context("failed to deserialize HotStandbyFeedback")?;
|
||||
timeline.update_replica_state(replica_id, Some(state));
|
||||
}
|
||||
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
|
||||
let reply = StandbyReply::des(&m[1..])
|
||||
.context("failed to deserialize StandbyReply")?;
|
||||
state.disk_consistent_lsn = reply.apply_lsn;
|
||||
state.last_received_lsn = reply.write_lsn;
|
||||
timeline.update_replica_state(replica_id, Some(state));
|
||||
}
|
||||
_ => warn!("unexpected message {:?}", msg),
|
||||
}
|
||||
}
|
||||
FeMessage::Sync => {}
|
||||
FeMessage::CopyFail => {
|
||||
// Shutdown the connection, because rust-postgres client cannot be dropped
|
||||
// when connection is alive.
|
||||
let _ = stream_in.shutdown(Shutdown::Both);
|
||||
return Err(anyhow!("Copy failed"));
|
||||
}
|
||||
_ => {
|
||||
// We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored.
|
||||
info!("unexpected message {:?}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
|
||||
self.handle_identify_system(pgb)?;
|
||||
} else if query_string.starts_with(b"START_REPLICATION") {
|
||||
ReplicationConn::new(pgb)
|
||||
.run(self, pgb, &query_string)
|
||||
.with_context(|| "failed to run ReplicationConn")?;
|
||||
} else if query_string.starts_with(b"START_WAL_PUSH") {
|
||||
// TODO: this repeats query decoding logic from page_service so it is probably
|
||||
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes
|
||||
let decoded_query_string = match query_string.last() {
|
||||
Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
|
||||
_ => std::str::from_utf8(&query_string)?,
|
||||
};
|
||||
let pageserver_connstr = decoded_query_string
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.map(|s| s.to_owned());
|
||||
ReceiveWalConn::new(pgb, pageserver_connstr)
|
||||
.run(self)
|
||||
.with_context(|| "failed to run ReceiveWalConn")?;
|
||||
} else if query_string.starts_with(b"JSON_CTRL") {
|
||||
handle_json_ctrl(self, pgb, &query_string)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function that parses a single LSN.
|
||||
fn parse_start(cmd: &[u8]) -> Result<Lsn> {
|
||||
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||
let caps = re.captures_iter(str::from_utf8(cmd)?);
|
||||
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
||||
let start_pos = lsns
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
|
||||
assert!(lsns.next().is_none());
|
||||
Ok(start_pos)
|
||||
}
|
||||
|
||||
/// Helper function for opening a wal file.
|
||||
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = File::open(&partial_path) {
|
||||
return Ok(opened_file);
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
File::open(&wal_file_path)
|
||||
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
error!("{}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle START_REPLICATION replication command
|
||||
///
|
||||
pub fn run(
|
||||
&mut self,
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
cmd: &Bytes,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("WAL sender", timeline = %spg.timelineid.unwrap()).entered();
|
||||
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(spg.timeline.get());
|
||||
let bg_stream_in = self.stream_in.take().unwrap();
|
||||
|
||||
let state = ReplicaState::new();
|
||||
// This replica_id is used below to check if it's time to stop replication.
|
||||
let replica_id = bg_timeline.add_replica(state);
|
||||
|
||||
// TODO: here we got two threads, one for writing WAL and one for receiving
|
||||
// feedback. If one of them fails, we should shutdown the other one too.
|
||||
let _ = thread::Builder::new()
|
||||
.name("HotStandbyFeedback thread".into())
|
||||
.spawn(move || {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
|
||||
error!("Replication background thread failed: {}", err);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut start_pos = Self::parse_start(cmd)?;
|
||||
|
||||
let mut wal_seg_size: usize;
|
||||
loop {
|
||||
wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize;
|
||||
if wal_seg_size == 0 {
|
||||
error!("Cannot start replication before connecting to wal_proposer");
|
||||
sleep(Duration::from_secs(1));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let wal_end = spg.timeline.get().get_end_of_wal();
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
// hang otherwise). That's because walproposer runs the consensus and
|
||||
// synchronizes safekeepers on the most advanced one.
|
||||
//
|
||||
// There is a small risk of this WAL getting concurrently garbaged if
|
||||
// another compute rises which collects majority and starts fixing log
|
||||
// on this safekeeper itself. That's ok as (old) proposer will never be
|
||||
// able to commit such WAL.
|
||||
let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
Some(wal_end)
|
||||
} else {
|
||||
bail!("Unexpected command {:?}", query_string);
|
||||
None
|
||||
};
|
||||
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
|
||||
|
||||
// Don't spam pageserver with callmemaybe queries
|
||||
// when replication connection with pageserver is already established.
|
||||
let _guard = {
|
||||
if spg.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
None
|
||||
} else {
|
||||
let timelineid = spg.timeline.get().timelineid;
|
||||
let tenant_id = spg.tenantid.unwrap();
|
||||
let tx_clone = spg.tx.clone();
|
||||
spg.tx
|
||||
.send(CallmeEvent::Pause(tenant_id, timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Pause request to callmemaybe thread {}", e);
|
||||
});
|
||||
|
||||
// create a guard to subscribe callback again, when this connection will exit
|
||||
Some(ReplicationStreamGuard {
|
||||
tx: tx_clone,
|
||||
tenant_id,
|
||||
timelineid,
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
// switch to copy
|
||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
let mut end_pos = Lsn(0);
|
||||
let mut wal_file: Option<File> = None;
|
||||
|
||||
loop {
|
||||
if let Some(stop_pos) = stop_pos {
|
||||
if start_pos >= stop_pos {
|
||||
break; /* recovery finished */
|
||||
}
|
||||
end_pos = stop_pos;
|
||||
} else {
|
||||
/* Wait until we have some data to stream */
|
||||
let lsn = spg.timeline.get().wait_for_lsn(start_pos);
|
||||
|
||||
if let Some(lsn) = lsn {
|
||||
end_pos = lsn;
|
||||
} else {
|
||||
// Is is time to end streaming to this replica?
|
||||
if spg.timeline.get().check_stop_streaming(replica_id) {
|
||||
// TODO create proper error type for this
|
||||
bail!("end streaming to {:?}", spg.appname);
|
||||
}
|
||||
|
||||
// timeout expired: request pageserver status
|
||||
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
sent_ptr: end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.context("Failed to send KeepAlive message")?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Take the `File` from `wal_file`, or open a new file.
|
||||
let mut file = match wal_file.take() {
|
||||
Some(file) => file,
|
||||
None => {
|
||||
// Open a new file.
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
|
||||
let timeline_id = spg.timeline.get().timelineid;
|
||||
let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name);
|
||||
Self::open_wal_file(&wal_file_path)?
|
||||
}
|
||||
};
|
||||
|
||||
let xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
|
||||
|
||||
// How much to read and send in message? We cannot cross the WAL file
|
||||
// boundary, and we don't want send more than MAX_SEND_SIZE.
|
||||
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
let send_size = min(send_size, wal_seg_size - xlogoff);
|
||||
let send_size = min(send_size, MAX_SEND_SIZE);
|
||||
|
||||
// Read some data from the file.
|
||||
let mut file_buf = vec![0u8; send_size];
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))
|
||||
.and_then(|_| file.read_exact(&mut file_buf))
|
||||
.context("Failed to read data from WAL file")?;
|
||||
|
||||
// Write some data to the network socket.
|
||||
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
|
||||
wal_start: start_pos.0,
|
||||
wal_end: end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
data: &file_buf,
|
||||
}))
|
||||
.context("Failed to send XLogData")?;
|
||||
|
||||
start_pos += send_size as u64;
|
||||
|
||||
info!("sent WAL up to {}", start_pos);
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
if start_pos.segment_offset(wal_seg_size) != 0 {
|
||||
wal_file = Some(file);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SendWalHandler {
|
||||
pub fn new(conf: SafeKeeperConf, tx: UnboundedSender<CallmeEvent>) -> Self {
|
||||
SendWalHandler {
|
||||
conf,
|
||||
appname: None,
|
||||
tenantid: None,
|
||||
timelineid: None,
|
||||
timeline: None,
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||
let start_pos = self.timeline.get().get_end_of_wal();
|
||||
let lsn = start_pos.to_string();
|
||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli = PG_TLI.to_string();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
let sysid_bytes = sysid.as_bytes();
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor {
|
||||
name: b"systemid",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"timeline",
|
||||
typoid: INT4_OID,
|
||||
typlen: 4,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"xlogpos",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"dbname",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(sysid_bytes),
|
||||
Some(tli_bytes),
|
||||
Some(lsn_bytes),
|
||||
None,
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,11 +18,11 @@ use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use crate::replication::HotStandbyFeedback;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
|
||||
Storage, SK_FORMAT_VERSION, SK_MAGIC,
|
||||
};
|
||||
use crate::send_wal::HotStandbyFeedback;
|
||||
use crate::upgrade::upgrade_control_file;
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::thread;
|
||||
use tracing::*;
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::SafeKeeperConf;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
|
||||
@@ -60,7 +60,7 @@ fn handle_socket(
|
||||
|
||||
socket.set_nodelay(true)?;
|
||||
|
||||
let mut conn_handler = SendWalHandler::new(conf, tx);
|
||||
let mut conn_handler = SafekeeperPostgresHandler::new(conf, tx);
|
||||
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?;
|
||||
// libpq replication protocol between safekeeper and replicas/pagers
|
||||
pgbackend.run(&mut conn_handler)?;
|
||||
|
||||
Reference in New Issue
Block a user