mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
feat(proxy): dont trigger error alerts for unknown topics (#10266)
## Problem Before the holidays, and just before our code freeze, a change to cplane was made that started publishing the topics from #10197. This triggered our alerts and put us in a sticky situation as it was not an error, and we didn't want to silence the alert for the entire holidays, and we didn't want to release proxy 2 days in a row if it was not essential. We fixed it eventually by rewriting the alert based on logs, but this is not a good solution. ## Summary of changes Introduces an intermediate parsing step to check the topic name first, to allow us to ignore parsing errors for any topics we do not know about.
This commit is contained in:
@@ -30,8 +30,14 @@ async fn try_connect(client: &ConnectionWithCredentialsProvider) -> anyhow::Resu
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct NotificationHeader<'a> {
|
||||
topic: &'a str,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
#[serde(tag = "topic", content = "data")]
|
||||
// Message to contributors: Make sure to align these topic names with the list below.
|
||||
pub(crate) enum Notification {
|
||||
#[serde(
|
||||
rename = "/allowed_ips_updated",
|
||||
@@ -69,6 +75,22 @@ pub(crate) enum Notification {
|
||||
#[serde(rename = "/cancel_session")]
|
||||
Cancel(CancelSession),
|
||||
}
|
||||
|
||||
/// Returns true if the topic name given is a known topic that we can deserialize and action on.
|
||||
/// Returns false otherwise.
|
||||
fn known_topic(s: &str) -> bool {
|
||||
// Message to contributors: Make sure to align these topic names with the enum above.
|
||||
matches!(
|
||||
s,
|
||||
"/allowed_ips_updated"
|
||||
| "/block_public_or_vpc_access_updated"
|
||||
| "/allowed_vpc_endpoints_updated_for_org"
|
||||
| "/allowed_vpc_endpoints_updated_for_projects"
|
||||
| "/password_updated"
|
||||
| "/cancel_session"
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct AllowedIpsUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
@@ -96,6 +118,7 @@ pub(crate) struct PasswordUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
role_name: RoleNameInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct CancelSession {
|
||||
pub(crate) region_id: Option<String>,
|
||||
@@ -141,18 +164,23 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
region_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn increment_active_listeners(&self) {
|
||||
self.cache.increment_active_listeners().await;
|
||||
}
|
||||
|
||||
pub(crate) async fn decrement_active_listeners(&self) {
|
||||
self.cache.decrement_active_listeners().await;
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]
|
||||
async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> {
|
||||
let payload: String = msg.get_payload()?;
|
||||
tracing::debug!(?payload, "received a message payload");
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
// For better error handling, we first parse the payload to extract the topic.
|
||||
// If there's a topic we don't support, we can handle that error more gracefully.
|
||||
let header: NotificationHeader = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
@@ -162,6 +190,24 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if !known_topic(header.topic) {
|
||||
// don't update the metric for redis errors if it's just a topic we don't know about.
|
||||
tracing::warn!(topic = header.topic, "unknown topic");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
tracing::error!(topic = header.topic, "broken message: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(?msg, "received a message");
|
||||
match msg {
|
||||
Notification::Cancel(cancel_session) => {
|
||||
|
||||
Reference in New Issue
Block a user