mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
proxy: Parse Notification twice only for unknown topic (#10296)
## Problem We currently parse Notification twice even in the happy path. ## Summary of changes Use `#[serde(other)]` to catch unknown topics and defer the second parsing.
This commit is contained in:
@@ -37,7 +37,6 @@ struct NotificationHeader<'a> {
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
#[serde(tag = "topic", content = "data")]
|
||||
// Message to contributors: Make sure to align these topic names with the list below.
|
||||
pub(crate) enum Notification {
|
||||
#[serde(
|
||||
rename = "/allowed_ips_updated",
|
||||
@@ -74,21 +73,9 @@ pub(crate) enum Notification {
|
||||
PasswordUpdate { password_update: PasswordUpdate },
|
||||
#[serde(rename = "/cancel_session")]
|
||||
Cancel(CancelSession),
|
||||
}
|
||||
|
||||
/// Returns true if the topic name given is a known topic that we can deserialize and action on.
|
||||
/// Returns false otherwise.
|
||||
fn known_topic(s: &str) -> bool {
|
||||
// Message to contributors: Make sure to align these topic names with the enum above.
|
||||
matches!(
|
||||
s,
|
||||
"/allowed_ips_updated"
|
||||
| "/block_public_or_vpc_access_updated"
|
||||
| "/allowed_vpc_endpoints_updated_for_org"
|
||||
| "/allowed_vpc_endpoints_updated_for_projects"
|
||||
| "/password_updated"
|
||||
| "/cancel_session"
|
||||
)
|
||||
#[serde(other, skip_serializing)]
|
||||
UnknownTopic,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
@@ -178,32 +165,29 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
let payload: String = msg.get_payload()?;
|
||||
tracing::debug!(?payload, "received a message payload");
|
||||
|
||||
// For better error handling, we first parse the payload to extract the topic.
|
||||
// If there's a topic we don't support, we can handle that error more gracefully.
|
||||
let header: NotificationHeader = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
tracing::error!("broken message: {e}");
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(Notification::UnknownTopic) => {
|
||||
match serde_json::from_str::<NotificationHeader>(&payload) {
|
||||
// don't update the metric for redis errors if it's just a topic we don't know about.
|
||||
Ok(header) => tracing::warn!(topic = header.topic, "unknown topic"),
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
tracing::error!("broken message: {e}");
|
||||
}
|
||||
};
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if !known_topic(header.topic) {
|
||||
// don't update the metric for redis errors if it's just a topic we don't know about.
|
||||
tracing::warn!(topic = header.topic, "unknown topic");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
tracing::error!(topic = header.topic, "broken message: {e}");
|
||||
match serde_json::from_str::<NotificationHeader>(&payload) {
|
||||
Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"),
|
||||
Err(_) => tracing::error!("broken message: {e}"),
|
||||
};
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
@@ -278,6 +262,8 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
invalidate_cache(cache, msg);
|
||||
});
|
||||
}
|
||||
|
||||
Notification::UnknownTopic => unreachable!(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -304,6 +290,7 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
|
||||
Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
|
||||
// https://github.com/neondatabase/neon/pull/10073
|
||||
}
|
||||
Notification::UnknownTopic => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user