Tune console notifications handler

This commit is contained in:
Dmitry Ivanov
2023-05-20 02:31:07 +03:00
committed by Vadim Kharitonov
parent 69bb1eee18
commit 01be823084
2 changed files with 44 additions and 14 deletions

View File

@@ -74,6 +74,7 @@ async fn main() -> anyhow::Result<()> {
if let auth::BackendType::Console(api, _) = &config.auth_backend {
if let Some(url) = args.get_one::<String>("redis-notifications") {
info!("Starting redis notifications listener ({url})");
tasks.push(tokio::spawn(console::notifications::task_main(
url.to_owned(),
api.caches,

View File

@@ -7,7 +7,29 @@ const CHANNEL_NAME: &str = "proxy_notifications";
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum Notification<'a> {
UserChangedPassword { project: &'a str, role: &'a str },
#[serde(rename = "password_changed")]
PasswordChanged { project: &'a str, role: &'a str },
}
#[tracing::instrument(skip(caches))]
fn handle_message(msg: redis::Msg, caches: &ApiCaches) -> anyhow::Result<()> {
let payload: String = msg.get_payload()?;
use Notification::*;
match serde_json::from_str(&payload) {
Ok(PasswordChanged { project, role }) => {
let key = AuthInfoCacheKey {
project: project.into(),
role: role.into(),
};
tracing::info!(key = ?key, "invalidating auth info");
caches.auth_info.remove(&key);
}
Err(e) => tracing::error!("broken message: {e}"),
}
Ok(())
}
/// Handle console's invalidation messages.
@@ -16,26 +38,33 @@ pub async fn task_main(url: String, caches: &ApiCaches) -> anyhow::Result<()> {
let client = redis::Client::open(url.as_ref())?;
let mut conn = client.get_async_connection().await?.into_pubsub();
tracing::info!("subscribing to a channel `{CHANNEL_NAME}`");
conn.subscribe(CHANNEL_NAME).await?;
let mut stream = conn.on_message();
while let Some(msg) = stream.next().await {
let payload: String = msg.get_payload()?;
use Notification::*;
match serde_json::from_str(&payload) {
Ok(UserChangedPassword { project, role }) => {
caches.auth_info.remove(&AuthInfoCacheKey {
project: project.into(),
role: role.into(),
});
}
Err(e) => tracing::error!("broken message: {e}"),
}
handle_message(msg, caches)?;
}
Ok(())
}
#[cfg(test)]
mod tests {}
mod tests {
use super::*;
use serde_json::json;
#[test]
fn parse_notification() -> anyhow::Result<()> {
let text = json!({
"type": "password_changed",
"project": "very-nice",
"role": "borat",
})
.to_string();
let _: Notification = serde_json::from_str(&text)?;
Ok(())
}
}