Multitenant wal_acceptor

This commit is contained in:
Konstantin Knizhnik
2021-04-07 13:43:40 +03:00
parent 8547184830
commit 3fea78d688
4 changed files with 169 additions and 62 deletions

View File

@@ -34,7 +34,14 @@ fn main() -> Result<(), io::Error> {
.short("l")
.long("listen")
.takes_value(true)
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"),
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("pageserver")
.short("p")
.long("pageserver")
.takes_value(true)
.help("address ip:port of pageserver with which wal_acceptor should establish connection"),
)
.arg(
Arg::with_name("daemonize")
@@ -56,6 +63,7 @@ fn main() -> Result<(), io::Error> {
data_dir: PathBuf::from("./"),
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_addr: "127.0.0.1:5454".parse().unwrap(),
};
@@ -75,6 +83,10 @@ fn main() -> Result<(), io::Error> {
conf.listen_addr = addr.parse().unwrap();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
conf.pageserver_addr = Some(addr.parse().unwrap());
}
start_wal_acceptor(conf)
}

View File

@@ -13,4 +13,5 @@ pub struct WalAcceptorConf {
pub daemonize: bool,
pub no_sync: bool,
pub listen_addr: SocketAddr,
pub pageserver_addr: Option<SocketAddr>,
}

View File

@@ -1,8 +1,10 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io;
use std::str;
pub type Oid = u32;
pub type SystemId = u64;
pub type Result<T> = std::result::Result<T, io::Error>;
#[derive(Debug)]
@@ -35,6 +37,7 @@ pub enum BeMessage<'a> {
pub struct FeStartupMessage {
pub version: u32,
pub kind: StartupRequestCode,
pub system_id: SystemId
}
#[derive(Debug)]
@@ -76,10 +79,30 @@ impl FeStartupMessage {
_ => StartupRequestCode::Normal,
};
let params_bytes = &buf[8..len];
let params_str = str::from_utf8(&params_bytes).unwrap();
let params = params_str.split('\0');
let mut options = false;
let mut system_id : u64 = 0;
for p in params {
if p == "options" {
options = true;
} else if options {
for opt in p.split(' ') {
if opt.starts_with("system.id=") {
system_id = opt[10..].parse::<u64>().unwrap();
break;
}
}
break;
}
}
buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version,
kind,
system_id,
})))
}
}

View File

@@ -21,12 +21,14 @@ use std::io::prelude::*;
use std::io::SeekFrom;
use std::mem;
use std::str;
use std::sync::Mutex;
use std::sync::{Mutex,Arc};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::sync::Notify;
use tokio::task;
use std::collections::HashMap;
use tokio_postgres::{connect, NoTls, Error};
use crate::pq_protocol::*;
use crate::xlog_utils::*;
@@ -62,7 +64,7 @@ struct ServerInfo {
protocol_version: u32, /* proxy-safekeeper protocol version */
pg_version: u32, /* Postgres server version */
node_id: NodeId,
system_id: u64, /* Postgres system identifier */
system_id: SystemId, /* Postgres system identifier */
wal_end: XLogRecPtr,
timeline: TimeLineID,
wal_seg_size: u32,
@@ -130,7 +132,7 @@ struct SafeKeeperResponse {
}
/*
* State shared by safekeeper tasks and protected by mutex
* Shared state associated with database instance (tenant)
*/
#[derive(Debug)]
struct SharedState {
@@ -141,12 +143,13 @@ struct SharedState {
}
/*
* Static data
* Database instance (tenant)
*/
#[derive(Debug)]
pub struct WalAcceptor {
mutex: Mutex<SharedState>, /* mutext for protecting shared state */
cond: Notify, /* conditional variable used to notify wal senders */
pub struct System {
id : SystemId,
mutex : Mutex<SharedState>,
cond: Notify, /* conditional variable used to notify wal senders */
}
/*
@@ -154,7 +157,7 @@ pub struct WalAcceptor {
*/
#[derive(Debug)]
struct Connection {
acceptor: &'static WalAcceptor,
system: Option<Arc<System>>,
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
@@ -346,7 +349,7 @@ impl Serializer for SafeKeeperResponse {
}
lazy_static! {
pub static ref SELF: WalAcceptor = WalAcceptor::new();
pub static ref SYSTEMS: Mutex<HashMap<SystemId, Arc<System>>> = Mutex::new(HashMap::new());
}
pub fn thread_main(conf: WalAcceptorConf) {
@@ -362,15 +365,33 @@ pub fn thread_main(conf: WalAcceptorConf) {
info!("Starting wal acceptor on {}", conf.listen_addr);
SELF.load_control_file(&conf);
runtime.block_on(async {
let _unused = SELF.main_loop(&conf).await;
let _unused = main_loop(&conf).await;
});
}
impl WalAcceptor {
pub fn new() -> WalAcceptor {
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
loop {
match listener.accept().await {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(socket, &conf);
task::spawn(async move {
if let Err(err) = conn.run().await {
error!("error: {}", err);
}
});
}
Err(e) => error!("Failed to accept connection: {}", e),
}
}
}
impl System {
pub fn new(id : SystemId) -> System {
let shared_state = SharedState {
commit_lsn: 0,
info: SafeKeeperInfo::new(),
@@ -381,7 +402,8 @@ impl WalAcceptor {
catalog_xmin: u64::MAX,
},
};
WalAcceptor {
System {
id: id,
mutex: Mutex::new(shared_state),
cond: Notify::new(),
}
@@ -424,7 +446,9 @@ impl WalAcceptor {
// Load and lock control file (prevent running more than one instane of safekeeper */
fn load_control_file(&self, conf: &WalAcceptorConf) {
let control_file_path = conf.data_dir.join(CONTROL_FILE_NAME);
let control_file_path = conf.data_dir
.join(self.id.to_string())
.join(CONTROL_FILE_NAME);
match OpenOptions::new()
.read(true)
.write(true)
@@ -492,35 +516,15 @@ impl WalAcceptor {
}
Ok(())
}
async fn main_loop(&'static self, conf: &WalAcceptorConf) -> Result<()> {
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
loop {
match listener.accept().await {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(self, socket, &conf);
task::spawn(async move {
if let Err(err) = conn.run().await {
error!("error: {}", err);
}
});
}
Err(e) => error!("Failed to accept connection: {}", e),
}
}
}
}
impl Connection {
pub fn new(
acceptor: &'static WalAcceptor,
socket: TcpStream,
conf: &WalAcceptorConf,
) -> Connection {
Connection {
acceptor: acceptor,
system: None,
stream: socket,
inbuf: BytesMut::with_capacity(10 * 1024),
outbuf: BytesMut::with_capacity(10 * 1024),
@@ -529,6 +533,10 @@ impl Connection {
}
}
fn system(&self) -> Arc<System> {
self.system.as_ref().unwrap().clone()
}
async fn run(&mut self) -> Result<()> {
self.inbuf.resize(4, 0u8);
self.stream.read_exact(&mut self.inbuf[0..4]).await?;
@@ -548,15 +556,65 @@ impl Connection {
Ok(T::unpack(&mut self.inbuf))
}
async fn request_callback(&self) -> std::result::Result<(), Error> {
if let Some(addr) = self.conf.pageserver_addr {
let ps_connstr = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let callme = format!(
"callmemaybe host={} port={} replication=1 options='-c system.id={}'",
self.conf.listen_addr.ip(),
self.conf.listen_addr.port(),
self.system().get_info().server.system_id,
);
let (client, connection) = connect(&ps_connstr, NoTls).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("pageserver connection error: {}", e);
}
});
client.simple_query(&callme).await?;
}
Ok(())
}
fn set_system(&mut self, id: SystemId) -> Result<()> {
let mut systems = SYSTEMS.lock().unwrap();
if id == 0 { // non-multitenant configuration: just sigle instance
if let Some(system) = systems.values().next() {
self.system = Some(system.clone());
return Ok(());
}
io_error!("No active instances");
}
if !systems.contains_key(&id) {
let system_dir = self.conf.data_dir.join(id.to_string());
fs::create_dir_all(system_dir)?;
systems.insert(id, Arc::new(System::new(id)));
}
self.system = Some(systems.get(&id).unwrap().clone());
Ok(())
}
// Receive WAL from wal_proposer
async fn receive_wal(&mut self) -> Result<()> {
let mut my_info = self.acceptor.get_info();
// Receive information about server
let server_info = self.read_req::<ServerInfo>().await?;
info!(
"Start handshake with wal_proposer {}",
self.stream.peer_addr()?
"Start handshake with wal_proposer {} sysid {}",
self.stream.peer_addr()?, server_info.system_id
);
self.set_system(server_info.system_id)?;
self.system().load_control_file(&self.conf);
let mut my_info = self.system().get_info();
/* Check protocol compatibility */
if server_info.protocol_version != SK_PROTOCOL_VERSION {
@@ -605,9 +663,9 @@ impl Connection {
);
}
my_info.server.node_id = prop.node_id;
self.acceptor.set_info(&my_info);
self.system().set_info(&my_info);
/* Need to persist our vote first */
self.acceptor.save_control_file(true)?;
self.system().save_control_file(true)?;
let mut flushed_restart_lsn: XLogRecPtr = 0;
let wal_seg_size = server_info.wal_seg_size as usize;
@@ -617,6 +675,13 @@ impl Connection {
prop.node_id.pack(&mut self.outbuf);
self.send().await?;
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
if let Err(e) = self.request_callback().await {
// Do not treate it as fatal error and continue work
error!("Failed to send callme request to pageserver: {}", e);
}
info!(
"Start streaming from server {} address {:?}",
server_info.system_id,
@@ -671,7 +736,7 @@ impl Connection {
* when restart_lsn delta exceeds WAL segment size.
*/
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
self.acceptor.save_control_file(sync_control_file)?;
self.system().save_control_file(sync_control_file)?;
if sync_control_file {
flushed_restart_lsn = my_info.restart_lsn;
@@ -682,7 +747,7 @@ impl Connection {
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: end_pos,
hs_feedback: self.acceptor.get_hs_feedback(),
hs_feedback: self.system().get_hs_feedback(),
};
self.start_sending();
resp.pack(&mut self.outbuf);
@@ -692,8 +757,7 @@ impl Connection {
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
*/
self.acceptor
.notify_wal_senders(min(req.commit_lsn, end_pos));
self.system().notify_wal_senders(min(req.commit_lsn, end_pos));
}
Ok(())
}
@@ -711,10 +775,7 @@ impl Connection {
if self.inbuf.is_empty() {
return Ok(None);
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"connection reset by peer",
));
io_error!("connection reset by peer");
}
}
}
@@ -767,6 +828,7 @@ impl Connection {
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?;
self.init_done = true;
self.set_system(m.system_id)?;
}
StartupRequestCode::Cancel => return Ok(()),
}
@@ -784,7 +846,7 @@ impl Connection {
break;
}
_ => {
return Err(io::Error::new(io::ErrorKind::Other, "unexpected message"));
io_error!("unexpected message");
}
}
}
@@ -799,7 +861,7 @@ impl Connection {
let (start_pos, timeline) = self.find_end_of_wal(false);
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
let tli = timeline.to_string();
let sysid = self.acceptor.get_info().server.system_id.to_string();
let sysid = self.system().get_info().server.system_id.to_string();
let lsn_bytes = lsn.as_bytes();
let tli_bytes = tli.as_bytes();
let sysid_bytes = sysid.as_bytes();
@@ -855,7 +917,7 @@ impl Connection {
} else {
0
};
let wal_seg_size = self.acceptor.get_info().server.wal_seg_size as usize;
let wal_seg_size = self.system().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 {
io_error!("Can not start replication before connecting to wal_proposer");
}
@@ -896,9 +958,10 @@ impl Connection {
/* normal mode */
loop {
// Rust doesn't allow to grab async result from mutex scope
let notified = self.acceptor.cond.notified();
let system = self.system();
let notified = system.cond.notified();
{
let shared_state = self.acceptor.mutex.lock().unwrap();
let shared_state = system.mutex.lock().unwrap();
commit_lsn = shared_state.commit_lsn;
if start_pos < commit_lsn {
end_pos = commit_lsn;
@@ -916,7 +979,7 @@ impl Connection {
Ok(0) => break,
Ok(_) => match self.parse_message()? {
Some(FeMessage::CopyData(m)) => self
.acceptor
.system()
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
_ => {}
},
@@ -934,11 +997,15 @@ impl Connection {
} else {
let segno = XLByteToSeg(start_pos, wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial");
let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
if let Ok(opened_file) = File::open(&wal_file_path) {
file = opened_file;
} else {
let wal_file_path = self.conf.data_dir.join(wal_file_name);
let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name);
match File::open(&wal_file_path) {
Ok(opened_file) => file = opened_file,
Err(e) => {
@@ -1017,8 +1084,12 @@ impl Connection {
/* Open file */
let segno = XLByteToSeg(start_pos, wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone());
let wal_file_partial_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial");
let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;
@@ -1080,7 +1151,7 @@ impl Connection {
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
find_end_of_wal(
&self.conf.data_dir,
self.acceptor.get_info().server.wal_seg_size as usize,
self.system().get_info().server.wal_seg_size as usize,
precise,
)
}