Simplify etcd key regex parsing

This commit is contained in:
Kirill Bulatov
2022-06-02 21:39:13 +03:00
committed by Kirill Bulatov
parent 1d16ee92d4
commit 5b06599770
5 changed files with 38 additions and 67 deletions

1
Cargo.lock generated
View File

@@ -811,6 +811,7 @@ name = "etcd_broker"
version = "0.1.0"
dependencies = [
"etcd-client",
"once_cell",
"regex",
"serde",
"serde_json",

View File

@@ -9,6 +9,7 @@
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.12.0"
once_cell = "1.8.0"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -6,6 +6,7 @@ use std::{
str::FromStr,
};
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
@@ -136,29 +137,6 @@ impl SkTimelineSubscriptionKind {
}
}
fn watch_regex(&self) -> Regex {
match self.kind {
SubscriptionKind::All => Regex::new(&format!(
r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$",
self.broker_etcd_prefix
))
.expect("wrong regex for 'everything' subscription"),
SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!(
r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$",
self.broker_etcd_prefix
))
.expect("wrong regex for 'tenant' subscription"),
SubscriptionKind::Timeline(ZTenantTimelineId {
tenant_id,
timeline_id,
}) => Regex::new(&format!(
r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]]+)$",
self.broker_etcd_prefix
))
.expect("wrong regex for 'timeline' subscription"),
}
}
/// Etcd key to use for watching a certain timeline updates from safekeepers.
pub fn watch_key(&self) -> String {
match self.kind {
@@ -196,6 +174,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
subscription: SkTimelineSubscriptionKind,
) -> Result<SkTimelineSubscription, BrokerError> {
info!("Subscribing to timeline updates, subscription kind: {subscription:?}");
let kind = subscription.clone();
let (watcher, mut stream) = client
.watch(
@@ -211,12 +190,9 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
})?;
let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel();
let subscription_kind = subscription.kind;
let regex = subscription.watch_regex();
let watcher_handle = tokio::spawn(async move {
while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!(
"Failed to get messages from the subscription stream, kind: {subscription_kind:?}, error: {e}"
"Failed to get messages from the subscription stream, kind: {:?}, error: {e}", subscription.kind
)))? {
if resp.canceled() {
info!("Watch for timeline updates subscription was canceled, exiting");
@@ -245,7 +221,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
},
};
match parse_etcd_key_value(subscription_kind, &regex, key_str, value_str) {
match parse_etcd_key_value(&subscription, key_str, value_str) {
Ok((zttid, timeline)) => {
match timeline_updates
.entry(zttid)
@@ -283,7 +259,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
}.instrument(info_span!("etcd_broker")));
Ok(SkTimelineSubscription {
kind: subscription,
kind,
safekeeper_timeline_updates,
watcher_handle,
watcher,
@@ -300,17 +276,30 @@ fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> {
Ok((key, value))
}
static SK_TIMELINE_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$")
.expect("wrong regex for safekeeper timeline etcd key")
});
fn parse_etcd_key_value(
subscription_kind: SubscriptionKind,
regex: &Regex,
subscription: &SkTimelineSubscriptionKind,
key_str: &str,
value_str: &str,
) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> {
let key_captures = match regex.captures(key_str) {
let broker_prefix = subscription.broker_etcd_prefix.as_str();
if !key_str.starts_with(broker_prefix) {
return Err(BrokerError::ParsingError(format!(
"KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}"
)));
}
let key_part = &key_str[broker_prefix.len()..];
let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) {
Some(captures) => captures,
None => {
return Err(BrokerError::ParsingError(format!(
"KV has unexpected key '{key_str}' that does not match required regex {regex}"
"KV has unexpected key part '{key_part}' that does not match required regex {}",
SK_TIMELINE_KEY_REGEX.as_str()
)));
}
};
@@ -320,26 +309,11 @@ fn parse_etcd_key_value(
))
})?;
let (zttid, safekeeper_id) = match subscription_kind {
SubscriptionKind::All => (
ZTenantTimelineId::new(
parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?,
),
NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Tenant(tenant_id) => (
ZTenantTimelineId::new(
tenant_id,
parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?,
),
NodeId(parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Timeline(zttid) => (
zttid,
NodeId(parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?),
),
};
let zttid = ZTenantTimelineId::new(
parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?,
);
let safekeeper_id = NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?);
Ok((
zttid,
@@ -408,9 +382,8 @@ mod tests {
&tenant_subscription,
&timeline_subscription,
] {
let watch_regex = subscription.watch_regex();
let (id, _timeline) =
parse_etcd_key_value(subscription.kind, &watch_regex, &key_string, value_str)
parse_etcd_key_value(subscription, &key_string, value_str)
.unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}"));
assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id));
}

View File

@@ -144,19 +144,15 @@ async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
}
pub fn get_campaign_name(
election_name: String,
broker_prefix: String,
timeline_id: &ZTenantTimelineId,
election_name: &str,
broker_prefix: &str,
id: ZTenantTimelineId,
) -> String {
return format!(
"{}/{}",
SkTimelineSubscriptionKind::timeline(broker_prefix, *timeline_id).watch_key(),
election_name
);
format!("{broker_prefix}/{id}/{election_name}")
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{}", system_id)
format!("id_{system_id}")
}
/// Push once in a while data about all active timelines to the broker.

View File

@@ -99,9 +99,9 @@ async fn wal_backup_launcher_main_loop(
// TODO: decide who should offload in launcher itself by simply checking current state
let election_name = broker::get_campaign_name(
BACKUP_ELECTION_NAME.to_string(),
conf.broker_etcd_prefix.clone(),
&zttid,
BACKUP_ELECTION_NAME,
&conf.broker_etcd_prefix,
zttid,
);
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(