mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
Multitenant wal_acceptor
This commit is contained in:
@@ -34,7 +34,14 @@ fn main() -> Result<(), io::Error> {
|
|||||||
.short("l")
|
.short("l")
|
||||||
.long("listen")
|
.long("listen")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"),
|
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5454)"),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("pageserver")
|
||||||
|
.short("p")
|
||||||
|
.long("pageserver")
|
||||||
|
.takes_value(true)
|
||||||
|
.help("address ip:port of pageserver with which wal_acceptor should establish connection"),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("daemonize")
|
Arg::with_name("daemonize")
|
||||||
@@ -56,6 +63,7 @@ fn main() -> Result<(), io::Error> {
|
|||||||
data_dir: PathBuf::from("./"),
|
data_dir: PathBuf::from("./"),
|
||||||
daemonize: false,
|
daemonize: false,
|
||||||
no_sync: false,
|
no_sync: false,
|
||||||
|
pageserver_addr: None,
|
||||||
listen_addr: "127.0.0.1:5454".parse().unwrap(),
|
listen_addr: "127.0.0.1:5454".parse().unwrap(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -75,6 +83,10 @@ fn main() -> Result<(), io::Error> {
|
|||||||
conf.listen_addr = addr.parse().unwrap();
|
conf.listen_addr = addr.parse().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(addr) = arg_matches.value_of("pageserver") {
|
||||||
|
conf.pageserver_addr = Some(addr.parse().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
start_wal_acceptor(conf)
|
start_wal_acceptor(conf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,4 +13,5 @@ pub struct WalAcceptorConf {
|
|||||||
pub daemonize: bool,
|
pub daemonize: bool,
|
||||||
pub no_sync: bool,
|
pub no_sync: bool,
|
||||||
pub listen_addr: SocketAddr,
|
pub listen_addr: SocketAddr,
|
||||||
|
pub pageserver_addr: Option<SocketAddr>,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
use byteorder::{BigEndian, ByteOrder};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::str;
|
||||||
|
|
||||||
pub type Oid = u32;
|
pub type Oid = u32;
|
||||||
|
pub type SystemId = u64;
|
||||||
pub type Result<T> = std::result::Result<T, io::Error>;
|
pub type Result<T> = std::result::Result<T, io::Error>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -35,6 +37,7 @@ pub enum BeMessage<'a> {
|
|||||||
pub struct FeStartupMessage {
|
pub struct FeStartupMessage {
|
||||||
pub version: u32,
|
pub version: u32,
|
||||||
pub kind: StartupRequestCode,
|
pub kind: StartupRequestCode,
|
||||||
|
pub system_id: SystemId
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -76,10 +79,30 @@ impl FeStartupMessage {
|
|||||||
_ => StartupRequestCode::Normal,
|
_ => StartupRequestCode::Normal,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let params_bytes = &buf[8..len];
|
||||||
|
let params_str = str::from_utf8(¶ms_bytes).unwrap();
|
||||||
|
let params = params_str.split('\0');
|
||||||
|
let mut options = false;
|
||||||
|
let mut system_id : u64 = 0;
|
||||||
|
for p in params {
|
||||||
|
if p == "options" {
|
||||||
|
options = true;
|
||||||
|
} else if options {
|
||||||
|
for opt in p.split(' ') {
|
||||||
|
if opt.starts_with("system.id=") {
|
||||||
|
system_id = opt[10..].parse::<u64>().unwrap();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
buf.advance(len as usize);
|
buf.advance(len as usize);
|
||||||
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
|
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
|
||||||
version,
|
version,
|
||||||
kind,
|
kind,
|
||||||
|
system_id,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,12 +21,14 @@ use std::io::prelude::*;
|
|||||||
use std::io::SeekFrom;
|
use std::io::SeekFrom;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::sync::Mutex;
|
use std::sync::{Mutex,Arc};
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::runtime;
|
use tokio::runtime;
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use tokio_postgres::{connect, NoTls, Error};
|
||||||
|
|
||||||
use crate::pq_protocol::*;
|
use crate::pq_protocol::*;
|
||||||
use crate::xlog_utils::*;
|
use crate::xlog_utils::*;
|
||||||
@@ -62,7 +64,7 @@ struct ServerInfo {
|
|||||||
protocol_version: u32, /* proxy-safekeeper protocol version */
|
protocol_version: u32, /* proxy-safekeeper protocol version */
|
||||||
pg_version: u32, /* Postgres server version */
|
pg_version: u32, /* Postgres server version */
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
system_id: u64, /* Postgres system identifier */
|
system_id: SystemId, /* Postgres system identifier */
|
||||||
wal_end: XLogRecPtr,
|
wal_end: XLogRecPtr,
|
||||||
timeline: TimeLineID,
|
timeline: TimeLineID,
|
||||||
wal_seg_size: u32,
|
wal_seg_size: u32,
|
||||||
@@ -130,7 +132,7 @@ struct SafeKeeperResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* State shared by safekeeper tasks and protected by mutex
|
* Shared state associated with database instance (tenant)
|
||||||
*/
|
*/
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct SharedState {
|
struct SharedState {
|
||||||
@@ -141,12 +143,13 @@ struct SharedState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Static data
|
* Database instance (tenant)
|
||||||
*/
|
*/
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct WalAcceptor {
|
pub struct System {
|
||||||
mutex: Mutex<SharedState>, /* mutext for protecting shared state */
|
id : SystemId,
|
||||||
cond: Notify, /* conditional variable used to notify wal senders */
|
mutex : Mutex<SharedState>,
|
||||||
|
cond: Notify, /* conditional variable used to notify wal senders */
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -154,7 +157,7 @@ pub struct WalAcceptor {
|
|||||||
*/
|
*/
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Connection {
|
struct Connection {
|
||||||
acceptor: &'static WalAcceptor,
|
system: Option<Arc<System>>,
|
||||||
stream: TcpStream, /* Postgres connection */
|
stream: TcpStream, /* Postgres connection */
|
||||||
inbuf: BytesMut, /* input buffer */
|
inbuf: BytesMut, /* input buffer */
|
||||||
outbuf: BytesMut, /* output buffer */
|
outbuf: BytesMut, /* output buffer */
|
||||||
@@ -346,7 +349,7 @@ impl Serializer for SafeKeeperResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
pub static ref SELF: WalAcceptor = WalAcceptor::new();
|
pub static ref SYSTEMS: Mutex<HashMap<SystemId, Arc<System>>> = Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn thread_main(conf: WalAcceptorConf) {
|
pub fn thread_main(conf: WalAcceptorConf) {
|
||||||
@@ -362,15 +365,33 @@ pub fn thread_main(conf: WalAcceptorConf) {
|
|||||||
|
|
||||||
info!("Starting wal acceptor on {}", conf.listen_addr);
|
info!("Starting wal acceptor on {}", conf.listen_addr);
|
||||||
|
|
||||||
SELF.load_control_file(&conf);
|
|
||||||
|
|
||||||
runtime.block_on(async {
|
runtime.block_on(async {
|
||||||
let _unused = SELF.main_loop(&conf).await;
|
let _unused = main_loop(&conf).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WalAcceptor {
|
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
|
||||||
pub fn new() -> WalAcceptor {
|
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((socket, peer_addr)) => {
|
||||||
|
debug!("accepted connection from {}", peer_addr);
|
||||||
|
socket.set_nodelay(true)?;
|
||||||
|
let mut conn = Connection::new(socket, &conf);
|
||||||
|
task::spawn(async move {
|
||||||
|
if let Err(err) = conn.run().await {
|
||||||
|
error!("error: {}", err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => error!("Failed to accept connection: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl System {
|
||||||
|
pub fn new(id : SystemId) -> System {
|
||||||
let shared_state = SharedState {
|
let shared_state = SharedState {
|
||||||
commit_lsn: 0,
|
commit_lsn: 0,
|
||||||
info: SafeKeeperInfo::new(),
|
info: SafeKeeperInfo::new(),
|
||||||
@@ -381,7 +402,8 @@ impl WalAcceptor {
|
|||||||
catalog_xmin: u64::MAX,
|
catalog_xmin: u64::MAX,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
WalAcceptor {
|
System {
|
||||||
|
id: id,
|
||||||
mutex: Mutex::new(shared_state),
|
mutex: Mutex::new(shared_state),
|
||||||
cond: Notify::new(),
|
cond: Notify::new(),
|
||||||
}
|
}
|
||||||
@@ -424,7 +446,9 @@ impl WalAcceptor {
|
|||||||
|
|
||||||
// Load and lock control file (prevent running more than one instane of safekeeper */
|
// Load and lock control file (prevent running more than one instane of safekeeper */
|
||||||
fn load_control_file(&self, conf: &WalAcceptorConf) {
|
fn load_control_file(&self, conf: &WalAcceptorConf) {
|
||||||
let control_file_path = conf.data_dir.join(CONTROL_FILE_NAME);
|
let control_file_path = conf.data_dir
|
||||||
|
.join(self.id.to_string())
|
||||||
|
.join(CONTROL_FILE_NAME);
|
||||||
match OpenOptions::new()
|
match OpenOptions::new()
|
||||||
.read(true)
|
.read(true)
|
||||||
.write(true)
|
.write(true)
|
||||||
@@ -492,35 +516,15 @@ impl WalAcceptor {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn main_loop(&'static self, conf: &WalAcceptorConf) -> Result<()> {
|
|
||||||
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
|
|
||||||
loop {
|
|
||||||
match listener.accept().await {
|
|
||||||
Ok((socket, peer_addr)) => {
|
|
||||||
debug!("accepted connection from {}", peer_addr);
|
|
||||||
socket.set_nodelay(true)?;
|
|
||||||
let mut conn = Connection::new(self, socket, &conf);
|
|
||||||
task::spawn(async move {
|
|
||||||
if let Err(err) = conn.run().await {
|
|
||||||
error!("error: {}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(e) => error!("Failed to accept connection: {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Connection {
|
impl Connection {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
acceptor: &'static WalAcceptor,
|
|
||||||
socket: TcpStream,
|
socket: TcpStream,
|
||||||
conf: &WalAcceptorConf,
|
conf: &WalAcceptorConf,
|
||||||
) -> Connection {
|
) -> Connection {
|
||||||
Connection {
|
Connection {
|
||||||
acceptor: acceptor,
|
system: None,
|
||||||
stream: socket,
|
stream: socket,
|
||||||
inbuf: BytesMut::with_capacity(10 * 1024),
|
inbuf: BytesMut::with_capacity(10 * 1024),
|
||||||
outbuf: BytesMut::with_capacity(10 * 1024),
|
outbuf: BytesMut::with_capacity(10 * 1024),
|
||||||
@@ -529,6 +533,10 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn system(&self) -> Arc<System> {
|
||||||
|
self.system.as_ref().unwrap().clone()
|
||||||
|
}
|
||||||
|
|
||||||
async fn run(&mut self) -> Result<()> {
|
async fn run(&mut self) -> Result<()> {
|
||||||
self.inbuf.resize(4, 0u8);
|
self.inbuf.resize(4, 0u8);
|
||||||
self.stream.read_exact(&mut self.inbuf[0..4]).await?;
|
self.stream.read_exact(&mut self.inbuf[0..4]).await?;
|
||||||
@@ -548,15 +556,65 @@ impl Connection {
|
|||||||
Ok(T::unpack(&mut self.inbuf))
|
Ok(T::unpack(&mut self.inbuf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn request_callback(&self) -> std::result::Result<(), Error> {
|
||||||
|
if let Some(addr) = self.conf.pageserver_addr {
|
||||||
|
let ps_connstr = format!(
|
||||||
|
"host={} port={} dbname={} user={}",
|
||||||
|
addr.ip(),
|
||||||
|
addr.port(),
|
||||||
|
"no_db",
|
||||||
|
"no_user",
|
||||||
|
);
|
||||||
|
let callme = format!(
|
||||||
|
"callmemaybe host={} port={} replication=1 options='-c system.id={}'",
|
||||||
|
self.conf.listen_addr.ip(),
|
||||||
|
self.conf.listen_addr.port(),
|
||||||
|
self.system().get_info().server.system_id,
|
||||||
|
);
|
||||||
|
let (client, connection) = connect(&ps_connstr, NoTls).await?;
|
||||||
|
|
||||||
|
// The connection object performs the actual communication with the database,
|
||||||
|
// so spawn it off to run on its own.
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = connection.await {
|
||||||
|
error!("pageserver connection error: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
client.simple_query(&callme).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_system(&mut self, id: SystemId) -> Result<()> {
|
||||||
|
let mut systems = SYSTEMS.lock().unwrap();
|
||||||
|
if id == 0 { // non-multitenant configuration: just sigle instance
|
||||||
|
if let Some(system) = systems.values().next() {
|
||||||
|
self.system = Some(system.clone());
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
io_error!("No active instances");
|
||||||
|
}
|
||||||
|
if !systems.contains_key(&id) {
|
||||||
|
let system_dir = self.conf.data_dir.join(id.to_string());
|
||||||
|
fs::create_dir_all(system_dir)?;
|
||||||
|
systems.insert(id, Arc::new(System::new(id)));
|
||||||
|
}
|
||||||
|
self.system = Some(systems.get(&id).unwrap().clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// Receive WAL from wal_proposer
|
// Receive WAL from wal_proposer
|
||||||
async fn receive_wal(&mut self) -> Result<()> {
|
async fn receive_wal(&mut self) -> Result<()> {
|
||||||
let mut my_info = self.acceptor.get_info();
|
|
||||||
// Receive information about server
|
// Receive information about server
|
||||||
let server_info = self.read_req::<ServerInfo>().await?;
|
let server_info = self.read_req::<ServerInfo>().await?;
|
||||||
info!(
|
info!(
|
||||||
"Start handshake with wal_proposer {}",
|
"Start handshake with wal_proposer {} sysid {}",
|
||||||
self.stream.peer_addr()?
|
self.stream.peer_addr()?, server_info.system_id
|
||||||
);
|
);
|
||||||
|
self.set_system(server_info.system_id)?;
|
||||||
|
self.system().load_control_file(&self.conf);
|
||||||
|
|
||||||
|
let mut my_info = self.system().get_info();
|
||||||
|
|
||||||
/* Check protocol compatibility */
|
/* Check protocol compatibility */
|
||||||
if server_info.protocol_version != SK_PROTOCOL_VERSION {
|
if server_info.protocol_version != SK_PROTOCOL_VERSION {
|
||||||
@@ -605,9 +663,9 @@ impl Connection {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
my_info.server.node_id = prop.node_id;
|
my_info.server.node_id = prop.node_id;
|
||||||
self.acceptor.set_info(&my_info);
|
self.system().set_info(&my_info);
|
||||||
/* Need to persist our vote first */
|
/* Need to persist our vote first */
|
||||||
self.acceptor.save_control_file(true)?;
|
self.system().save_control_file(true)?;
|
||||||
|
|
||||||
let mut flushed_restart_lsn: XLogRecPtr = 0;
|
let mut flushed_restart_lsn: XLogRecPtr = 0;
|
||||||
let wal_seg_size = server_info.wal_seg_size as usize;
|
let wal_seg_size = server_info.wal_seg_size as usize;
|
||||||
@@ -617,6 +675,13 @@ impl Connection {
|
|||||||
prop.node_id.pack(&mut self.outbuf);
|
prop.node_id.pack(&mut self.outbuf);
|
||||||
self.send().await?;
|
self.send().await?;
|
||||||
|
|
||||||
|
// Need to establish replication channel with page server.
|
||||||
|
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||||
|
if let Err(e) = self.request_callback().await {
|
||||||
|
// Do not treate it as fatal error and continue work
|
||||||
|
error!("Failed to send callme request to pageserver: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Start streaming from server {} address {:?}",
|
"Start streaming from server {} address {:?}",
|
||||||
server_info.system_id,
|
server_info.system_id,
|
||||||
@@ -671,7 +736,7 @@ impl Connection {
|
|||||||
* when restart_lsn delta exceeds WAL segment size.
|
* when restart_lsn delta exceeds WAL segment size.
|
||||||
*/
|
*/
|
||||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||||
self.acceptor.save_control_file(sync_control_file)?;
|
self.system().save_control_file(sync_control_file)?;
|
||||||
|
|
||||||
if sync_control_file {
|
if sync_control_file {
|
||||||
flushed_restart_lsn = my_info.restart_lsn;
|
flushed_restart_lsn = my_info.restart_lsn;
|
||||||
@@ -682,7 +747,7 @@ impl Connection {
|
|||||||
let resp = SafeKeeperResponse {
|
let resp = SafeKeeperResponse {
|
||||||
epoch: my_info.epoch,
|
epoch: my_info.epoch,
|
||||||
flush_lsn: end_pos,
|
flush_lsn: end_pos,
|
||||||
hs_feedback: self.acceptor.get_hs_feedback(),
|
hs_feedback: self.system().get_hs_feedback(),
|
||||||
};
|
};
|
||||||
self.start_sending();
|
self.start_sending();
|
||||||
resp.pack(&mut self.outbuf);
|
resp.pack(&mut self.outbuf);
|
||||||
@@ -692,8 +757,7 @@ impl Connection {
|
|||||||
* Ping wal sender that new data is available.
|
* Ping wal sender that new data is available.
|
||||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||||
*/
|
*/
|
||||||
self.acceptor
|
self.system().notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -711,10 +775,7 @@ impl Connection {
|
|||||||
if self.inbuf.is_empty() {
|
if self.inbuf.is_empty() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
} else {
|
} else {
|
||||||
return Err(io::Error::new(
|
io_error!("connection reset by peer");
|
||||||
io::ErrorKind::Other,
|
|
||||||
"connection reset by peer",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -767,6 +828,7 @@ impl Connection {
|
|||||||
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
|
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
|
||||||
self.send().await?;
|
self.send().await?;
|
||||||
self.init_done = true;
|
self.init_done = true;
|
||||||
|
self.set_system(m.system_id)?;
|
||||||
}
|
}
|
||||||
StartupRequestCode::Cancel => return Ok(()),
|
StartupRequestCode::Cancel => return Ok(()),
|
||||||
}
|
}
|
||||||
@@ -784,7 +846,7 @@ impl Connection {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
return Err(io::Error::new(io::ErrorKind::Other, "unexpected message"));
|
io_error!("unexpected message");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -799,7 +861,7 @@ impl Connection {
|
|||||||
let (start_pos, timeline) = self.find_end_of_wal(false);
|
let (start_pos, timeline) = self.find_end_of_wal(false);
|
||||||
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
|
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
|
||||||
let tli = timeline.to_string();
|
let tli = timeline.to_string();
|
||||||
let sysid = self.acceptor.get_info().server.system_id.to_string();
|
let sysid = self.system().get_info().server.system_id.to_string();
|
||||||
let lsn_bytes = lsn.as_bytes();
|
let lsn_bytes = lsn.as_bytes();
|
||||||
let tli_bytes = tli.as_bytes();
|
let tli_bytes = tli.as_bytes();
|
||||||
let sysid_bytes = sysid.as_bytes();
|
let sysid_bytes = sysid.as_bytes();
|
||||||
@@ -855,7 +917,7 @@ impl Connection {
|
|||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
let wal_seg_size = self.acceptor.get_info().server.wal_seg_size as usize;
|
let wal_seg_size = self.system().get_info().server.wal_seg_size as usize;
|
||||||
if wal_seg_size == 0 {
|
if wal_seg_size == 0 {
|
||||||
io_error!("Can not start replication before connecting to wal_proposer");
|
io_error!("Can not start replication before connecting to wal_proposer");
|
||||||
}
|
}
|
||||||
@@ -896,9 +958,10 @@ impl Connection {
|
|||||||
/* normal mode */
|
/* normal mode */
|
||||||
loop {
|
loop {
|
||||||
// Rust doesn't allow to grab async result from mutex scope
|
// Rust doesn't allow to grab async result from mutex scope
|
||||||
let notified = self.acceptor.cond.notified();
|
let system = self.system();
|
||||||
|
let notified = system.cond.notified();
|
||||||
{
|
{
|
||||||
let shared_state = self.acceptor.mutex.lock().unwrap();
|
let shared_state = system.mutex.lock().unwrap();
|
||||||
commit_lsn = shared_state.commit_lsn;
|
commit_lsn = shared_state.commit_lsn;
|
||||||
if start_pos < commit_lsn {
|
if start_pos < commit_lsn {
|
||||||
end_pos = commit_lsn;
|
end_pos = commit_lsn;
|
||||||
@@ -916,7 +979,7 @@ impl Connection {
|
|||||||
Ok(0) => break,
|
Ok(0) => break,
|
||||||
Ok(_) => match self.parse_message()? {
|
Ok(_) => match self.parse_message()? {
|
||||||
Some(FeMessage::CopyData(m)) => self
|
Some(FeMessage::CopyData(m)) => self
|
||||||
.acceptor
|
.system()
|
||||||
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
|
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
|
||||||
_ => {}
|
_ => {}
|
||||||
},
|
},
|
||||||
@@ -934,11 +997,15 @@ impl Connection {
|
|||||||
} else {
|
} else {
|
||||||
let segno = XLByteToSeg(start_pos, wal_seg_size);
|
let segno = XLByteToSeg(start_pos, wal_seg_size);
|
||||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||||
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial");
|
let wal_file_path = self.conf.data_dir
|
||||||
|
.join(self.system().id.to_string())
|
||||||
|
.join(wal_file_name.clone() + ".partial");
|
||||||
if let Ok(opened_file) = File::open(&wal_file_path) {
|
if let Ok(opened_file) = File::open(&wal_file_path) {
|
||||||
file = opened_file;
|
file = opened_file;
|
||||||
} else {
|
} else {
|
||||||
let wal_file_path = self.conf.data_dir.join(wal_file_name);
|
let wal_file_path = self.conf.data_dir
|
||||||
|
.join(self.system().id.to_string())
|
||||||
|
.join(wal_file_name);
|
||||||
match File::open(&wal_file_path) {
|
match File::open(&wal_file_path) {
|
||||||
Ok(opened_file) => file = opened_file,
|
Ok(opened_file) => file = opened_file,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -1017,8 +1084,12 @@ impl Connection {
|
|||||||
/* Open file */
|
/* Open file */
|
||||||
let segno = XLByteToSeg(start_pos, wal_seg_size);
|
let segno = XLByteToSeg(start_pos, wal_seg_size);
|
||||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||||
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone());
|
let wal_file_path = self.conf.data_dir
|
||||||
let wal_file_partial_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial");
|
.join(self.system().id.to_string())
|
||||||
|
.join(wal_file_name.clone());
|
||||||
|
let wal_file_partial_path = self.conf.data_dir
|
||||||
|
.join(self.system().id.to_string())
|
||||||
|
.join(wal_file_name.clone() + ".partial");
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut wal_file: File;
|
let mut wal_file: File;
|
||||||
@@ -1080,7 +1151,7 @@ impl Connection {
|
|||||||
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
|
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
|
||||||
find_end_of_wal(
|
find_end_of_wal(
|
||||||
&self.conf.data_dir,
|
&self.conf.data_dir,
|
||||||
self.acceptor.get_info().server.wal_seg_size as usize,
|
self.system().get_info().server.wal_seg_size as usize,
|
||||||
precise,
|
precise,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user