mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
break wal_service into multiple files
+ misc cleanups
This commit is contained in:
committed by
Eric Seppanen
parent
513696a485
commit
a11558b84f
@@ -4,7 +4,9 @@ use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
pub mod pq_protocol;
|
||||
pub mod replication;
|
||||
pub mod s3_offload;
|
||||
pub mod send_wal;
|
||||
pub mod wal_service;
|
||||
|
||||
use crate::pq_protocol::SystemId;
|
||||
|
||||
241
walkeeper/src/replication.rs
Normal file
241
walkeeper/src/replication.rs
Normal file
@@ -0,0 +1,241 @@
|
||||
//! This module implements the replication protocol, starting with the
|
||||
//! "START REPLICATION" message.
|
||||
|
||||
use crate::pq_protocol::{BeMessage, FeMessage};
|
||||
use crate::send_wal::SendWal;
|
||||
use crate::wal_service::{
|
||||
HotStandbyFeedback, Timeline, TimelineTools, END_REPLICATION_MARKER, MAX_SEND_SIZE,
|
||||
};
|
||||
use crate::WalAcceptorConf;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{get_current_timestamp, XLogFileName};
|
||||
use regex::Regex;
|
||||
use std::cmp::min;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
|
||||
use std::net::TcpStream;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{str, thread};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */
|
||||
const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */
|
||||
const LIBPQ_MSG_SIZE_OFFS: usize = 1;
|
||||
|
||||
// FIXME: we don't use consistent endian on this data structure.
|
||||
// In wal_service it's little-endian, but here it's big-endian.
|
||||
// FIXME: This function should go away and be replaced by
|
||||
// derived serde::Deserialize
|
||||
impl HotStandbyFeedback {
|
||||
fn parse(body: &Bytes) -> HotStandbyFeedback {
|
||||
HotStandbyFeedback {
|
||||
ts: BigEndian::read_u64(&body[0..8]),
|
||||
xmin: BigEndian::read_u64(&body[8..16]),
|
||||
catalog_xmin: BigEndian::read_u64(&body[16..24]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// State used while serving a `START_REPLICATION` command.
///
/// Built from a `SendWal` by `ReplicationHandler::new`; the actual streaming
/// is done by `ReplicationHandler::run`.
pub struct ReplicationHandler {
|
||||
/// Timeline being replicated; read through the `TimelineTools` helpers.
timeline: Option<Arc<Timeline>>,
|
||||
/// Postgres connection, buffered input
|
||||
///
|
||||
/// This is an `Option` because we will spawn a background thread that will
|
||||
/// `take` it from us.
|
||||
stream_in: Option<BufReader<TcpStream>>,
|
||||
/// Postgres connection, output
|
||||
stream_out: Mutex<TcpStream>,
|
||||
/// wal acceptor configuration
|
||||
conf: WalAcceptorConf,
|
||||
/// assigned application name
|
||||
// NOTE(review): `new` always initialises this to `None` — confirm whether
// the name negotiated by `SendWal` was meant to be carried over.
appname: Option<String>,
|
||||
}
|
||||
|
||||
impl ReplicationHandler {
|
||||
/// Create a new `SendWal`, consuming the `Connection`.
|
||||
pub fn new(conn: SendWal) -> Self {
|
||||
Self {
|
||||
timeline: conn.timeline,
|
||||
stream_in: Some(conn.stream_in),
|
||||
stream_out: Mutex::new(conn.stream_out),
|
||||
conf: conn.conf,
|
||||
appname: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle incoming messages from the network.
|
||||
///
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
///
|
||||
fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
|
||||
// Wait for replica's feedback.
|
||||
// We only handle `CopyData` messages. Anything else is ignored.
|
||||
loop {
|
||||
match FeMessage::read_from(&mut stream_in)? {
|
||||
FeMessage::CopyData(m) => {
|
||||
timeline.add_hs_feedback(HotStandbyFeedback::parse(&m.body))
|
||||
}
|
||||
msg => {
|
||||
info!("unexpected message {:?}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function that parses a pair of LSNs.
|
||||
fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> {
|
||||
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||
let caps = re.captures_iter(str::from_utf8(&cmd[..])?);
|
||||
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
||||
let start_pos = lsns
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("failed to find start LSN"))??;
|
||||
let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0));
|
||||
Ok((start_pos, stop_pos))
|
||||
}
|
||||
|
||||
/// Helper function for opening a wal file.
|
||||
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = File::open(&partial_path) {
|
||||
return Ok(opened_file);
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
match File::open(&wal_file_path) {
|
||||
Ok(opened_file) => return Ok(opened_file),
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle START_REPLICATION replication command
|
||||
///
|
||||
pub fn run(&mut self, cmd: &Bytes) -> Result<()> {
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(self.timeline.get());
|
||||
let bg_stream_in = self.stream_in.take().unwrap();
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) {
|
||||
error!("socket error: {}", err);
|
||||
}
|
||||
});
|
||||
|
||||
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?;
|
||||
|
||||
let wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize;
|
||||
if wal_seg_size == 0 {
|
||||
bail!("Can not start replication before connecting to wal_proposer");
|
||||
}
|
||||
let (wal_end, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false);
|
||||
if start_pos == Lsn(0) {
|
||||
start_pos = wal_end;
|
||||
}
|
||||
if stop_pos == Lsn(0) && self.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
stop_pos = wal_end;
|
||||
}
|
||||
info!("Start replication from {} till {}", start_pos, stop_pos);
|
||||
|
||||
let mut outbuf = BytesMut::new();
|
||||
BeMessage::write(&mut outbuf, &BeMessage::Copy);
|
||||
self.send(&outbuf)?;
|
||||
outbuf.clear();
|
||||
|
||||
let mut end_pos: Lsn;
|
||||
let mut wal_file: Option<File> = None;
|
||||
|
||||
loop {
|
||||
/* Wait until we have some data to stream */
|
||||
if stop_pos != Lsn(0) {
|
||||
/* recovery mode: stream up to the specified LSN (VCL) */
|
||||
if start_pos >= stop_pos {
|
||||
/* recovery finished */
|
||||
break;
|
||||
}
|
||||
end_pos = stop_pos;
|
||||
} else {
|
||||
/* normal mode */
|
||||
let timeline = self.timeline.get();
|
||||
end_pos = timeline.wait_for_lsn(start_pos);
|
||||
}
|
||||
if end_pos == END_REPLICATION_MARKER {
|
||||
break;
|
||||
}
|
||||
|
||||
// Take the `File` from `wal_file`, or open a new file.
|
||||
let mut file = match wal_file.take() {
|
||||
Some(file) => file,
|
||||
None => {
|
||||
// Open a new file.
|
||||
let segno = start_pos.segment_number(wal_seg_size as u64);
|
||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||
let timeline_id = self.timeline.get().timelineid.to_string();
|
||||
let wal_file_path = self.conf.data_dir.join(timeline_id).join(wal_file_name);
|
||||
Self::open_wal_file(&wal_file_path)?
|
||||
}
|
||||
};
|
||||
|
||||
let xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize;
|
||||
|
||||
// How much to read and send in message? We cannot cross the WAL file
|
||||
// boundary, and we don't want send more than MAX_SEND_SIZE.
|
||||
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
let send_size = min(send_size, wal_seg_size - xlogoff);
|
||||
let send_size = min(send_size, MAX_SEND_SIZE);
|
||||
|
||||
let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size;
|
||||
|
||||
// Read some data from the file.
|
||||
let mut file_buf = vec![0u8; send_size];
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
file.read_exact(&mut file_buf)?;
|
||||
|
||||
// Write some data to the network socket.
|
||||
// FIXME: turn these into structs.
|
||||
// 'd' is CopyData;
|
||||
// 'w' is "WAL records"
|
||||
// https://www.postgresql.org/docs/9.1/protocol-message-formats.html
|
||||
// src/backend/replication/walreceiver.c
|
||||
outbuf.clear();
|
||||
outbuf.put_u8(b'd');
|
||||
outbuf.put_u32((msg_size - LIBPQ_MSG_SIZE_OFFS) as u32);
|
||||
outbuf.put_u8(b'w');
|
||||
outbuf.put_u64(start_pos.0);
|
||||
outbuf.put_u64(end_pos.0);
|
||||
outbuf.put_u64(get_current_timestamp());
|
||||
|
||||
assert!(outbuf.len() + file_buf.len() == msg_size);
|
||||
// FIXME: combine these two into a single send,
|
||||
// so that no other traffic can be sent in between them.
|
||||
self.send(&outbuf)?;
|
||||
self.send(&file_buf)?;
|
||||
start_pos += send_size as u64;
|
||||
|
||||
debug!("Sent WAL to page server up to {}", end_pos);
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
if start_pos.segment_offset(wal_seg_size as u64) != 0 {
|
||||
wal_file = Some(file);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Unlock the mutex and send bytes on the network.
|
||||
fn send(&self, buf: &[u8]) -> Result<()> {
|
||||
let mut writer = self.stream_out.lock().unwrap();
|
||||
writer.write_all(buf.as_ref())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
151
walkeeper/src/send_wal.rs
Normal file
151
walkeeper/src/send_wal.rs
Normal file
@@ -0,0 +1,151 @@
|
||||
//! This implements the libpq replication protocol between wal_acceptor and replicas/pagers
|
||||
//!
|
||||
|
||||
use crate::pq_protocol::{
|
||||
BeMessage, FeMessage, FeStartupMessage, RowDescriptor, StartupRequestCode,
|
||||
};
|
||||
use crate::replication::ReplicationHandler;
|
||||
use crate::wal_service::{Connection, Timeline, TimelineTools};
|
||||
use crate::WalAcceptorConf;
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::BytesMut;
|
||||
use log::*;
|
||||
use std::io::{BufReader, Write};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Connection state for the libpq replication protocol.
///
/// Created from a `Connection` by `SendWal::new` and driven by `SendWal::run`.
pub struct SendWal {
|
||||
/// Timeline served on this connection; `run` sets it from the startup message.
pub timeline: Option<Arc<Timeline>>,
|
||||
/// Postgres connection, buffered input
|
||||
pub stream_in: BufReader<TcpStream>,
|
||||
/// Postgres connection, output
|
||||
pub stream_out: TcpStream,
|
||||
/// The cached result of socket.peer_addr()
|
||||
pub peer_addr: SocketAddr,
|
||||
/// wal acceptor configuration
|
||||
pub conf: WalAcceptorConf,
|
||||
/// assigned application name
|
||||
// `None` until `run` copies the name from the startup message.
appname: Option<String>,
|
||||
}
|
||||
|
||||
impl SendWal {
|
||||
/// Create a new `SendWal`, consuming the `Connection`.
|
||||
pub fn new(conn: Connection) -> Self {
|
||||
Self {
|
||||
timeline: conn.timeline,
|
||||
stream_in: conn.stream_in,
|
||||
stream_out: conn.stream_out,
|
||||
peer_addr: conn.peer_addr,
|
||||
conf: conn.conf,
|
||||
appname: None,
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Send WAL to replica or WAL receiver using standard libpq replication protocol
|
||||
///
|
||||
pub fn run(mut self) -> Result<()> {
|
||||
let peer_addr = self.peer_addr.clone();
|
||||
info!("WAL sender to {:?} is started", peer_addr);
|
||||
|
||||
// Handle the startup message first.
|
||||
|
||||
let m = FeStartupMessage::read_from(&mut self.stream_in)?;
|
||||
trace!("got startup message {:?}", m);
|
||||
match m.kind {
|
||||
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
|
||||
let mut buf = BytesMut::new();
|
||||
BeMessage::write(&mut buf, &BeMessage::Negotiate);
|
||||
info!("SSL requested");
|
||||
self.stream_out.write_all(&buf)?;
|
||||
}
|
||||
StartupRequestCode::Normal => {
|
||||
let mut buf = BytesMut::new();
|
||||
BeMessage::write(&mut buf, &BeMessage::AuthenticationOk);
|
||||
BeMessage::write(&mut buf, &BeMessage::ReadyForQuery);
|
||||
self.stream_out.write_all(&buf)?;
|
||||
self.timeline.set(m.timelineid)?;
|
||||
self.appname = m.appname;
|
||||
}
|
||||
StartupRequestCode::Cancel => return Ok(()),
|
||||
}
|
||||
|
||||
loop {
|
||||
let msg = FeMessage::read_from(&mut self.stream_in)?;
|
||||
match msg {
|
||||
FeMessage::Query(q) => {
|
||||
trace!("got query {:?}", q.body);
|
||||
|
||||
if q.body.starts_with(b"IDENTIFY_SYSTEM") {
|
||||
self.handle_identify_system()?;
|
||||
} else if q.body.starts_with(b"START_REPLICATION") {
|
||||
// Create a new replication object, consuming `self`.
|
||||
ReplicationHandler::new(self).run(&q.body)?;
|
||||
break;
|
||||
} else {
|
||||
bail!("Unexpected command {:?}", q.body);
|
||||
}
|
||||
}
|
||||
FeMessage::Terminate => {
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
bail!("unexpected message");
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("WAL sender to {:?} is finished", peer_addr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self) -> Result<()> {
|
||||
let (start_pos, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false);
|
||||
let lsn = start_pos.to_string();
|
||||
let tli = timeline.to_string();
|
||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
let sysid_bytes = sysid.as_bytes();
|
||||
|
||||
let mut outbuf = BytesMut::new();
|
||||
BeMessage::write(
|
||||
&mut outbuf,
|
||||
&BeMessage::RowDescription(&[
|
||||
RowDescriptor {
|
||||
name: b"systemid\0",
|
||||
typoid: 25,
|
||||
typlen: -1,
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"timeline\0",
|
||||
typoid: 23,
|
||||
typlen: 4,
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"xlogpos\0",
|
||||
typoid: 25,
|
||||
typlen: -1,
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"dbname\0",
|
||||
typoid: 25,
|
||||
typlen: -1,
|
||||
},
|
||||
]),
|
||||
);
|
||||
BeMessage::write(
|
||||
&mut outbuf,
|
||||
&BeMessage::DataRow(&[Some(sysid_bytes), Some(tli_bytes), Some(lsn_bytes), None]),
|
||||
);
|
||||
BeMessage::write(
|
||||
&mut outbuf,
|
||||
&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"),
|
||||
);
|
||||
BeMessage::write(&mut outbuf, &BeMessage::ReadyForQuery);
|
||||
self.stream_out.write_all(&outbuf)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,7 @@
|
||||
//!
|
||||
use anyhow::{bail, Result};
|
||||
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use fs2::FileExt;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
@@ -24,6 +24,7 @@ use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use crate::pq_protocol::*;
|
||||
use crate::send_wal::SendWal;
|
||||
use crate::WalAcceptorConf;
|
||||
use pageserver::ZTimelineId;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
@@ -36,34 +37,31 @@ const SK_MAGIC: u32 = 0xCafeCeefu32;
|
||||
const SK_FORMAT_VERSION: u32 = 1;
|
||||
const SK_PROTOCOL_VERSION: u32 = 1;
|
||||
const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
|
||||
const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
|
||||
const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */
|
||||
const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */
|
||||
const LIBPQ_MSG_SIZE_OFFS: usize = 1;
|
||||
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
|
||||
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
|
||||
const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
const END_OF_STREAM: Lsn = Lsn(0);
|
||||
|
||||
/// Unique node identifier used by Paxos
|
||||
#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct NodeId {
|
||||
pub struct NodeId {
|
||||
term: u64,
|
||||
uuid: [u8; 16],
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
struct ServerInfo {
|
||||
pub struct ServerInfo {
|
||||
/// proxy-safekeeper protocol version
|
||||
protocol_version: u32,
|
||||
pub protocol_version: u32,
|
||||
/// Postgres server version
|
||||
pg_version: u32,
|
||||
node_id: NodeId,
|
||||
system_id: SystemId,
|
||||
pub pg_version: u32,
|
||||
pub node_id: NodeId,
|
||||
pub system_id: SystemId,
|
||||
/// Zenith timelineid
|
||||
timeline_id: ZTimelineId,
|
||||
wal_end: Lsn,
|
||||
timeline: TimeLineID,
|
||||
wal_seg_size: u32,
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub wal_end: Lsn,
|
||||
pub timeline: TimeLineID,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
/// Vote request sent from proxy to safekeepers
|
||||
@@ -78,29 +76,29 @@ struct RequestVote {
|
||||
|
||||
/// Information about storage node
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
struct SafeKeeperInfo {
|
||||
pub struct SafeKeeperInfo {
|
||||
/// magic for verifying content of the control file
|
||||
magic: u32,
|
||||
pub magic: u32,
|
||||
/// safekeeper format version
|
||||
format_version: u32,
|
||||
pub format_version: u32,
|
||||
/// safekeeper's epoch
|
||||
epoch: u64,
|
||||
pub epoch: u64,
|
||||
/// information about server
|
||||
server: ServerInfo,
|
||||
pub server: ServerInfo,
|
||||
/// part of WAL acknowledged by quorum
|
||||
commit_lsn: Lsn,
|
||||
pub commit_lsn: Lsn,
|
||||
/// locally flushed part of WAL
|
||||
flush_lsn: Lsn,
|
||||
pub flush_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
|
||||
restart_lsn: Lsn,
|
||||
pub restart_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Hot standby feedback received from replica
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
|
||||
struct HotStandbyFeedback {
|
||||
ts: TimestampTz,
|
||||
xmin: FullTransactionId,
|
||||
catalog_xmin: FullTransactionId,
|
||||
pub struct HotStandbyFeedback {
|
||||
pub ts: TimestampTz,
|
||||
pub xmin: FullTransactionId,
|
||||
pub catalog_xmin: FullTransactionId,
|
||||
}
|
||||
|
||||
/// Request with WAL message sent from proxy to safekeeper.
|
||||
@@ -142,14 +140,14 @@ struct SharedState {
|
||||
/// Database instance (tenant)
|
||||
#[derive(Debug)]
|
||||
pub struct Timeline {
|
||||
timelineid: ZTimelineId,
|
||||
pub timelineid: ZTimelineId,
|
||||
mutex: Mutex<SharedState>,
|
||||
/// conditional variable used to notify wal senders
|
||||
cond: Condvar,
|
||||
}
|
||||
|
||||
// Useful utilities needed by various Connection-like objects
|
||||
trait TimelineTools {
|
||||
pub trait TimelineTools {
|
||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>;
|
||||
fn get(&self) -> &Arc<Timeline>;
|
||||
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
|
||||
@@ -176,22 +174,19 @@ impl TimelineTools for Option<Arc<Timeline>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Private data
|
||||
#[derive(Debug)]
|
||||
pub struct Connection {
|
||||
timeline: Option<Arc<Timeline>>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
/// Postgres connection, buffered input
|
||||
stream_in: BufReader<TcpStream>,
|
||||
pub stream_in: BufReader<TcpStream>,
|
||||
/// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain.
|
||||
stream_out: TcpStream,
|
||||
pub stream_out: TcpStream,
|
||||
/// The cached result of socket.peer_addr()
|
||||
peer_addr: SocketAddr,
|
||||
/// input buffer
|
||||
//inbuf: BytesMut,
|
||||
pub peer_addr: SocketAddr,
|
||||
/// output buffer
|
||||
outbuf: BytesMut,
|
||||
/// wal acceptor configuration
|
||||
conf: WalAcceptorConf,
|
||||
pub conf: WalAcceptorConf,
|
||||
}
|
||||
|
||||
/// Serde adapter for BytesMut
|
||||
@@ -238,16 +233,6 @@ impl SafeKeeperInfo {
|
||||
}
|
||||
}
|
||||
|
||||
impl HotStandbyFeedback {
|
||||
fn parse(body: &Bytes) -> HotStandbyFeedback {
|
||||
HotStandbyFeedback {
|
||||
ts: BigEndian::read_u64(&body[0..8]),
|
||||
xmin: BigEndian::read_u64(&body[8..16]),
|
||||
catalog_xmin: BigEndian::read_u64(&body[16..24]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
pub static ref TIMELINES: Mutex<HashMap<ZTimelineId, Arc<Timeline>>> =
|
||||
Mutex::new(HashMap::new());
|
||||
@@ -328,6 +313,23 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for an LSN to be committed.
|
||||
///
|
||||
/// Returns the last committed LSN, which will be at least
|
||||
/// as high as the LSN waited for.
|
||||
///
|
||||
pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
loop {
|
||||
let commit_lsn = shared_state.commit_lsn;
|
||||
// This must be `>`, not `>=`.
|
||||
if commit_lsn > lsn {
|
||||
return commit_lsn;
|
||||
}
|
||||
shared_state = self.cond.wait(shared_state).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Notify caught-up WAL senders about new WAL data received
|
||||
fn notify_wal_senders(&self, commit_lsn: Lsn) {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
@@ -341,7 +343,7 @@ impl Timeline {
|
||||
self.notify_wal_senders(END_REPLICATION_MARKER);
|
||||
}
|
||||
|
||||
fn get_info(&self) -> SafeKeeperInfo {
|
||||
pub fn get_info(&self) -> SafeKeeperInfo {
|
||||
return self.mutex.lock().unwrap().info;
|
||||
}
|
||||
|
||||
@@ -350,7 +352,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Accumulate hot standby feedbacks from replicas
|
||||
fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
|
||||
pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.hs_feedback.xmin = min(shared_state.hs_feedback.xmin, feedback.xmin);
|
||||
shared_state.hs_feedback.catalog_xmin =
|
||||
@@ -466,10 +468,10 @@ impl Connection {
|
||||
}
|
||||
|
||||
fn run(mut self) -> Result<()> {
|
||||
// Peek at the first 4 bytes of the incoming data, to determine which protocol
|
||||
// is being spoken.
|
||||
// `fill_buf` does not consume any of the bytes we peek at; they are left
|
||||
// in the BufReader's internal buffer for the next reader.
|
||||
// Peek at the first 4 bytes of the incoming data, to determine which
|
||||
// protocol is being spoken. `fill_buf` does not consume any of the
|
||||
// bytes we peek at; they are left in the BufReader's internal buffer
|
||||
// for the next reader.
|
||||
let peek_buf = self.stream_in.fill_buf()?;
|
||||
if peek_buf.len() < 4 {
|
||||
// Empty peek_buf means the socket was closed.
|
||||
@@ -483,7 +485,7 @@ impl Connection {
|
||||
self.stream_in.read_u32::<BigEndian>()?;
|
||||
self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor
|
||||
} else {
|
||||
send_wal::SendWal::new(self).run()?; // libpq replication protocol between wal_acceptor and replicas/pagers
|
||||
SendWal::new(self).run()?; // libpq replication protocol between wal_acceptor and replicas/pagers
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -800,372 +802,3 @@ impl Connection {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
mod send_wal {
|
||||
use super::{
|
||||
Connection, HotStandbyFeedback, Timeline, TimelineTools, END_REPLICATION_MARKER,
|
||||
LIBPQ_HDR_SIZE, LIBPQ_MSG_SIZE_OFFS, MAX_SEND_SIZE, XLOG_HDR_SIZE,
|
||||
};
|
||||
use crate::pq_protocol::{
|
||||
BeMessage, FeMessage, FeStartupMessage, RowDescriptor, StartupRequestCode,
|
||||
};
|
||||
use crate::WalAcceptorConf;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{get_current_timestamp, XLogFileName};
|
||||
use regex::Regex;
|
||||
use std::cmp::min;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{str, thread};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
pub struct SendWal {
|
||||
timeline: Option<Arc<Timeline>>,
|
||||
/// Postgres connection, buffered input
|
||||
stream_in: BufReader<TcpStream>,
|
||||
/// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain.
|
||||
stream_out: TcpStream,
|
||||
/// The cached result of socket.peer_addr()
|
||||
peer_addr: SocketAddr,
|
||||
/// wal acceptor configuration
|
||||
conf: WalAcceptorConf,
|
||||
/// assigned application name
|
||||
appname: Option<String>,
|
||||
}
|
||||
|
||||
impl SendWal {
|
||||
/// Create a new `SendWal`, consuming the `Connection`.
|
||||
pub fn new(conn: Connection) -> Self {
|
||||
Self {
|
||||
timeline: conn.timeline,
|
||||
stream_in: conn.stream_in,
|
||||
stream_out: conn.stream_out,
|
||||
peer_addr: conn.peer_addr,
|
||||
conf: conn.conf,
|
||||
appname: None,
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Send WAL to replica or WAL receiver using standard libpq replication protocol
|
||||
///
|
||||
pub fn run(mut self) -> Result<()> {
|
||||
let peer_addr = self.peer_addr.clone();
|
||||
info!("WAL sender to {:?} is started", peer_addr);
|
||||
|
||||
// Handle the startup message first.
|
||||
|
||||
let m = FeStartupMessage::read_from(&mut self.stream_in)?;
|
||||
trace!("got startup message {:?}", m);
|
||||
match m.kind {
|
||||
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
|
||||
let mut buf = BytesMut::new();
|
||||
BeMessage::write(&mut buf, &BeMessage::Negotiate);
|
||||
info!("SSL requested");
|
||||
self.stream_out.write_all(&buf)?;
|
||||
}
|
||||
StartupRequestCode::Normal => {
|
||||
let mut buf = BytesMut::new();
|
||||
BeMessage::write(&mut buf, &BeMessage::AuthenticationOk);
|
||||
BeMessage::write(&mut buf, &BeMessage::ReadyForQuery);
|
||||
self.stream_out.write_all(&buf)?;
|
||||
self.timeline.set(m.timelineid)?;
|
||||
self.appname = m.appname;
|
||||
}
|
||||
StartupRequestCode::Cancel => return Ok(()),
|
||||
}
|
||||
|
||||
loop {
|
||||
let msg = FeMessage::read_from(&mut self.stream_in)?;
|
||||
match msg {
|
||||
FeMessage::Query(q) => {
|
||||
trace!("got query {:?}", q.body);
|
||||
|
||||
if q.body.starts_with(b"IDENTIFY_SYSTEM") {
|
||||
self.handle_identify_system()?;
|
||||
} else if q.body.starts_with(b"START_REPLICATION") {
|
||||
// Create a new replication object, consuming `self`.
|
||||
let mut replication = ReplicationHandler::new(self);
|
||||
replication.run(&q.body)?;
|
||||
break;
|
||||
} else {
|
||||
bail!("Unexpected command {:?}", q.body);
|
||||
}
|
||||
}
|
||||
FeMessage::Terminate => {
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
bail!("unexpected message");
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("WAL sender to {:?} is finished", peer_addr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self) -> Result<()> {
|
||||
let (start_pos, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false);
|
||||
let lsn = start_pos.to_string();
|
||||
let tli = timeline.to_string();
|
||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
let sysid_bytes = sysid.as_bytes();
|
||||
|
||||
let mut outbuf = BytesMut::new();
|
||||
BeMessage::write(
|
||||
&mut outbuf,
|
||||
&BeMessage::RowDescription(&[
|
||||
RowDescriptor {
|
||||
name: b"systemid\0",
|
||||
typoid: 25,
|
||||
typlen: -1,
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"timeline\0",
|
||||
typoid: 23,
|
||||
typlen: 4,
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"xlogpos\0",
|
||||
typoid: 25,
|
||||
typlen: -1,
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"dbname\0",
|
||||
typoid: 25,
|
||||
typlen: -1,
|
||||
},
|
||||
]),
|
||||
);
|
||||
BeMessage::write(
|
||||
&mut outbuf,
|
||||
&BeMessage::DataRow(&[Some(sysid_bytes), Some(tli_bytes), Some(lsn_bytes), None]),
|
||||
);
|
||||
BeMessage::write(
|
||||
&mut outbuf,
|
||||
&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"),
|
||||
);
|
||||
BeMessage::write(&mut outbuf, &BeMessage::ReadyForQuery);
|
||||
self.stream_out.write_all(&outbuf)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReplicationHandler {
|
||||
timeline: Option<Arc<Timeline>>,
|
||||
/// Postgres connection, buffered input
|
||||
stream_in: Option<BufReader<TcpStream>>,
|
||||
/// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain.
|
||||
stream_out: Mutex<TcpStream>,
|
||||
/// wal acceptor configuration
|
||||
conf: WalAcceptorConf,
|
||||
/// assigned application name
|
||||
appname: Option<String>,
|
||||
}
|
||||
|
||||
impl ReplicationHandler {
|
||||
/// Create a new `SendWal`, consuming the `Connection`.
|
||||
pub fn new(conn: SendWal) -> Self {
|
||||
Self {
|
||||
timeline: conn.timeline,
|
||||
stream_in: Some(conn.stream_in),
|
||||
stream_out: Mutex::new(conn.stream_out),
|
||||
conf: conn.conf,
|
||||
appname: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle incoming messages from the network.
|
||||
///
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
///
|
||||
fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
|
||||
// Wait for replica's feedback.
|
||||
// We only handle `CopyData` messages. Anything else is ignored.
|
||||
|
||||
loop {
|
||||
match FeMessage::read_from(&mut stream_in)? {
|
||||
FeMessage::CopyData(m) => {
|
||||
timeline.add_hs_feedback(HotStandbyFeedback::parse(&m.body))
|
||||
}
|
||||
msg => {
|
||||
info!("unexpected message {:?}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function that parses a pair of LSNs.
|
||||
fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> {
|
||||
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||
let caps = re.captures_iter(str::from_utf8(&cmd[..])?);
|
||||
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
||||
let start_pos = lsns
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("failed to find start LSN"))??;
|
||||
let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0));
|
||||
Ok((start_pos, stop_pos))
|
||||
}
|
||||
|
||||
/// Helper function for opening a wal file.
|
||||
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = File::open(&partial_path) {
|
||||
return Ok(opened_file);
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
match File::open(&wal_file_path) {
|
||||
Ok(opened_file) => return Ok(opened_file),
|
||||
Err(e) => {
|
||||
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle START_REPLICATION replication command
|
||||
///
|
||||
fn run(&mut self, cmd: &Bytes) -> Result<()> {
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(self.timeline.get());
|
||||
let bg_stream_in = self.stream_in.take().unwrap();
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline) {
|
||||
error!("socket error: {}", err);
|
||||
}
|
||||
});
|
||||
|
||||
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?;
|
||||
|
||||
let wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize;
|
||||
if wal_seg_size == 0 {
|
||||
bail!("Can not start replication before connecting to wal_proposer");
|
||||
}
|
||||
let (wal_end, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false);
|
||||
if start_pos == Lsn(0) {
|
||||
start_pos = wal_end;
|
||||
}
|
||||
if stop_pos == Lsn(0) && self.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
stop_pos = wal_end;
|
||||
}
|
||||
info!("Start replication from {} till {}", start_pos, stop_pos);
|
||||
|
||||
let mut outbuf = BytesMut::new();
|
||||
BeMessage::write(&mut outbuf, &BeMessage::Copy);
|
||||
self.send(&outbuf)?;
|
||||
outbuf.clear();
|
||||
|
||||
let mut end_pos: Lsn;
|
||||
let mut commit_lsn: Lsn;
|
||||
let mut wal_file: Option<File> = None;
|
||||
|
||||
loop {
|
||||
/* Wait until we have some data to stream */
|
||||
if stop_pos != Lsn(0) {
|
||||
/* recovery mode: stream up to the specified LSN (VCL) */
|
||||
if start_pos >= stop_pos {
|
||||
/* recovery finished */
|
||||
break;
|
||||
}
|
||||
end_pos = stop_pos;
|
||||
} else {
|
||||
/* normal mode */
|
||||
let timeline = self.timeline.get();
|
||||
let mut shared_state = timeline.mutex.lock().unwrap();
|
||||
loop {
|
||||
commit_lsn = shared_state.commit_lsn;
|
||||
if start_pos < commit_lsn {
|
||||
end_pos = commit_lsn;
|
||||
break;
|
||||
}
|
||||
shared_state = timeline.cond.wait(shared_state).unwrap();
|
||||
}
|
||||
}
|
||||
if end_pos == END_REPLICATION_MARKER {
|
||||
break;
|
||||
}
|
||||
|
||||
// Take the `File` from `wal_file`, or open a new file.
|
||||
let mut file = match wal_file.take() {
|
||||
Some(file) => file,
|
||||
None => {
|
||||
// Open a new file.
|
||||
let segno = start_pos.segment_number(wal_seg_size as u64);
|
||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||
let timeline_id = self.timeline.get().timelineid.to_string();
|
||||
let wal_file_path =
|
||||
self.conf.data_dir.join(timeline_id).join(wal_file_name);
|
||||
Self::open_wal_file(&wal_file_path)?
|
||||
}
|
||||
};
|
||||
|
||||
let xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize;
|
||||
|
||||
// How much to read and send in message? We cannot cross the WAL file
|
||||
// boundary, and we don't want send more than MAX_SEND_SIZE.
|
||||
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
|
||||
let send_size = min(send_size, wal_seg_size - xlogoff);
|
||||
let send_size = min(send_size, MAX_SEND_SIZE);
|
||||
|
||||
let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size;
|
||||
|
||||
// Read some data from the file.
|
||||
let mut file_buf = vec![0u8; send_size];
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
file.read_exact(&mut file_buf)?;
|
||||
|
||||
// Write some data to the network socket.
|
||||
// FIXME: turn these into structs.
|
||||
// 'd' is CopyData;
|
||||
// 'w' is "WAL records"
|
||||
// https://www.postgresql.org/docs/9.1/protocol-message-formats.html
|
||||
// src/backend/replication/walreceiver.c
|
||||
outbuf.clear();
|
||||
outbuf.put_u8(b'd');
|
||||
outbuf.put_u32((msg_size - LIBPQ_MSG_SIZE_OFFS) as u32);
|
||||
outbuf.put_u8(b'w');
|
||||
outbuf.put_u64(start_pos.0);
|
||||
outbuf.put_u64(end_pos.0);
|
||||
outbuf.put_u64(get_current_timestamp());
|
||||
|
||||
assert!(outbuf.len() + file_buf.len() == msg_size);
|
||||
// FIXME: combine these two into a single send,
|
||||
// so that no other traffic can be sent in between them.
|
||||
self.send(&outbuf)?;
|
||||
self.send(&file_buf)?;
|
||||
start_pos += send_size as u64;
|
||||
|
||||
debug!("Sent WAL to page server up to {}", end_pos);
|
||||
|
||||
// Decide whether to reuse this file. If we don't set wal_file here
|
||||
// a new file will be opened next time.
|
||||
if start_pos.segment_offset(wal_seg_size as u64) != 0 {
|
||||
wal_file = Some(file);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Unlock the mutex and send bytes on the network.
|
||||
fn send(&self, buf: &[u8]) -> Result<()> {
|
||||
let mut writer = self.stream_out.lock().unwrap();
|
||||
writer.write_all(buf.as_ref())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user