From 01be823084ef6d8e02fe0b628db3160b5e18cafd Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Sat, 20 May 2023 02:31:07 +0300 Subject: [PATCH] Tune console notifications handler --- proxy/src/bin/proxy.rs | 1 + proxy/src/console/notifications.rs | 57 ++++++++++++++++++++++-------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 6aa2c3412e..5276f280ef 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -74,6 +74,7 @@ async fn main() -> anyhow::Result<()> { if let auth::BackendType::Console(api, _) = &config.auth_backend { if let Some(url) = args.get_one::("redis-notifications") { + info!("Starting redis notifications listener ({url})"); tasks.push(tokio::spawn(console::notifications::task_main( url.to_owned(), api.caches, diff --git a/proxy/src/console/notifications.rs b/proxy/src/console/notifications.rs index cfd855dac9..2df61f4548 100644 --- a/proxy/src/console/notifications.rs +++ b/proxy/src/console/notifications.rs @@ -7,7 +7,29 @@ const CHANNEL_NAME: &str = "proxy_notifications"; #[derive(Debug, Deserialize)] #[serde(tag = "type")] enum Notification<'a> { - UserChangedPassword { project: &'a str, role: &'a str }, + #[serde(rename = "password_changed")] + PasswordChanged { project: &'a str, role: &'a str }, +} + +#[tracing::instrument(skip(caches))] +fn handle_message(msg: redis::Msg, caches: &ApiCaches) -> anyhow::Result<()> { + let payload: String = msg.get_payload()?; + + use Notification::*; + match serde_json::from_str(&payload) { + Ok(PasswordChanged { project, role }) => { + let key = AuthInfoCacheKey { + project: project.into(), + role: role.into(), + }; + + tracing::info!(key = ?key, "invalidating auth info"); + caches.auth_info.remove(&key); + } + Err(e) => tracing::error!("broken message: {e}"), + } + + Ok(()) } /// Handle console's invalidation messages. @@ -16,26 +38,33 @@ pub async fn task_main(url: String, caches: &ApiCaches) -> anyhow::Result<()> { let client = redis::Client::open(url.as_ref())?; let mut conn = client.get_async_connection().await?.into_pubsub(); + tracing::info!("subscribing to a channel `{CHANNEL_NAME}`"); conn.subscribe(CHANNEL_NAME).await?; let mut stream = conn.on_message(); while let Some(msg) = stream.next().await { - let payload: String = msg.get_payload()?; - - use Notification::*; - match serde_json::from_str(&payload) { - Ok(UserChangedPassword { project, role }) => { - caches.auth_info.remove(&AuthInfoCacheKey { - project: project.into(), - role: role.into(), - }); - } - Err(e) => tracing::error!("broken message: {e}"), - } + handle_message(msg, caches)?; } Ok(()) } #[cfg(test)] -mod tests {} +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn parse_notification() -> anyhow::Result<()> { + let text = json!({ + "type": "password_changed", + "project": "very-nice", + "role": "borat", + }) + .to_string(); + + let _: Notification = serde_json::from_str(&text)?; + + Ok(()) + } +}