Fix and test regex parsing

This commit is contained in:
Kirill Bulatov
2022-06-02 13:28:34 +03:00
committed by Kirill Bulatov
parent a91e0c299d
commit 7933804284

View File

@@ -139,12 +139,12 @@ impl SkTimelineSubscriptionKind {
fn watch_regex(&self) -> Regex {
match self.kind {
SubscriptionKind::All => Regex::new(&format!(
r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$",
r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$",
self.broker_etcd_prefix
))
.expect("wrong regex for 'everything' subscription"),
SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!(
r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]])$",
r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$",
self.broker_etcd_prefix
))
.expect("wrong regex for 'tenant' subscription"),
@@ -152,7 +152,7 @@ impl SkTimelineSubscriptionKind {
tenant_id,
timeline_id,
}) => Regex::new(&format!(
r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]])$",
r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]]+)$",
self.broker_etcd_prefix
))
.expect("wrong regex for 'timeline' subscription"),
@@ -237,9 +237,16 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
if EventType::Put == event.event_type() {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
let (key_str, value_str) = match extract_key_value_str(new_etcd_kv) {
Ok(strs) => strs,
Err(e) => {
error!("Failed to represent etcd KV {new_etcd_kv:?} as pair of str: {e}");
continue;
},
};
match parse_etcd_key_value(subscription_kind, &regex, new_etcd_kv) {
Ok(Some((zttid, timeline))) => {
match parse_etcd_key_value(subscription_kind, &regex, key_str, value_str) {
Ok((zttid, timeline)) => {
match timeline_updates
.entry(zttid)
.or_default()
@@ -250,6 +257,8 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
if old_etcd_kv_version < new_kv_version {
o.insert(timeline.info);
timeline_etcd_versions.insert(zttid,new_kv_version);
} else {
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
}
}
hash_map::Entry::Vacant(v) => {
@@ -258,7 +267,6 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
}
}
}
Ok(None) => warn!("Ignoring etcd KV with unexpected key {:?} that does not match required regex {}", new_etcd_kv.key_str(), regex),
Err(e) => error!("Failed to parse timeline update: {e}"),
};
}
@@ -282,54 +290,64 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
})
}
fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> {
let key = kv.key_str().map_err(|e| {
BrokerError::EtcdClient(e, "Failed to extract key str out of etcd KV".to_string())
})?;
let value = kv.value_str().map_err(|e| {
BrokerError::EtcdClient(e, "Failed to extract value str out of etcd KV".to_string())
})?;
Ok((key, value))
}
fn parse_etcd_key_value(
subscription_kind: SubscriptionKind,
regex: &Regex,
kv: &KeyValue,
) -> Result<Option<(ZTenantTimelineId, SafekeeperTimeline)>, BrokerError> {
let caps = if let Some(caps) = regex.captures(kv.key_str().map_err(|e| {
BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as key str"))
})?) {
caps
} else {
return Ok(None);
key_str: &str,
value_str: &str,
) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> {
let key_captures = match regex.captures(key_str) {
Some(captures) => captures,
None => {
return Err(BrokerError::ParsingError(format!(
"KV has unexpected key '{key_str}' that does not match required regex {regex}"
)));
}
};
let info = serde_json::from_str(value_str).map_err(|e| {
BrokerError::ParsingError(format!(
"Failed to parse '{value_str}' as safekeeper timeline info: {e}"
))
})?;
let (zttid, safekeeper_id) = match subscription_kind {
SubscriptionKind::All => (
ZTenantTimelineId::new(
parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?,
parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?,
),
NodeId(parse_capture(&caps, 3).map_err(BrokerError::ParsingError)?),
NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Tenant(tenant_id) => (
ZTenantTimelineId::new(
tenant_id,
parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?,
parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?,
),
NodeId(parse_capture(&caps, 2).map_err(BrokerError::ParsingError)?),
NodeId(parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?),
),
SubscriptionKind::Timeline(zttid) => (
zttid,
NodeId(parse_capture(&caps, 1).map_err(BrokerError::ParsingError)?),
NodeId(parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?),
),
};
let info_str = kv.value_str().map_err(|e| {
BrokerError::EtcdClient(e, format!("Failed to represent kv {kv:?} as value str"))
})?;
Ok(Some((
Ok((
zttid,
SafekeeperTimeline {
safekeeper_id,
info: serde_json::from_str(info_str).map_err(|e| {
BrokerError::ParsingError(format!(
"Failed to parse '{info_str}' as safekeeper timeline info: {e}"
))
})?,
info,
},
)))
))
}
fn parse_capture<T>(caps: &Captures, index: usize) -> Result<T, String>
@@ -348,3 +366,54 @@ where
)
})
}
#[cfg(test)]
mod tests {
use utils::zid::ZTimelineId;
use super::*;
#[test]
fn typical_etcd_prefix_should_be_parsed() {
let prefix = "neon";
let tenant_id = ZTenantId::generate();
let timeline_id = ZTimelineId::generate();
let all_subscription = SkTimelineSubscriptionKind {
broker_etcd_prefix: prefix.to_string(),
kind: SubscriptionKind::All,
};
let tenant_subscription = SkTimelineSubscriptionKind {
broker_etcd_prefix: prefix.to_string(),
kind: SubscriptionKind::Tenant(tenant_id),
};
let timeline_subscription = SkTimelineSubscriptionKind {
broker_etcd_prefix: prefix.to_string(),
kind: SubscriptionKind::Timeline(ZTenantTimelineId::new(tenant_id, timeline_id)),
};
let typical_etcd_kv_strs = [
(
format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/1"),
r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#,
),
(
format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/13"),
r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#,
),
];
for (key_string, value_str) in typical_etcd_kv_strs {
for subscription in [
&all_subscription,
&tenant_subscription,
&timeline_subscription,
] {
let watch_regex = subscription.watch_regex();
let (id, _timeline) =
parse_etcd_key_value(subscription.kind, &watch_regex, &key_string, value_str)
.unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}"));
assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id));
}
}
}
}