From 462713802de5a10d45d25ea5563f6b4f1959dce3 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Sat, 20 Apr 2024 15:06:36 +0100
Subject: [PATCH] one more

---
 proxy/src/config.rs | 116 ++++++++++++++++++++++++++++++--------
 1 file changed, 80 insertions(+), 36 deletions(-)

diff --git a/proxy/src/config.rs b/proxy/src/config.rs
index 655080e9a7..74b49492d3 100644
--- a/proxy/src/config.rs
+++ b/proxy/src/config.rs
@@ -3,7 +3,8 @@ use crate::{
     rate_limiter::RateBucketInfo,
     serverless::GlobalConnPoolOptions,
 };
-use anyhow::{bail, ensure, Context};
+use anyhow::{ensure, Context};
+use humantime::parse_duration;
 use itertools::Itertools;
 use remote_storage::RemoteStorageConfig;
 use rustls::{
@@ -342,45 +343,88 @@ impl EndpointCacheConfig {
     /// Notice that by default the limiter is empty, which means that cache is disabled.
     pub const CACHE_DEFAULT_OPTIONS: &'static str = "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s";
+}

-    /// Parse cache options passed via cmdline.
-    /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
-    fn parse(options: &str) -> anyhow::Result<Self> {
-        let mut initial_batch_size = None;
-        let mut default_batch_size = None;
-        let mut xread_timeout = None;
-        let mut stream_name = None;
-        let mut limiter_info = vec![];
-        let mut disable_cache = false;
-        let mut retry_interval = None;
+impl<'de> serde::Deserialize<'de> for EndpointCacheConfig {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct Visitor;
+        impl<'de> serde::de::Visitor<'de> for Visitor {
+            type Value = EndpointCacheConfig;
+            fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                f.write_str("struct EndpointCacheConfig")
+            }

-        for option in options.split(',') {
-            let (key, value) = option
-                .split_once('=')
-                .with_context(|| format!("bad key-value pair: {option}"))?;
+            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::MapAccess<'de>,
+            {
+                fn e<T: std::fmt::Display, E: serde::de::Error>(t: T) -> E {
+                    E::custom(t)
+                }

-            match key {
-                "initial_batch_size" => initial_batch_size = Some(value.parse()?),
-                "default_batch_size" => default_batch_size = Some(value.parse()?),
-                "xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?),
-                "stream_name" => stream_name = Some(value.to_string()),
-                "limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?),
-                "disable_cache" => disable_cache = value.parse()?,
-                "retry_interval" => retry_interval = Some(humantime::parse_duration(value)?),
-                unknown => bail!("unknown key: {unknown}"),
+                let mut initial_batch_size: Option<usize> = None;
+                let mut default_batch_size: Option<usize> = None;
+                let mut xread_timeout: Option<Duration> = None;
+                let mut stream_name: Option<String> = None;
+                let mut limiter_info: Vec<RateBucketInfo> = vec![];
+                let mut disable_cache: bool = false;
+                let mut retry_interval: Option<Duration> = None;
+                while let Some((k, v)) = map.next_entry::<&str, &str>()? {
+                    match k {
+                        "initial_batch_size" => initial_batch_size = Some(v.parse().map_err(e)?),
+                        "default_batch_size" => default_batch_size = Some(v.parse().map_err(e)?),
+                        "xread_timeout" => {
+                            xread_timeout = Some(parse_duration(v).map_err(e)?);
+                        }
+                        "stream_name" => stream_name = Some(v.to_owned()),
+                        "limiter_info" => limiter_info.push(v.parse().map_err(e)?),
+                        "disable_cache" => disable_cache = v.parse().map_err(e)?,
+                        "retry_interval" => retry_interval = Some(parse_duration(v).map_err(e)?),
+                        x => {
+                            return Err(serde::de::Error::unknown_field(
+                                x,
+                                &[
+                                    "initial_batch_size",
+                                    "default_batch_size",
+                                    "xread_timeout",
+                                    "stream_name",
+                                    "limiter_info",
+                                    "disable_cache",
+                                    "retry_interval",
+                                ],
+                            ));
+                        }
+                    }
+                }
+
+                let initial_batch_size = initial_batch_size
+                    .ok_or_else(|| serde::de::Error::missing_field("initial_batch_size"))?;
+                let default_batch_size = default_batch_size
+                    .ok_or_else(|| serde::de::Error::missing_field("default_batch_size"))?;
+                let xread_timeout = xread_timeout
+                    .ok_or_else(|| serde::de::Error::missing_field("xread_timeout"))?;
+                let stream_name =
+                    stream_name.ok_or_else(|| serde::de::Error::missing_field("stream_name"))?;
+                let retry_interval = retry_interval
+                    .ok_or_else(|| serde::de::Error::missing_field("retry_interval"))?;
+
+                RateBucketInfo::validate(&mut limiter_info).map_err(e)?;
+
+                Ok(EndpointCacheConfig {
+                    initial_batch_size,
+                    default_batch_size,
+                    xread_timeout,
+                    stream_name,
+                    limiter_info,
+                    disable_cache,
+                    retry_interval,
+                })
             }
         }
-        RateBucketInfo::validate(&mut limiter_info)?;
-
-        Ok(Self {
-            initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?,
-            default_batch_size: default_batch_size.context("missing `default_batch_size`")?,
-            xread_timeout: xread_timeout.context("missing `xread_timeout`")?,
-            stream_name: stream_name.context("missing `stream_name`")?,
-            disable_cache,
-            limiter_info,
-            retry_interval: retry_interval.context("missing `retry_interval`")?,
-        })
+        serde::Deserializer::deserialize_map(deserializer, Visitor)
     }
 }
 
@@ -389,7 +433,7 @@ impl FromStr for EndpointCacheConfig {
     fn from_str(options: &str) -> Result<Self, Self::Error> {
         let error = || format!("failed to parse endpoint cache options '{options}'");
-        Self::parse(options).with_context(error)
+        Self::deserialize(SimpleKVConfig(options)).with_context(error)
     }
 }
 
 #[derive(Debug)]
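Note on the final hunk: `SimpleKVConfig` is not defined in this file, but the call `Self::deserialize(SimpleKVConfig(options))` implies it implements `serde::Deserializer` over the comma-separated `key=value` option string. As a rough illustration only, not the actual implementation, the same mechanism can be sketched with serde's built-in `MapDeserializer`; the helper name `deserialize_kv` below is hypothetical:

    use serde::de::value::{Error as DeError, MapDeserializer};
    use serde::Deserialize;

    // Drive any `Deserialize` impl (such as the EndpointCacheConfig visitor
    // above) from a "k1=v1,k2=v2" option string.
    fn deserialize_kv<'de, T: Deserialize<'de>>(options: &'de str) -> Result<T, DeError> {
        // Split the string into (key, value) pairs; a segment without '='
        // is passed through with an empty value.
        let pairs = options
            .split(',')
            .map(|kv| kv.split_once('=').unwrap_or((kv, "")));
        // MapDeserializer presents the pairs as a serde map, so the target's
        // visit_map receives each key and value as a borrowed &str entry.
        T::deserialize(MapDeserializer::new(pairs))
    }

    // For example, the default options string above could be parsed as:
    // let cfg: EndpointCacheConfig =
    //     deserialize_kv(EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)?;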