review fixes

This commit is contained in:
Stas Kelvich
2021-07-15 11:29:33 +03:00
parent 8ec234ba78
commit a118557331
4 changed files with 17 additions and 22 deletions

View File

@@ -70,7 +70,7 @@ impl CPlaneApi {
}
}
pub fn get_database_uri(&self, _user: &String, _database: &String) -> Result<DatabaseInfo> {
pub fn get_database_uri(&self, _user: &str, _database: &str) -> Result<DatabaseInfo> {
Ok(DatabaseInfo {
host: "127.0.0.1".parse()?,
port: 5432,

View File

@@ -87,24 +87,19 @@ fn main() -> anyhow::Result<()> {
println!("Starting mgmt on {}", state.conf.mgmt_address);
let mgmt_listener = TcpListener::bind(state.conf.mgmt_address)?;
let mut threads = Vec::new();
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
threads.push(
let threads = vec![
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
thread::Builder::new()
.name("Proxy thread".into())
.spawn(move || proxy::thread_main(&state, pageserver_listener))?,
);
threads.push(
thread::Builder::new()
.name("Mgmt thread".into())
.spawn(move || mgmt::thread_main(&state, mgmt_listener))?,
);
];
for t in threads {
let _ = t.join().unwrap();
for t in threads.into_iter() {
t.join().unwrap()?;
}
Ok(())

View File

@@ -39,7 +39,6 @@ pub fn thread_main(
struct ProxyConnection {
state: &'static ProxyState,
existing_user: bool,
cplane: CPlaneApi,
user: String,
@@ -57,7 +56,6 @@ pub fn proxy_conn_main(
) -> anyhow::Result<()> {
let mut conn = ProxyConnection {
state,
existing_user: false,
cplane: CPlaneApi::new(&state.conf.cplane_address),
user: "".into(),
database: "".into(),
@@ -71,7 +69,7 @@ pub fn proxy_conn_main(
conn.handle_startup()?;
// both scenarious here should end up producing database connection string
let db_info = if conn.existing_user {
let db_info = if conn.is_existing_user() {
conn.handle_existing_user()?
} else {
conn.handle_new_user()?
@@ -91,6 +89,10 @@ pub fn proxy_conn_main(
}
impl ProxyConnection {
fn is_existing_user(&self) -> bool {
self.user.ends_with("@zenith")
}
fn handle_startup(&mut self) -> anyhow::Result<()> {
loop {
let msg = self.pgb.read_message()?;
@@ -120,8 +122,6 @@ impl ProxyConnection {
})?
.into();
self.existing_user = self.user.ends_with("@zenith");
break;
}
StartupRequestCode::Cancel => break,
@@ -152,7 +152,7 @@ impl ProxyConnection {
if let Some(FeMessage::PasswordMessage(m)) = msg {
println!("got password message '{:?}'", m);
assert!(self.existing_user);
assert!(self.is_existing_user());
let (_trailing_null, md5_response) = m
.split_last()
@@ -196,7 +196,7 @@ databases without opening the browser.
self.pgb
.write_message_noflush(&BeMessage::ParameterStatus)?;
self.pgb
.write_message(&BeMessage::NoticeResponse(hello_message.to_string()))?;
.write_message(&BeMessage::NoticeResponse(hello_message))?;
// await for database creation
let (tx, rx) = channel::<anyhow::Result<DatabaseInfo>>();
@@ -220,7 +220,7 @@ databases without opening the browser.
}
fn check_auth_md5(&self, md5_response: &[u8]) -> anyhow::Result<()> {
assert!(self.existing_user);
assert!(self.is_existing_user());
self.cplane
.check_auth(self.user.as_str(), md5_response, &self.md5_salt)
}

View File

@@ -64,7 +64,7 @@ pub struct PostgresBackend {
}
// TODO: call shutdown() manually.
// into_shtm() methods do not work with types implementing Drop
// into_smth() methods do not work with types implementing Drop
// // In replication.rs a separate thread is reading keepalives from the
// // socket. When main one finishes, tell it to get down by shutdowning the
@@ -190,7 +190,7 @@ impl PostgresBackend {
}
AuthType::MD5 => {
rand::thread_rng().fill(&mut self.md5_salt);
let md5_salt = self.md5_salt.clone();
let md5_salt = self.md5_salt;
self.write_message(&BeMessage::AuthenticationMD5Password(
&md5_salt,
))?;