mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 15:49:58 +00:00
Multitenant wal_acceptor
This commit is contained in:
@@ -34,7 +34,14 @@ fn main() -> Result<(), io::Error> {
|
||||
.short("l")
|
||||
.long("listen")
|
||||
.takes_value(true)
|
||||
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"),
|
||||
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5454)"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("pageserver")
|
||||
.short("p")
|
||||
.long("pageserver")
|
||||
.takes_value(true)
|
||||
.help("address ip:port of pageserver with which wal_acceptor should establish connection"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("daemonize")
|
||||
@@ -56,6 +63,7 @@ fn main() -> Result<(), io::Error> {
|
||||
data_dir: PathBuf::from("./"),
|
||||
daemonize: false,
|
||||
no_sync: false,
|
||||
pageserver_addr: None,
|
||||
listen_addr: "127.0.0.1:5454".parse().unwrap(),
|
||||
};
|
||||
|
||||
@@ -75,6 +83,10 @@ fn main() -> Result<(), io::Error> {
|
||||
conf.listen_addr = addr.parse().unwrap();
|
||||
}
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("pageserver") {
|
||||
conf.pageserver_addr = Some(addr.parse().unwrap());
|
||||
}
|
||||
|
||||
start_wal_acceptor(conf)
|
||||
}
|
||||
|
||||
|
||||
@@ -13,4 +13,5 @@ pub struct WalAcceptorConf {
|
||||
pub daemonize: bool,
|
||||
pub no_sync: bool,
|
||||
pub listen_addr: SocketAddr,
|
||||
pub pageserver_addr: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use std::io;
|
||||
use std::str;
|
||||
|
||||
pub type Oid = u32;
|
||||
pub type SystemId = u64;
|
||||
pub type Result<T> = std::result::Result<T, io::Error>;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -35,6 +37,7 @@ pub enum BeMessage<'a> {
|
||||
pub struct FeStartupMessage {
|
||||
pub version: u32,
|
||||
pub kind: StartupRequestCode,
|
||||
pub system_id: SystemId
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -76,10 +79,30 @@ impl FeStartupMessage {
|
||||
_ => StartupRequestCode::Normal,
|
||||
};
|
||||
|
||||
let params_bytes = &buf[8..len];
|
||||
let params_str = str::from_utf8(&params_bytes).unwrap();
|
||||
let params = params_str.split('\0');
|
||||
let mut options = false;
|
||||
let mut system_id : u64 = 0;
|
||||
for p in params {
|
||||
if p == "options" {
|
||||
options = true;
|
||||
} else if options {
|
||||
for opt in p.split(' ') {
|
||||
if opt.starts_with("system.id=") {
|
||||
system_id = opt[10..].parse::<u64>().unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
buf.advance(len as usize);
|
||||
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
|
||||
version,
|
||||
kind,
|
||||
system_id,
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,12 +21,14 @@ use std::io::prelude::*;
|
||||
use std::io::SeekFrom;
|
||||
use std::mem;
|
||||
use std::str;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::{Mutex,Arc};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::runtime;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task;
|
||||
use std::collections::HashMap;
|
||||
use tokio_postgres::{connect, NoTls, Error};
|
||||
|
||||
use crate::pq_protocol::*;
|
||||
use crate::xlog_utils::*;
|
||||
@@ -62,7 +64,7 @@ struct ServerInfo {
|
||||
protocol_version: u32, /* proxy-safekeeper protocol version */
|
||||
pg_version: u32, /* Postgres server version */
|
||||
node_id: NodeId,
|
||||
system_id: u64, /* Postgres system identifier */
|
||||
system_id: SystemId, /* Postgres system identifier */
|
||||
wal_end: XLogRecPtr,
|
||||
timeline: TimeLineID,
|
||||
wal_seg_size: u32,
|
||||
@@ -130,7 +132,7 @@ struct SafeKeeperResponse {
|
||||
}
|
||||
|
||||
/*
|
||||
* State shared by safekeeper tasks and protected by mutex
|
||||
* Shared state associated with database instance (tenant)
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
struct SharedState {
|
||||
@@ -141,12 +143,13 @@ struct SharedState {
|
||||
}
|
||||
|
||||
/*
|
||||
* Static data
|
||||
* Database instance (tenant)
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub struct WalAcceptor {
|
||||
mutex: Mutex<SharedState>, /* mutext for protecting shared state */
|
||||
cond: Notify, /* conditional variable used to notify wal senders */
|
||||
pub struct System {
|
||||
id : SystemId,
|
||||
mutex : Mutex<SharedState>,
|
||||
cond: Notify, /* conditional variable used to notify wal senders */
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -154,7 +157,7 @@ pub struct WalAcceptor {
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
struct Connection {
|
||||
acceptor: &'static WalAcceptor,
|
||||
system: Option<Arc<System>>,
|
||||
stream: TcpStream, /* Postgres connection */
|
||||
inbuf: BytesMut, /* input buffer */
|
||||
outbuf: BytesMut, /* output buffer */
|
||||
@@ -346,7 +349,7 @@ impl Serializer for SafeKeeperResponse {
|
||||
}
|
||||
|
||||
lazy_static! {
    /// Process-wide registry of systems (tenants), keyed by system id.
    /// Entries are shared with connections as `Arc<System>`.
    pub static ref SYSTEMS: Mutex<HashMap<SystemId, Arc<System>>> = Mutex::new(HashMap::new());
}
|
||||
|
||||
pub fn thread_main(conf: WalAcceptorConf) {
|
||||
@@ -362,15 +365,33 @@ pub fn thread_main(conf: WalAcceptorConf) {
|
||||
|
||||
info!("Starting wal acceptor on {}", conf.listen_addr);
|
||||
|
||||
SELF.load_control_file(&conf);
|
||||
|
||||
runtime.block_on(async {
|
||||
let _unused = SELF.main_loop(&conf).await;
|
||||
let _unused = main_loop(&conf).await;
|
||||
});
|
||||
}
|
||||
|
||||
impl WalAcceptor {
|
||||
pub fn new() -> WalAcceptor {
|
||||
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
|
||||
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, peer_addr)) => {
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
socket.set_nodelay(true)?;
|
||||
let mut conn = Connection::new(socket, &conf);
|
||||
task::spawn(async move {
|
||||
if let Err(err) = conn.run().await {
|
||||
error!("error: {}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => error!("Failed to accept connection: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl System {
|
||||
pub fn new(id : SystemId) -> System {
|
||||
let shared_state = SharedState {
|
||||
commit_lsn: 0,
|
||||
info: SafeKeeperInfo::new(),
|
||||
@@ -381,7 +402,8 @@ impl WalAcceptor {
|
||||
catalog_xmin: u64::MAX,
|
||||
},
|
||||
};
|
||||
WalAcceptor {
|
||||
System {
|
||||
id: id,
|
||||
mutex: Mutex::new(shared_state),
|
||||
cond: Notify::new(),
|
||||
}
|
||||
@@ -424,7 +446,9 @@ impl WalAcceptor {
|
||||
|
||||
// Load and lock control file (prevent running more than one instance of safekeeper)
|
||||
fn load_control_file(&self, conf: &WalAcceptorConf) {
|
||||
let control_file_path = conf.data_dir.join(CONTROL_FILE_NAME);
|
||||
let control_file_path = conf.data_dir
|
||||
.join(self.id.to_string())
|
||||
.join(CONTROL_FILE_NAME);
|
||||
match OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
@@ -492,35 +516,15 @@ impl WalAcceptor {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn main_loop(&'static self, conf: &WalAcceptorConf) -> Result<()> {
|
||||
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, peer_addr)) => {
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
socket.set_nodelay(true)?;
|
||||
let mut conn = Connection::new(self, socket, &conf);
|
||||
task::spawn(async move {
|
||||
if let Err(err) = conn.run().await {
|
||||
error!("error: {}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => error!("Failed to accept connection: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(
|
||||
acceptor: &'static WalAcceptor,
|
||||
socket: TcpStream,
|
||||
conf: &WalAcceptorConf,
|
||||
) -> Connection {
|
||||
Connection {
|
||||
acceptor: acceptor,
|
||||
system: None,
|
||||
stream: socket,
|
||||
inbuf: BytesMut::with_capacity(10 * 1024),
|
||||
outbuf: BytesMut::with_capacity(10 * 1024),
|
||||
@@ -529,6 +533,10 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a new handle to the system attached to this connection.
///
/// # Panics
/// Panics if no system has been attached yet (`self.system` is `None`).
fn system(&self) -> Arc<System> {
    let attached = self.system.as_ref().unwrap();
    Arc::clone(attached)
}
|
||||
|
||||
async fn run(&mut self) -> Result<()> {
|
||||
self.inbuf.resize(4, 0u8);
|
||||
self.stream.read_exact(&mut self.inbuf[0..4]).await?;
|
||||
@@ -548,15 +556,65 @@ impl Connection {
|
||||
Ok(T::unpack(&mut self.inbuf))
|
||||
}
|
||||
|
||||
async fn request_callback(&self) -> std::result::Result<(), Error> {
|
||||
if let Some(addr) = self.conf.pageserver_addr {
|
||||
let ps_connstr = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
addr.ip(),
|
||||
addr.port(),
|
||||
"no_db",
|
||||
"no_user",
|
||||
);
|
||||
let callme = format!(
|
||||
"callmemaybe host={} port={} replication=1 options='-c system.id={}'",
|
||||
self.conf.listen_addr.ip(),
|
||||
self.conf.listen_addr.port(),
|
||||
self.system().get_info().server.system_id,
|
||||
);
|
||||
let (client, connection) = connect(&ps_connstr, NoTls).await?;
|
||||
|
||||
// The connection object performs the actual communication with the database,
|
||||
// so spawn it off to run on its own.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("pageserver connection error: {}", e);
|
||||
}
|
||||
});
|
||||
client.simple_query(&callme).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_system(&mut self, id: SystemId) -> Result<()> {
|
||||
let mut systems = SYSTEMS.lock().unwrap();
|
||||
if id == 0 { // non-multitenant configuration: just sigle instance
|
||||
if let Some(system) = systems.values().next() {
|
||||
self.system = Some(system.clone());
|
||||
return Ok(());
|
||||
}
|
||||
io_error!("No active instances");
|
||||
}
|
||||
if !systems.contains_key(&id) {
|
||||
let system_dir = self.conf.data_dir.join(id.to_string());
|
||||
fs::create_dir_all(system_dir)?;
|
||||
systems.insert(id, Arc::new(System::new(id)));
|
||||
}
|
||||
self.system = Some(systems.get(&id).unwrap().clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive WAL from wal_proposer
|
||||
async fn receive_wal(&mut self) -> Result<()> {
|
||||
let mut my_info = self.acceptor.get_info();
|
||||
// Receive information about server
|
||||
let server_info = self.read_req::<ServerInfo>().await?;
|
||||
info!(
|
||||
"Start handshake with wal_proposer {}",
|
||||
self.stream.peer_addr()?
|
||||
"Start handshake with wal_proposer {} sysid {}",
|
||||
self.stream.peer_addr()?, server_info.system_id
|
||||
);
|
||||
self.set_system(server_info.system_id)?;
|
||||
self.system().load_control_file(&self.conf);
|
||||
|
||||
let mut my_info = self.system().get_info();
|
||||
|
||||
/* Check protocol compatibility */
|
||||
if server_info.protocol_version != SK_PROTOCOL_VERSION {
|
||||
@@ -605,9 +663,9 @@ impl Connection {
|
||||
);
|
||||
}
|
||||
my_info.server.node_id = prop.node_id;
|
||||
self.acceptor.set_info(&my_info);
|
||||
self.system().set_info(&my_info);
|
||||
/* Need to persist our vote first */
|
||||
self.acceptor.save_control_file(true)?;
|
||||
self.system().save_control_file(true)?;
|
||||
|
||||
let mut flushed_restart_lsn: XLogRecPtr = 0;
|
||||
let wal_seg_size = server_info.wal_seg_size as usize;
|
||||
@@ -617,6 +675,13 @@ impl Connection {
|
||||
prop.node_id.pack(&mut self.outbuf);
|
||||
self.send().await?;
|
||||
|
||||
// Need to establish replication channel with page server.
|
||||
// Since replication in postgres is initiated by the receiver, we should use the callme mechanism
|
||||
if let Err(e) = self.request_callback().await {
|
||||
// Do not treat it as a fatal error; continue working
|
||||
error!("Failed to send callme request to pageserver: {}", e);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Start streaming from server {} address {:?}",
|
||||
server_info.system_id,
|
||||
@@ -671,7 +736,7 @@ impl Connection {
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
|
||||
self.acceptor.save_control_file(sync_control_file)?;
|
||||
self.system().save_control_file(sync_control_file)?;
|
||||
|
||||
if sync_control_file {
|
||||
flushed_restart_lsn = my_info.restart_lsn;
|
||||
@@ -682,7 +747,7 @@ impl Connection {
|
||||
let resp = SafeKeeperResponse {
|
||||
epoch: my_info.epoch,
|
||||
flush_lsn: end_pos,
|
||||
hs_feedback: self.acceptor.get_hs_feedback(),
|
||||
hs_feedback: self.system().get_hs_feedback(),
|
||||
};
|
||||
self.start_sending();
|
||||
resp.pack(&mut self.outbuf);
|
||||
@@ -692,8 +757,7 @@ impl Connection {
|
||||
* Ping wal sender that new data is available.
|
||||
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
|
||||
*/
|
||||
self.acceptor
|
||||
.notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||
self.system().notify_wal_senders(min(req.commit_lsn, end_pos));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -711,10 +775,7 @@ impl Connection {
|
||||
if self.inbuf.is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"connection reset by peer",
|
||||
));
|
||||
io_error!("connection reset by peer");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -767,6 +828,7 @@ impl Connection {
|
||||
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
|
||||
self.send().await?;
|
||||
self.init_done = true;
|
||||
self.set_system(m.system_id)?;
|
||||
}
|
||||
StartupRequestCode::Cancel => return Ok(()),
|
||||
}
|
||||
@@ -784,7 +846,7 @@ impl Connection {
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "unexpected message"));
|
||||
io_error!("unexpected message");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -799,7 +861,7 @@ impl Connection {
|
||||
let (start_pos, timeline) = self.find_end_of_wal(false);
|
||||
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
|
||||
let tli = timeline.to_string();
|
||||
let sysid = self.acceptor.get_info().server.system_id.to_string();
|
||||
let sysid = self.system().get_info().server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
let sysid_bytes = sysid.as_bytes();
|
||||
@@ -855,7 +917,7 @@ impl Connection {
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let wal_seg_size = self.acceptor.get_info().server.wal_seg_size as usize;
|
||||
let wal_seg_size = self.system().get_info().server.wal_seg_size as usize;
|
||||
if wal_seg_size == 0 {
|
||||
io_error!("Can not start replication before connecting to wal_proposer");
|
||||
}
|
||||
@@ -896,9 +958,10 @@ impl Connection {
|
||||
/* normal mode */
|
||||
loop {
|
||||
// Rust doesn't allow to grab async result from mutex scope
|
||||
let notified = self.acceptor.cond.notified();
|
||||
let system = self.system();
|
||||
let notified = system.cond.notified();
|
||||
{
|
||||
let shared_state = self.acceptor.mutex.lock().unwrap();
|
||||
let shared_state = system.mutex.lock().unwrap();
|
||||
commit_lsn = shared_state.commit_lsn;
|
||||
if start_pos < commit_lsn {
|
||||
end_pos = commit_lsn;
|
||||
@@ -916,7 +979,7 @@ impl Connection {
|
||||
Ok(0) => break,
|
||||
Ok(_) => match self.parse_message()? {
|
||||
Some(FeMessage::CopyData(m)) => self
|
||||
.acceptor
|
||||
.system()
|
||||
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
|
||||
_ => {}
|
||||
},
|
||||
@@ -934,11 +997,15 @@ impl Connection {
|
||||
} else {
|
||||
let segno = XLByteToSeg(start_pos, wal_seg_size);
|
||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial");
|
||||
let wal_file_path = self.conf.data_dir
|
||||
.join(self.system().id.to_string())
|
||||
.join(wal_file_name.clone() + ".partial");
|
||||
if let Ok(opened_file) = File::open(&wal_file_path) {
|
||||
file = opened_file;
|
||||
} else {
|
||||
let wal_file_path = self.conf.data_dir.join(wal_file_name);
|
||||
let wal_file_path = self.conf.data_dir
|
||||
.join(self.system().id.to_string())
|
||||
.join(wal_file_name);
|
||||
match File::open(&wal_file_path) {
|
||||
Ok(opened_file) => file = opened_file,
|
||||
Err(e) => {
|
||||
@@ -1017,8 +1084,12 @@ impl Connection {
|
||||
/* Open file */
|
||||
let segno = XLByteToSeg(start_pos, wal_seg_size);
|
||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||
let wal_file_path = self.conf.data_dir.join(wal_file_name.clone());
|
||||
let wal_file_partial_path = self.conf.data_dir.join(wal_file_name.clone() + ".partial");
|
||||
let wal_file_path = self.conf.data_dir
|
||||
.join(self.system().id.to_string())
|
||||
.join(wal_file_name.clone());
|
||||
let wal_file_partial_path = self.conf.data_dir
|
||||
.join(self.system().id.to_string())
|
||||
.join(wal_file_name.clone() + ".partial");
|
||||
|
||||
{
|
||||
let mut wal_file: File;
|
||||
@@ -1080,7 +1151,7 @@ impl Connection {
|
||||
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
|
||||
find_end_of_wal(
|
||||
&self.conf.data_dir,
|
||||
self.acceptor.get_info().server.wal_seg_size as usize,
|
||||
self.system().get_info().server.wal_seg_size as usize,
|
||||
precise,
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user