Format code according to rust style guide

This commit is contained in:
Konstantin Knizhnik
2021-04-07 16:50:58 +03:00
parent 6b9fc3aff0
commit abaa36a15c
4 changed files with 123 additions and 116 deletions

View File

@@ -95,11 +95,11 @@ impl StorageControlPlane {
cplane
}
pub fn stop(&self) {
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
wa.stop();
}
}
wa.stop();
}
}
// // postgres <-> wal_acceptor x3 <-> page_server
// fn local(&mut self) -> StorageControlPlane {
@@ -136,11 +136,10 @@ impl StorageControlPlane {
impl Drop for StorageControlPlane {
fn drop(&mut self) {
self.stop();
}
self.stop();
}
}
pub struct PageServerNode {
page_service_addr: SocketAddr,
data_dir: PathBuf,

View File

@@ -37,7 +37,7 @@ pub enum BeMessage<'a> {
pub struct FeStartupMessage {
pub version: u32,
pub kind: StartupRequestCode,
pub system_id: SystemId
pub system_id: SystemId,
}
#[derive(Debug)]
@@ -79,30 +79,30 @@ impl FeStartupMessage {
_ => StartupRequestCode::Normal,
};
let params_bytes = &buf[8..len];
let params_str = str::from_utf8(&params_bytes).unwrap();
let params = params_str.split('\0');
let mut options = false;
let mut system_id : u64 = 0;
for p in params {
if p == "options" {
options = true;
} else if options {
for opt in p.split(' ') {
if opt.starts_with("system.id=") {
system_id = opt[10..].parse::<u64>().unwrap();
break;
}
}
break;
}
}
let params_bytes = &buf[8..len];
let params_str = str::from_utf8(&params_bytes).unwrap();
let params = params_str.split('\0');
let mut options = false;
let mut system_id: u64 = 0;
for p in params {
if p == "options" {
options = true;
} else if options {
for opt in p.split(' ') {
if opt.starts_with("system.id=") {
system_id = opt[10..].parse::<u64>().unwrap();
break;
}
}
break;
}
}
buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version,
kind,
system_id,
system_id,
})))
}
}

View File

@@ -13,6 +13,7 @@ use log::*;
use regex::Regex;
use std::cmp::max;
use std::cmp::min;
use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
@@ -21,14 +22,13 @@ use std::io::prelude::*;
use std::io::SeekFrom;
use std::mem;
use std::str;
use std::sync::{Mutex,Arc};
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::sync::Notify;
use tokio::task;
use std::collections::HashMap;
use tokio_postgres::{connect, NoTls, Error};
use tokio_postgres::{connect, Error, NoTls};
use crate::pq_protocol::*;
use crate::xlog_utils::*;
@@ -147,9 +147,9 @@ struct SharedState {
*/
#[derive(Debug)]
pub struct System {
id : SystemId,
mutex : Mutex<SharedState>,
cond: Notify, /* conditional variable used to notify wal senders */
id: SystemId,
mutex: Mutex<SharedState>,
cond: Notify, /* conditional variable used to notify wal senders */
}
/*
@@ -385,13 +385,12 @@ async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
});
}
Err(e) => error!("Failed to accept connection: {}", e),
}
}
}
}
impl System {
pub fn new(id : SystemId) -> System {
pub fn new(id: SystemId) -> System {
let shared_state = SharedState {
commit_lsn: 0,
info: SafeKeeperInfo::new(),
@@ -403,7 +402,7 @@ impl System {
},
};
System {
id: id,
id: id,
mutex: Mutex::new(shared_state),
cond: Notify::new(),
}
@@ -446,9 +445,10 @@ impl System {
// Load and lock control file (prevent running more than one instance of safekeeper
fn load_control_file(&self, conf: &WalAcceptorConf) {
let control_file_path = conf.data_dir
.join(self.id.to_string())
.join(CONTROL_FILE_NAME);
let control_file_path = conf
.data_dir
.join(self.id.to_string())
.join(CONTROL_FILE_NAME);
match OpenOptions::new()
.read(true)
.write(true)
@@ -519,10 +519,7 @@ impl System {
}
impl Connection {
pub fn new(
socket: TcpStream,
conf: &WalAcceptorConf,
) -> Connection {
pub fn new(socket: TcpStream, conf: &WalAcceptorConf) -> Connection {
Connection {
system: None,
stream: socket,
@@ -533,9 +530,9 @@ impl Connection {
}
}
fn system(&self) -> Arc<System> {
self.system.as_ref().unwrap().clone()
}
fn system(&self) -> Arc<System> {
self.system.as_ref().unwrap().clone()
}
async fn run(&mut self) -> Result<()> {
self.inbuf.resize(4, 0u8);
@@ -556,52 +553,53 @@ impl Connection {
Ok(T::unpack(&mut self.inbuf))
}
async fn request_callback(&self) -> std::result::Result<(), Error> {
if let Some(addr) = self.conf.pageserver_addr {
let ps_connstr = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let callme = format!(
"callmemaybe host={} port={} replication=1 options='-c system.id={}'",
self.conf.listen_addr.ip(),
self.conf.listen_addr.port(),
self.system().get_info().server.system_id,
);
let (client, connection) = connect(&ps_connstr, NoTls).await?;
async fn request_callback(&self) -> std::result::Result<(), Error> {
if let Some(addr) = self.conf.pageserver_addr {
let ps_connstr = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let callme = format!(
"callmemaybe host={} port={} replication=1 options='-c system.id={}'",
self.conf.listen_addr.ip(),
self.conf.listen_addr.port(),
self.system().get_info().server.system_id,
);
let (client, connection) = connect(&ps_connstr, NoTls).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("pageserver connection error: {}", e);
}
});
client.simple_query(&callme).await?;
}
Ok(())
}
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("pageserver connection error: {}", e);
}
});
client.simple_query(&callme).await?;
}
Ok(())
}
fn set_system(&mut self, id: SystemId) -> Result<()> {
let mut systems = SYSTEMS.lock().unwrap();
if id == 0 { // non-multitenant configuration: just sigle instance
if let Some(system) = systems.values().next() {
self.system = Some(system.clone());
return Ok(());
}
io_error!("No active instances");
}
if !systems.contains_key(&id) {
let system_dir = self.conf.data_dir.join(id.to_string());
fs::create_dir_all(system_dir)?;
systems.insert(id, Arc::new(System::new(id)));
}
self.system = Some(systems.get(&id).unwrap().clone());
Ok(())
}
fn set_system(&mut self, id: SystemId) -> Result<()> {
let mut systems = SYSTEMS.lock().unwrap();
if id == 0 {
// non-multitenant configuration: just sigle instance
if let Some(system) = systems.values().next() {
self.system = Some(system.clone());
return Ok(());
}
io_error!("No active instances");
}
if !systems.contains_key(&id) {
let system_dir = self.conf.data_dir.join(id.to_string());
fs::create_dir_all(system_dir)?;
systems.insert(id, Arc::new(System::new(id)));
}
self.system = Some(systems.get(&id).unwrap().clone());
Ok(())
}
// Receive WAL from wal_proposer
async fn receive_wal(&mut self) -> Result<()> {
@@ -609,10 +607,11 @@ impl Connection {
let server_info = self.read_req::<ServerInfo>().await?;
info!(
"Start handshake with wal_proposer {} sysid {}",
self.stream.peer_addr()?, server_info.system_id
self.stream.peer_addr()?,
server_info.system_id
);
self.set_system(server_info.system_id)?;
self.system().load_control_file(&self.conf);
self.set_system(server_info.system_id)?;
self.system().load_control_file(&self.conf);
let mut my_info = self.system().get_info();
@@ -675,12 +674,12 @@ impl Connection {
prop.node_id.pack(&mut self.outbuf);
self.send().await?;
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
if let Err(e) = self.request_callback().await {
// Do not treate it as fatal error and continue work
error!("Failed to send callme request to pageserver: {}", e);
}
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
if let Err(e) = self.request_callback().await {
// Do not treate it as fatal error and continue work
error!("Failed to send callme request to pageserver: {}", e);
}
info!(
"Start streaming from server {} address {:?}",
@@ -757,7 +756,8 @@ impl Connection {
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
*/
self.system().notify_wal_senders(min(req.commit_lsn, end_pos));
self.system()
.notify_wal_senders(min(req.commit_lsn, end_pos));
}
Ok(())
}
@@ -828,7 +828,7 @@ impl Connection {
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?;
self.init_done = true;
self.set_system(m.system_id)?;
self.set_system(m.system_id)?;
}
StartupRequestCode::Cancel => return Ok(()),
}
@@ -958,7 +958,7 @@ impl Connection {
/* normal mode */
loop {
// Rust doesn't allow to grab async result from mutex scope
let system = self.system();
let system = self.system();
let notified = system.cond.notified();
{
let shared_state = system.mutex.lock().unwrap();
@@ -997,15 +997,19 @@ impl Connection {
} else {
let segno = XLByteToSeg(start_pos, wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
let wal_file_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
if let Ok(opened_file) = File::open(&wal_file_path) {
file = opened_file;
} else {
let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name);
let wal_file_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(wal_file_name);
match File::open(&wal_file_path) {
Ok(opened_file) => file = opened_file,
Err(e) => {
@@ -1084,12 +1088,16 @@ impl Connection {
/* Open file */
let segno = XLByteToSeg(start_pos, wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self.conf.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
let wal_file_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;