diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 671305a300..4383d6be2c 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -30,8 +30,14 @@ async fn try_connect(client: &ConnectionWithCredentialsProvider) -> anyhow::Resu Ok(conn) } +#[derive(Debug, Deserialize)] +struct NotificationHeader<'a> { + topic: &'a str, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(tag = "topic", content = "data")] +// Message to contributors: Make sure to align these topic names with the list below. pub(crate) enum Notification { #[serde( rename = "/allowed_ips_updated", @@ -69,6 +75,22 @@ pub(crate) enum Notification { #[serde(rename = "/cancel_session")] Cancel(CancelSession), } + +/// Returns true if the topic name given is a known topic that we can deserialize and action on. +/// Returns false otherwise. +fn known_topic(s: &str) -> bool { + // Message to contributors: Make sure to align these topic names with the enum above. + matches!( + s, + "/allowed_ips_updated" + | "/block_public_or_vpc_access_updated" + | "/allowed_vpc_endpoints_updated_for_org" + | "/allowed_vpc_endpoints_updated_for_projects" + | "/password_updated" + | "/cancel_session" + ) +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub(crate) struct AllowedIpsUpdate { project_id: ProjectIdInt, @@ -96,6 +118,7 @@ pub(crate) struct PasswordUpdate { project_id: ProjectIdInt, role_name: RoleNameInt, } + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub(crate) struct CancelSession { pub(crate) region_id: Option, @@ -141,18 +164,23 @@ impl MessageHandler { region_id, } } + pub(crate) async fn increment_active_listeners(&self) { self.cache.increment_active_listeners().await; } + pub(crate) async fn decrement_active_listeners(&self) { self.cache.decrement_active_listeners().await; } + #[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))] async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> { let payload: String = msg.get_payload()?; tracing::debug!(?payload, "received a message payload"); - let msg: Notification = match serde_json::from_str(&payload) { + // For better error handling, we first parse the payload to extract the topic. + // If there's a topic we don't support, we can handle that error more gracefully. + let header: NotificationHeader = match serde_json::from_str(&payload) { Ok(msg) => msg, Err(e) => { Metrics::get().proxy.redis_errors_total.inc(RedisErrors { @@ -162,6 +190,24 @@ impl MessageHandler { return Ok(()); } }; + + if !known_topic(header.topic) { + // don't update the metric for redis errors if it's just a topic we don't know about. + tracing::warn!(topic = header.topic, "unknown topic"); + return Ok(()); + } + + let msg: Notification = match serde_json::from_str(&payload) { + Ok(msg) => msg, + Err(e) => { + Metrics::get().proxy.redis_errors_total.inc(RedisErrors { + channel: msg.get_channel_name(), + }); + tracing::error!(topic = header.topic, "broken message: {e}"); + return Ok(()); + } + }; + tracing::debug!(?msg, "received a message"); match msg { Notification::Cancel(cancel_session) => {