From 7933804284f42204819c65543e181d99444c4df4 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 2 Jun 2022 13:28:34 +0300 Subject: [PATCH] Fix and test regex parsing --- libs/etcd_broker/src/lib.rs | 129 +++++++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 30 deletions(-) diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 9184412eb1..c7777c207f 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -139,12 +139,12 @@ impl SkTimelineSubscriptionKind { fn watch_regex(&self) -> Regex { match self.kind { SubscriptionKind::All => Regex::new(&format!( - r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$", + r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$", self.broker_etcd_prefix )) .expect("wrong regex for 'everything' subscription"), SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!( - r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]])$", + r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$", self.broker_etcd_prefix )) .expect("wrong regex for 'tenant' subscription"), @@ -152,7 +152,7 @@ impl SkTimelineSubscriptionKind { tenant_id, timeline_id, }) => Regex::new(&format!( - r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]])$", + r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]]+)$", self.broker_etcd_prefix )) .expect("wrong regex for 'timeline' subscription"), @@ -237,9 +237,16 @@ pub async fn subscribe_to_safekeeper_timeline_updates( if EventType::Put == event.event_type() { if let Some(new_etcd_kv) = event.kv() { let new_kv_version = new_etcd_kv.version(); + let (key_str, value_str) = match extract_key_value_str(new_etcd_kv) { + Ok(strs) => strs, + Err(e) => { + error!("Failed to represent etcd KV {new_etcd_kv:?} as pair of str: {e}"); + continue; + }, + }; - match parse_etcd_key_value(subscription_kind, ®ex, new_etcd_kv) { - Ok(Some((zttid, timeline))) => { + match parse_etcd_key_value(subscription_kind, ®ex, key_str, value_str) { + Ok((zttid, timeline)) => { match timeline_updates .entry(zttid) .or_default() @@ -250,6 +257,8 @@ pub async fn subscribe_to_safekeeper_timeline_updates( if old_etcd_kv_version < new_kv_version { o.insert(timeline.info); timeline_etcd_versions.insert(zttid,new_kv_version); + } else { + debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); } } hash_map::Entry::Vacant(v) => { @@ -258,7 +267,6 @@ pub async fn subscribe_to_safekeeper_timeline_updates( } } } - Ok(None) => warn!("Ignoring etcd KV with unexpected key {:?} that does not match required regex {}", new_etcd_kv.key_str(), regex), Err(e) => error!("Failed to parse timeline update: {e}"), }; } @@ -282,54 +290,64 @@ pub async fn subscribe_to_safekeeper_timeline_updates( }) } +fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> { + let key = kv.key_str().map_err(|e| { + BrokerError::EtcdClient(e, "Failed to extract key str out of etcd KV".to_string()) + })?; + let value = kv.value_str().map_err(|e| { + BrokerError::EtcdClient(e, "Failed to extract value str out of etcd KV".to_string()) + })?; + Ok((key, value)) +} + fn parse_etcd_key_value( subscription_kind: SubscriptionKind, regex: &Regex, - kv: &KeyValue, -) -> Result, BrokerError> { - let caps = if let Some(caps) = regex.captures(kv.key_str().map_err(|e| { - BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as key str")) - })?) { - caps - } else { - return Ok(None); + key_str: &str, + value_str: &str, +) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> { + let key_captures = match regex.captures(key_str) { + Some(captures) => captures, + None => { + return Err(BrokerError::ParsingError(format!( + "KV has unexpected key '{key_str}' that does not match required regex {regex}" + ))); + } }; + let info = serde_json::from_str(value_str).map_err(|e| { + BrokerError::ParsingError(format!( + "Failed to parse '{value_str}' as safekeeper timeline info: {e}" + )) + })?; let (zttid, safekeeper_id) = match subscription_kind { SubscriptionKind::All => ( ZTenantTimelineId::new( - parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?, - parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?, + parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, + parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?, ), - NodeId(parse_capture(&caps, 3).map_err(BrokerError::ParsingError)?), + NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?), ), SubscriptionKind::Tenant(tenant_id) => ( ZTenantTimelineId::new( tenant_id, - parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?, + parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, ), - NodeId(parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?), + NodeId(parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?), ), SubscriptionKind::Timeline(zttid) => ( zttid, - NodeId(parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?), + NodeId(parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?), ), }; - let info_str = kv.value_str().map_err(|e| { - BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as value str")) - })?; - Ok(Some(( + Ok(( zttid, SafekeeperTimeline { safekeeper_id, - info: serde_json::from_str(info_str).map_err(|e| { - BrokerError::ParsingError(format!( - "Failed to parse '{info_str}' as safekeeper timeline info: {e}" - )) - })?, + info, }, - ))) + )) } fn parse_capture(caps: &Captures, index: usize) -> Result @@ -348,3 +366,54 @@ where ) }) } + +#[cfg(test)] +mod tests { + use utils::zid::ZTimelineId; + + use super::*; + + #[test] + fn typical_etcd_prefix_should_be_parsed() { + let prefix = "neon"; + let tenant_id = ZTenantId::generate(); + let timeline_id = ZTimelineId::generate(); + let all_subscription = SkTimelineSubscriptionKind { + broker_etcd_prefix: prefix.to_string(), + kind: SubscriptionKind::All, + }; + let tenant_subscription = SkTimelineSubscriptionKind { + broker_etcd_prefix: prefix.to_string(), + kind: SubscriptionKind::Tenant(tenant_id), + }; + let timeline_subscription = SkTimelineSubscriptionKind { + broker_etcd_prefix: prefix.to_string(), + kind: SubscriptionKind::Timeline(ZTenantTimelineId::new(tenant_id, timeline_id)), + }; + + let typical_etcd_kv_strs = [ + ( + format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/1"), + r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#, + ), + ( + format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/13"), + r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#, + ), + ]; + + for (key_string, value_str) in typical_etcd_kv_strs { + for subscription in [ + &all_subscription, + &tenant_subscription, + &timeline_subscription, + ] { + let watch_regex = subscription.watch_regex(); + let (id, _timeline) = + parse_etcd_key_value(subscription.kind, &watch_regex, &key_string, value_str) + .unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}")); + assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id)); + } + } + } +}