From 5b06599770624ecd3184a9670c645cfbebfcfdfb Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 2 Jun 2022 21:39:13 +0300 Subject: [PATCH] Simplify etcd key regex parsing --- Cargo.lock | 1 + libs/etcd_broker/Cargo.toml | 1 + libs/etcd_broker/src/lib.rs | 83 ++++++++++++------------------------ safekeeper/src/broker.rs | 14 +++--- safekeeper/src/wal_backup.rs | 6 +-- 5 files changed, 38 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f8382de27..c615766eb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,7 @@ name = "etcd_broker" version = "0.1.0" dependencies = [ "etcd-client", + "once_cell", "regex", "serde", "serde_json", diff --git a/libs/etcd_broker/Cargo.toml b/libs/etcd_broker/Cargo.toml index 65bd406131..49be7ad207 100644 --- a/libs/etcd_broker/Cargo.toml +++ b/libs/etcd_broker/Cargo.toml @@ -9,6 +9,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1" serde_with = "1.12.0" + once_cell = "1.8.0" utils = { path = "../utils" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index c7777c207f..daa9c513c2 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -6,6 +6,7 @@ use std::{ str::FromStr, }; +use once_cell::sync::Lazy; use regex::{Captures, Regex}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -136,29 +137,6 @@ impl SkTimelineSubscriptionKind { } } - fn watch_regex(&self) -> Regex { - match self.kind { - SubscriptionKind::All => Regex::new(&format!( - r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$", - self.broker_etcd_prefix - )) - .expect("wrong regex for 'everything' subscription"), - SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!( - r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$", - self.broker_etcd_prefix - )) - .expect("wrong regex for 'tenant' subscription"), - SubscriptionKind::Timeline(ZTenantTimelineId { - tenant_id, - timeline_id, - }) => Regex::new(&format!( - r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]]+)$", - self.broker_etcd_prefix - )) - .expect("wrong regex for 'timeline' subscription"), - } - } - /// Etcd key to use for watching a certain timeline updates from safekeepers. pub fn watch_key(&self) -> String { match self.kind { @@ -196,6 +174,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates( subscription: SkTimelineSubscriptionKind, ) -> Result { info!("Subscribing to timeline updates, subscription kind: {subscription:?}"); + let kind = subscription.clone(); let (watcher, mut stream) = client .watch( @@ -211,12 +190,9 @@ pub async fn subscribe_to_safekeeper_timeline_updates( })?; let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel(); - - let subscription_kind = subscription.kind; - let regex = subscription.watch_regex(); let watcher_handle = tokio::spawn(async move { while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!( - "Failed to get messages from the subscription stream, kind: {subscription_kind:?}, error: {e}" + "Failed to get messages from the subscription stream, kind: {:?}, error: {e}", subscription.kind )))? { if resp.canceled() { info!("Watch for timeline updates subscription was canceled, exiting"); @@ -245,7 +221,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates( }, }; - match parse_etcd_key_value(subscription_kind, ®ex, key_str, value_str) { + match parse_etcd_key_value(&subscription, key_str, value_str) { Ok((zttid, timeline)) => { match timeline_updates .entry(zttid) @@ -283,7 +259,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates( }.instrument(info_span!("etcd_broker"))); Ok(SkTimelineSubscription { - kind: subscription, + kind, safekeeper_timeline_updates, watcher_handle, watcher, @@ -300,17 +276,30 @@ fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> { Ok((key, value)) } +static SK_TIMELINE_KEY_REGEX: Lazy = Lazy::new(|| { + Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$") + .expect("wrong regex for safekeeper timeline etcd key") +}); + fn parse_etcd_key_value( - subscription_kind: SubscriptionKind, - regex: &Regex, + subscription: &SkTimelineSubscriptionKind, key_str: &str, value_str: &str, ) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> { - let key_captures = match regex.captures(key_str) { + let broker_prefix = subscription.broker_etcd_prefix.as_str(); + if !key_str.starts_with(broker_prefix) { + return Err(BrokerError::ParsingError(format!( + "KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}" + ))); + } + + let key_part = &key_str[broker_prefix.len()..]; + let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) { Some(captures) => captures, None => { return Err(BrokerError::ParsingError(format!( - "KV has unexpected key '{key_str}' that does not match required regex {regex}" + "KV has unexpected key part '{key_part}' that does not match required regex {}", + SK_TIMELINE_KEY_REGEX.as_str() ))); } }; @@ -320,26 +309,11 @@ fn parse_etcd_key_value( )) })?; - let (zttid, safekeeper_id) = match subscription_kind { - SubscriptionKind::All => ( - ZTenantTimelineId::new( - parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, - parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?, - ), - NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?), - ), - SubscriptionKind::Tenant(tenant_id) => ( - ZTenantTimelineId::new( - tenant_id, - parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, - ), - NodeId(parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?), - ), - SubscriptionKind::Timeline(zttid) => ( - zttid, - NodeId(parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?), - ), - }; + let zttid = ZTenantTimelineId::new( + parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, + parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?, + ); + let safekeeper_id = NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?); Ok(( zttid, @@ -408,9 +382,8 @@ mod tests { &tenant_subscription, &timeline_subscription, ] { - let watch_regex = subscription.watch_regex(); let (id, _timeline) = - parse_etcd_key_value(subscription.kind, &watch_regex, &key_string, value_str) + parse_etcd_key_value(subscription, &key_string, value_str) .unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}")); assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id)); } diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 5bcb197205..5be8091a7e 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -144,19 +144,15 @@ async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { } pub fn get_campaign_name( - election_name: String, - broker_prefix: String, - timeline_id: &ZTenantTimelineId, + election_name: &str, + broker_prefix: &str, + id: ZTenantTimelineId, ) -> String { - return format!( - "{}/{}", - SkTimelineSubscriptionKind::timeline(broker_prefix, *timeline_id).watch_key(), - election_name - ); + format!("{broker_prefix}/{id}/{election_name}") } pub fn get_candiate_name(system_id: NodeId) -> String { - format!("id_{}", system_id) + format!("id_{system_id}") } /// Push once in a while data about all active timelines to the broker. diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 1723d03ee3..30364ce434 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -99,9 +99,9 @@ async fn wal_backup_launcher_main_loop( // TODO: decide who should offload in launcher itself by simply checking current state let election_name = broker::get_campaign_name( - BACKUP_ELECTION_NAME.to_string(), - conf.broker_etcd_prefix.clone(), - &zttid, + BACKUP_ELECTION_NAME, + &conf.broker_etcd_prefix, + zttid, ); let my_candidate_name = broker::get_candiate_name(conf.my_id); let election = broker::Election::new(