Multitenant wal_acceptor

This commit is contained in:
Konstantin Knizhnik
2021-04-07 13:43:40 +03:00
parent 8547184830
commit 3fea78d688
4 changed files with 169 additions and 62 deletions

View File

@@ -34,7 +34,14 @@ fn main() -> Result<(), io::Error> {
.short("l") .short("l")
.long("listen") .long("listen")
.takes_value(true) .takes_value(true)
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"), .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("pageserver")
.short("p")
.long("pageserver")
.takes_value(true)
.help("address ip:port of pageserver with which wal_acceptor should establish connection"),
) )
.arg( .arg(
Arg::with_name("daemonize") Arg::with_name("daemonize")
@@ -56,6 +63,7 @@ fn main() -> Result<(), io::Error> {
data_dir: PathBuf::from("./"), data_dir: PathBuf::from("./"),
daemonize: false, daemonize: false,
no_sync: false, no_sync: false,
pageserver_addr: None,
listen_addr: "127.0.0.1:5454".parse().unwrap(), listen_addr: "127.0.0.1:5454".parse().unwrap(),
}; };
@@ -75,6 +83,10 @@ fn main() -> Result<(), io::Error> {
conf.listen_addr = addr.parse().unwrap(); conf.listen_addr = addr.parse().unwrap();
} }
if let Some(addr) = arg_matches.value_of("pageserver") {
conf.pageserver_addr = Some(addr.parse().unwrap());
}
start_wal_acceptor(conf) start_wal_acceptor(conf)
} }

View File

@@ -13,4 +13,5 @@ pub struct WalAcceptorConf {
pub daemonize: bool, pub daemonize: bool,
pub no_sync: bool, pub no_sync: bool,
pub listen_addr: SocketAddr, pub listen_addr: SocketAddr,
pub pageserver_addr: Option<SocketAddr>,
} }

View File

@@ -1,8 +1,10 @@
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io; use std::io;
use std::str;
pub type Oid = u32; pub type Oid = u32;
pub type SystemId = u64;
pub type Result<T> = std::result::Result<T, io::Error>; pub type Result<T> = std::result::Result<T, io::Error>;
#[derive(Debug)] #[derive(Debug)]
@@ -35,6 +37,7 @@ pub enum BeMessage<'a> {
pub struct FeStartupMessage { pub struct FeStartupMessage {
pub version: u32, pub version: u32,
pub kind: StartupRequestCode, pub kind: StartupRequestCode,
pub system_id: SystemId
} }
#[derive(Debug)] #[derive(Debug)]
@@ -76,10 +79,30 @@ impl FeStartupMessage {
_ => StartupRequestCode::Normal, _ => StartupRequestCode::Normal,
}; };
let params_bytes = &buf[8..len];
let params_str = str::from_utf8(&params_bytes).unwrap();
let params = params_str.split('\0');
let mut options = false;
let mut system_id : u64 = 0;
for p in params {
if p == "options" {
options = true;
} else if options {
for opt in p.split(' ') {
if opt.starts_with("system.id=") {
system_id = opt[10..].parse::<u64>().unwrap();
break;
}
}
break;
}
}
buf.advance(len as usize); buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage { Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version, version,
kind, kind,
system_id,
}))) })))
} }
} }

View File

@@ -21,12 +21,14 @@ use std::io::prelude::*;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::mem; use std::mem;
use std::str; use std::str;
use std::sync::Mutex; use std::sync::{Mutex,Arc};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::runtime; use tokio::runtime;
use tokio::sync::Notify; use tokio::sync::Notify;
use tokio::task; use tokio::task;
use std::collections::HashMap;
use tokio_postgres::{connect, NoTls, Error};
use crate::pq_protocol::*; use crate::pq_protocol::*;
use crate::xlog_utils::*; use crate::xlog_utils::*;
@@ -62,7 +64,7 @@ struct ServerInfo {
protocol_version: u32, /* proxy-safekeeper protocol version */ protocol_version: u32, /* proxy-safekeeper protocol version */
pg_version: u32, /* Postgres server version */ pg_version: u32, /* Postgres server version */
node_id: NodeId, node_id: NodeId,
system_id: u64, /* Postgres system identifier */ system_id: SystemId, /* Postgres system identifier */
wal_end: XLogRecPtr, wal_end: XLogRecPtr,
timeline: TimeLineID, timeline: TimeLineID,
wal_seg_size: u32, wal_seg_size: u32,
@@ -130,7 +132,7 @@ struct SafeKeeperResponse {
} }
/* /*
* State shared by safekeeper tasks and protected by mutex * Shared state associated with database instance (tenant)
*/ */
#[derive(Debug)] #[derive(Debug)]
struct SharedState { struct SharedState {
@@ -141,12 +143,13 @@ struct SharedState {
} }
/* /*
* Static data * Database instance (tenant)
*/ */
#[derive(Debug)] #[derive(Debug)]
pub struct WalAcceptor { pub struct System {
mutex: Mutex<SharedState>, /* mutext for protecting shared state */ id : SystemId,
cond: Notify, /* conditional variable used to notify wal senders */ mutex : Mutex<SharedState>,
cond: Notify, /* conditional variable used to notify wal senders */
} }
/* /*
@@ -154,7 +157,7 @@ pub struct WalAcceptor {
*/ */
#[derive(Debug)] #[derive(Debug)]
struct Connection { struct Connection {
acceptor: &'static WalAcceptor, system: Option<Arc<System>>,
stream: TcpStream, /* Postgres connection */ stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */ inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */ outbuf: BytesMut, /* output buffer */
@@ -346,7 +349,7 @@ impl Serializer for SafeKeeperResponse {
} }
lazy_static! { lazy_static! {
pub static ref SELF: WalAcceptor = WalAcceptor::new(); pub static ref SYSTEMS: Mutex<HashMap<SystemId, Arc<System>>> = Mutex::new(HashMap::new());
} }
pub fn thread_main(conf: WalAcceptorConf) { pub fn thread_main(conf: WalAcceptorConf) {
@@ -362,15 +365,33 @@ pub fn thread_main(conf: WalAcceptorConf) {
info!("Starting wal acceptor on {}", conf.listen_addr); info!("Starting wal acceptor on {}", conf.listen_addr);
SELF.load_control_file(&conf);
runtime.block_on(async { runtime.block_on(async {
let _unused = SELF.main_loop(&conf).await; let _unused = main_loop(&conf).await;
}); });
} }
impl WalAcceptor { async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
pub fn new() -> WalAcceptor { let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
loop {
match listener.accept().await {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(socket, &conf);
task::spawn(async move {
if let Err(err) = conn.run().await {
error!("error: {}", err);
}
});
}
Err(e) => error!("Failed to accept connection: {}", e),
}
}
}
impl System {
pub fn new(id : SystemId) -> System {
let shared_state = SharedState { let shared_state = SharedState {
commit_lsn: 0, commit_lsn: 0,
info: SafeKeeperInfo::new(), info: SafeKeeperInfo::new(),
@@ -381,7 +402,8 @@ impl WalAcceptor {
catalog_xmin: u64::MAX, catalog_xmin: u64::MAX,
}, },
}; };
WalAcceptor { System {
id: id,
mutex: Mutex::new(shared_state), mutex: Mutex::new(shared_state),
cond: Notify::new(), cond: Notify::new(),
} }
@@ -424,7 +446,9 @@ impl WalAcceptor {
// Load and lock control file (prevent running more than one instane of safekeeper */ // Load and lock control file (prevent running more than one instane of safekeeper */
fn load_control_file(&self, conf: &WalAcceptorConf) { fn load_control_file(&self, conf: &WalAcceptorConf) {
let control_file_path = conf.data_dir.join(CONTROL_FILE_NAME); let control_file_path = conf.data_dir
.join(self.id.to_string())
.join(CONTROL_FILE_NAME);
match OpenOptions::new() match OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
@@ -492,35 +516,15 @@ impl WalAcceptor {
} }
Ok(()) Ok(())
} }
async fn main_loop(&'static self, conf: &WalAcceptorConf) -> Result<()> {
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
loop {
match listener.accept().await {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(self, socket, &conf);
task::spawn(async move {
if let Err(err) = conn.run().await {
error!("error: {}", err);
}
});
}
Err(e) => error!("Failed to accept connection: {}", e),
}
}
}
} }
impl Connection { impl Connection {
pub fn new( pub fn new(
acceptor: &'static WalAcceptor,
socket: TcpStream, socket: TcpStream,
conf: &WalAcceptorConf, conf: &WalAcceptorConf,
) -> Connection { ) -> Connection {
Connection { Connection {
acceptor: acceptor, system: None,
stream: socket, stream: socket,
inbuf: BytesMut::with_capacity(10 * 1024), inbuf: BytesMut::with_capacity(10 * 1024),
outbuf: BytesMut::with_capacity(10 * 1024), outbuf: BytesMut::with_capacity(10 * 1024),
@@ -529,6 +533,10 @@ impl Connection {
} }
} }
fn system(&self) -> Arc<System> {
self.system.as_ref().unwrap().clone()
}
async fn run(&mut self) -> Result<()> { async fn run(&mut self) -> Result<()> {
self.inbuf.resize(4, 0u8); self.inbuf.resize(4, 0u8);
self.stream.read_exact(&mut self.inbuf[0..4]).await?; self.stream.read_exact(&mut self.inbuf[0..4]).await?;
@@ -548,15 +556,65 @@ impl Connection {
Ok(T::unpack(&mut self.inbuf)) Ok(T::unpack(&mut self.inbuf))
} }
async fn request_callback(&self) -> std::result::Result<(), Error> {
if let Some(addr) = self.conf.pageserver_addr {
let ps_connstr = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let callme = format!(
"callmemaybe host={} port={} replication=1 options='-c system.id={}'",
self.conf.listen_addr.ip(),
self.conf.listen_addr.port(),
self.system().get_info().server.system_id,
);
let (client, connection) = connect(&ps_connstr, NoTls).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("pageserver connection error: {}", e);
}
});
client.simple_query(&callme).await?;
}
Ok(())
}
fn set_system(&mut self, id: SystemId) -> Result<()> {
let mut systems = SYSTEMS.lock().unwrap();
if id == 0 { // non-multitenant configuration: just sigle instance
if let Some(system) = systems.values().next() {
self.system = Some(system.clone());
return Ok(());
}
io_error!("No active instances");
}
if !systems.contains_key(&id) {
let system_dir = self.conf.data_dir.join(id.to_string());
fs::create_dir_all(system_dir)?;
systems.insert(id, Arc::new(System::new(id)));
}
self.system = Some(systems.get(&id).unwrap().clone());
Ok(())
}
// Receive WAL from wal_proposer // Receive WAL from wal_proposer
async fn receive_wal(&mut self) -> Result<()> { async fn receive_wal(&mut self) -> Result<()> {
let mut my_info = self.acceptor.get_info();
// Receive information about server // Receive information about server
let server_info = self.read_req::<ServerInfo>().await?; let server_info = self.read_req::<ServerInfo>().await?;
info!( info!(
"Start handshake with wal_proposer {}", "Start handshake with wal_proposer {} sysid {}",
self.stream.peer_addr()? self.stream.peer_addr()?, server_info.system_id
); );
self.set_system(server_info.system_id)?;
self.system().load_control_file(&self.conf);
let mut my_info = self.system().get_info();
/* Check protocol compatibility */ /* Check protocol compatibility */
if server_info.protocol_version != SK_PROTOCOL_VERSION { if server_info.protocol_version != SK_PROTOCOL_VERSION {
@@ -605,9 +663,9 @@ impl Connection {
); );
} }
my_info.server.node_id = prop.node_id; my_info.server.node_id = prop.node_id;
self.acceptor.set_info(&my_info); self.system().set_info(&my_info);
/* Need to persist our vote first */ /* Need to persist our vote first */
self.acceptor.save_control_file(true)?; self.system().save_control_file(true)?;
let mut flushed_restart_lsn: XLogRecPtr = 0; let mut flushed_restart_lsn: XLogRecPtr = 0;
let wal_seg_size = server_info.wal_seg_size as usize; let wal_seg_size = server_info.wal_seg_size as usize;
@@ -617,6 +675,13 @@ impl Connection {
prop.node_id.pack(&mut self.outbuf); prop.node_id.pack(&mut self.outbuf);
self.send().await?; self.send().await?;
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
if let Err(e) = self.request_callback().await {
// Do not treate it as fatal error and continue work
error!("Failed to send callme request to pageserver: {}", e);
}
info!( info!(
"Start streaming from server {} address {:?}", "Start streaming from server {} address {:?}",
server_info.system_id, server_info.system_id,
@@ -671,7 +736,7 @@ impl Connection {
* when restart_lsn delta exceeds WAL segment size. * when restart_lsn delta exceeds WAL segment size.
*/ */
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
self.acceptor.save_control_file(sync_control_file)?; self.system().save_control_file(sync_control_file)?;
if sync_control_file { if sync_control_file {
flushed_restart_lsn = my_info.restart_lsn; flushed_restart_lsn = my_info.restart_lsn;
@@ -682,7 +747,7 @@ impl Connection {
let resp = SafeKeeperResponse { let resp = SafeKeeperResponse {
epoch: my_info.epoch, epoch: my_info.epoch,
flush_lsn: end_pos, flush_lsn: end_pos,
hs_feedback: self.acceptor.get_hs_feedback(), hs_feedback: self.system().get_hs_feedback(),
}; };
self.start_sending(); self.start_sending();
resp.pack(&mut self.outbuf); resp.pack(&mut self.outbuf);
@@ -692,8 +757,7 @@ impl Connection {
* Ping wal sender that new data is available. * Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
*/ */
self.acceptor self.system().notify_wal_senders(min(req.commit_lsn, end_pos));
.notify_wal_senders(min(req.commit_lsn, end_pos));
} }
Ok(()) Ok(())
} }
@@ -711,10 +775,7 @@ impl Connection {
if self.inbuf.is_empty() { if self.inbuf.is_empty() {
return Ok(None); return Ok(None);
} else { } else {
return Err(io::Error::new( io_error!("connection reset by peer");
io::ErrorKind::Other,
"connection reset by peer",
));
} }
} }
} }
@@ -767,6 +828,7 @@ impl Connection {
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?; self.send().await?;
self.init_done = true; self.init_done = true;
self.set_system(m.system_id)?;
} }
StartupRequestCode::Cancel => return Ok(()), StartupRequestCode::Cancel => return Ok(()),
} }
@@ -784,7 +846,7 @@ impl Connection {
break; break;
} }
_ => { _ => {
return Err(io::Error::new(io::ErrorKind::Other, "unexpected message")); io_error!("unexpected message");
} }
} }
} }
@@ -799,7 +861,7 @@ impl Connection {
let (start_pos, timeline) = self.find_end_of_wal(false); let (start_pos, timeline) = self.find_end_of_wal(false);
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32); let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
let tli = timeline.to_string(); let tli = timeline.to_string();
let sysid = self.acceptor.get_info().server.system_id.to_string(); let sysid = self.system().get_info().server.system_id.to_string();
let lsn_bytes = lsn.as_bytes(); let lsn_bytes = lsn.as_bytes();
let tli_bytes = tli.as_bytes(); let tli_bytes = tli.as_bytes();
let sysid_bytes = sysid.as_bytes(); let sysid_bytes = sysid.as_bytes();
@@ -855,7 +917,7 @@ impl Connection {
} else { } else {
0 0
}; };
let wal_seg_size = self.acceptor.get_info().server.wal_seg_size as usize; let wal_seg_size = self.system().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 { if wal_seg_size == 0 {
io_error!("Can not start replication before connecting to wal_proposer"); io_error!("Can not start replication before connecting to wal_proposer");
} }
@@ -896,9 +958,10 @@ impl Connection {
/* normal mode */ /* normal mode */
loop { loop {
// Rust doesn't allow to grab async result from mutex scope // Rust doesn't allow to grab async result from mutex scope
let notified = self.acceptor.cond.notified(); let system = self.system();
let notified = system.cond.notified();
{ {
let shared_state = self.acceptor.mutex.lock().unwrap(); let shared_state = system.mutex.lock().unwrap();
commit_lsn = shared_state.commit_lsn; commit_lsn = shared_state.commit_lsn;
if start_pos < commit_lsn { if start_pos < commit_lsn {
end_pos = commit_lsn; end_pos = commit_lsn;
@@ -916,7 +979,7 @@ impl Connection {
Ok(0) => break, Ok(0) => break,
Ok(_) => match self.parse_message()? { Ok(_) => match self.parse_message()? {
Some(FeMessage::CopyData(m)) => self Some(FeMessage::CopyData(m)) => self
.acceptor .system()
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)), .add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
_ => {} _ => {}
}, },
@@ -934,11 +997,15 @@ impl Connection {
} else { } else {
let segno = XLByteToSeg(start_pos, wal_seg_size); let segno = XLByteToSeg(start_pos, wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial"); let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
if let Ok(opened_file) = File::open(&wal_file_path) { if let Ok(opened_file) = File::open(&wal_file_path) {
file = opened_file; file = opened_file;
} else { } else {
let wal_file_path = self.conf.data_dir.join(wal_file_name); let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name);
match File::open(&wal_file_path) { match File::open(&wal_file_path) {
Ok(opened_file) => file = opened_file, Ok(opened_file) => file = opened_file,
Err(e) => { Err(e) => {
@@ -1017,8 +1084,12 @@ impl Connection {
/* Open file */ /* Open file */
let segno = XLByteToSeg(start_pos, wal_seg_size); let segno = XLByteToSeg(start_pos, wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone()); let wal_file_path = self.conf.data_dir
let wal_file_partial_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial"); .join(self.system().id.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
{ {
let mut wal_file: File; let mut wal_file: File;
@@ -1080,7 +1151,7 @@ impl Connection {
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) { fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
find_end_of_wal( find_end_of_wal(
&self.conf.data_dir, &self.conf.data_dir,
self.acceptor.get_info().server.wal_seg_size as usize, self.system().get_info().server.wal_seg_size as usize,
precise, precise,
) )
} }