This commit is contained in:
Conrad Ludgate
2024-04-20 15:06:36 +01:00
parent 0b2e0d8af5
commit 462713802d

View File

@@ -3,7 +3,8 @@ use crate::{
rate_limiter::RateBucketInfo,
serverless::GlobalConnPoolOptions,
};
use anyhow::{bail, ensure, Context};
use anyhow::{ensure, Context};
use humantime::parse_duration;
use itertools::Itertools;
use remote_storage::RemoteStorageConfig;
use rustls::{
@@ -342,45 +343,88 @@ impl EndpointCacheConfig {
/// Notice that by default the limiter is empty, which means that cache is disabled.
pub const CACHE_DEFAULT_OPTIONS: &'static str =
"initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s";
}
/// Parse cache options passed via cmdline.
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
fn parse(options: &str) -> anyhow::Result<Self> {
let mut initial_batch_size = None;
let mut default_batch_size = None;
let mut xread_timeout = None;
let mut stream_name = None;
let mut limiter_info = vec![];
let mut disable_cache = false;
let mut retry_interval = None;
impl<'de> serde::Deserialize<'de> for EndpointCacheConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = EndpointCacheConfig;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str("struct EndpointCacheConfig")
}
for option in options.split(',') {
let (key, value) = option
.split_once('=')
.with_context(|| format!("bad key-value pair: {option}"))?;
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
fn e<E: serde::de::Error, T: std::fmt::Display>(t: T) -> E {
E::custom(t)
}
match key {
"initial_batch_size" => initial_batch_size = Some(value.parse()?),
"default_batch_size" => default_batch_size = Some(value.parse()?),
"xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?),
"stream_name" => stream_name = Some(value.to_string()),
"limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?),
"disable_cache" => disable_cache = value.parse()?,
"retry_interval" => retry_interval = Some(humantime::parse_duration(value)?),
unknown => bail!("unknown key: {unknown}"),
let mut initial_batch_size: Option<usize> = None;
let mut default_batch_size: Option<usize> = None;
let mut xread_timeout: Option<Duration> = None;
let mut stream_name: Option<String> = None;
let mut limiter_info: Vec<RateBucketInfo> = vec![];
let mut disable_cache: bool = false;
let mut retry_interval: Option<Duration> = None;
while let Some((k, v)) = map.next_entry::<&str, &str>()? {
match k {
"initial_batch_size" => initial_batch_size = Some(v.parse().map_err(e)?),
"default_batch_size" => default_batch_size = Some(v.parse().map_err(e)?),
"xread_timeout" => {
xread_timeout = Some(parse_duration(v).map_err(e)?);
}
"stream_name" => stream_name = Some(v.to_owned()),
"limiter_info" => limiter_info.push(v.parse().map_err(e)?),
"disable_cache" => disable_cache = v.parse().map_err(e)?,
"retry_interval" => retry_interval = Some(parse_duration(v).map_err(e)?),
x => {
return Err(serde::de::Error::unknown_field(
x,
&[
"initial_batch_size",
"default_batch_size",
"xread_timeout",
"stream_name",
"limiter_info",
"disable_cache",
"retry_interval",
],
));
}
}
}
let initial_batch_size = initial_batch_size
.ok_or_else(|| serde::de::Error::missing_field("initial_batch_size"))?;
let default_batch_size = default_batch_size
.ok_or_else(|| serde::de::Error::missing_field("default_batch_size"))?;
let xread_timeout = xread_timeout
.ok_or_else(|| serde::de::Error::missing_field("xread_timeout"))?;
let stream_name =
stream_name.ok_or_else(|| serde::de::Error::missing_field("stream_name"))?;
let retry_interval = retry_interval
.ok_or_else(|| serde::de::Error::missing_field("retry_interval"))?;
RateBucketInfo::validate(&mut limiter_info).map_err(e)?;
Ok(EndpointCacheConfig {
initial_batch_size,
default_batch_size,
xread_timeout,
stream_name,
limiter_info,
disable_cache,
retry_interval,
})
}
}
RateBucketInfo::validate(&mut limiter_info)?;
Ok(Self {
initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?,
default_batch_size: default_batch_size.context("missing `default_batch_size`")?,
xread_timeout: xread_timeout.context("missing `xread_timeout`")?,
stream_name: stream_name.context("missing `stream_name`")?,
disable_cache,
limiter_info,
retry_interval: retry_interval.context("missing `retry_interval`")?,
})
serde::Deserializer::deserialize_map(deserializer, Visitor)
}
}
@@ -389,7 +433,7 @@ impl FromStr for EndpointCacheConfig {
fn from_str(options: &str) -> Result<Self, Self::Err> {
let error = || format!("failed to parse endpoint cache options '{options}'");
Self::parse(options).with_context(error)
Self::deserialize(SimpleKVConfig(options)).with_context(error)
}
}
#[derive(Debug)]