diff --git a/Cargo.lock b/Cargo.lock index 8f62994542..150b721576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1937,6 +1937,7 @@ dependencies = [ "async-stream", "async-trait", "backon", + "common-base", "common-error", "common-macro", "common-runtime", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index b9adcf2152..472a57e40a 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -29,6 +29,12 @@ store_key_prefix = "" max_retry_times = 12 # Initial retry delay of procedures, increases exponentially retry_delay = "500ms" +# Automatically split large values. +# GreptimeDB procedure uses etcd as the default metadata storage backend. +# The maximum size of any etcd request is 1.5 MiB. +# 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key) +# Comment out `max_metadata_value_size` to disable splitting large values (no limit). +max_metadata_value_size = "1500KiB" # Failure detectors options. [failure_detector] diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 0e45ffa461..c7affc93c6 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -218,6 +218,7 @@ impl StartCommand { mod tests { use std::io::Write; + use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; use meta_srv::selector::SelectorType; @@ -297,6 +298,10 @@ mod tests { .first_heartbeat_estimate .as_millis() ); + assert_eq!( + options.procedure.max_metadata_value_size, + Some(ReadableSize::kb(1500)) + ); } #[test] diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs index 686e7477ce..eb3a91de17 100644 --- a/src/common/meta/src/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -45,21 +45,30 @@ fn strip_prefix(key: &str) -> String { pub struct KvStateStore { kv_backend: KvBackendRef, // The max num of keys to be returned in a range scan request - // `None` stands no limit. - max_num_per_range: Option, + // `None` stands for no limit. 
+ max_num_per_range_request: Option, // The max bytes of value. - // `None` stands no limit. - max_size_per_value: Option, + // `None` stands for no limit. + max_value_size: Option, } impl KvStateStore { pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend, - max_num_per_range: None, - max_size_per_value: None, + max_num_per_range_request: None, + max_value_size: None, } } + + /// Sets the `max_value_size`. `None` stands for no limit. + /// + /// If a value is larger than the `max_value_size`, + /// the [`KvStateStore`] will automatically split the large value into multiple values. + pub fn with_max_value_size(mut self, max_value_size: Option) -> Self { + self.max_value_size = max_value_size; + self + } } fn decode_kv(kv: KeyValue) -> Result<(String, Vec)> { @@ -75,12 +84,12 @@ enum SplitValue<'a> { Multiple(Vec<&'a [u8]>), } -fn split_value(value: &[u8], max_size_per_value: Option) -> SplitValue<'_> { - if let Some(max_size_per_value) = max_size_per_value { - if value.len() <= max_size_per_value { +fn split_value(value: &[u8], max_value_size: Option) -> SplitValue<'_> { + if let Some(max_value_size) = max_value_size { + if value.len() <= max_value_size { SplitValue::Single(value) } else { - SplitValue::Multiple(value.chunks(max_size_per_value).collect::>()) + SplitValue::Multiple(value.chunks(max_value_size).collect::>()) } } else { SplitValue::Single(value) @@ -90,7 +99,7 @@ fn split_value(value: &[u8], max_size_per_value: Option) -> SplitValue<'_ #[async_trait] impl StateStore for KvStateStore { async fn put(&self, key: &str, value: Vec) -> ProcedureResult<()> { - let split = split_value(&value, self.max_size_per_value); + let split = split_value(&value, self.max_value_size); let key = with_prefix(key); match split { SplitValue::Single(_) => { @@ -156,7 +165,7 @@ impl StateStore for KvStateStore { let stream = PaginationStream::new( self.kv_backend.clone(), req, - self.max_num_per_range.unwrap_or_default(), + 
self.max_num_per_range_request.unwrap_or_default(), Arc::new(decode_kv), ); @@ -214,8 +223,8 @@ mod tests { async fn test_meta_state_store() { let store = &KvStateStore { kv_backend: Arc::new(MemoryKvBackend::new()), - max_num_per_range: Some(1), // for testing "more" in range - max_size_per_value: None, + max_num_per_range_request: Some(1), // for testing "more" in range + max_value_size: None, }; let walk_top_down = async move |path: &str| -> Vec { @@ -294,8 +303,8 @@ mod tests { } let store = &KvStateStore { kv_backend: kv_backend.clone(), - max_num_per_range: Some(num_per_range as usize), // for testing "more" in range - max_size_per_value: Some(size_limit as usize), + max_num_per_range_request: Some(num_per_range as usize), // for testing "more" in range + max_value_size: Some(size_limit as usize), }; let walk_top_down = async move |path: &str| -> Vec { let mut data = store @@ -366,11 +375,11 @@ mod tests { Arc::new(ChrootKvBackend::new(chroot.into(), backend)) }; - let key_preserve_size = 1024; + let key_size = 1024; // The etcd default size limit of any requests is 1.5MiB. // However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`; // we don't know the exact size of the key. 
- let size_limit = 1536 * 1024 - key_preserve_size; + let size_limit = 1536 * 1024 - key_size; let page_size = rand::thread_rng().gen_range(1..10); test_meta_state_store_split_value_with_size_limit( kv_backend, diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index af755b2b87..d5b4767774 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -14,6 +14,7 @@ workspace = true async-stream.workspace = true async-trait.workspace = true backon = "0.4" +common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true diff --git a/src/common/procedure/src/options.rs b/src/common/procedure/src/options.rs index 9cebfa805c..503812de49 100644 --- a/src/common/procedure/src/options.rs +++ b/src/common/procedure/src/options.rs @@ -16,6 +16,7 @@ use std::time::Duration; +use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -26,6 +27,8 @@ pub struct ProcedureConfig { /// Initial retry delay of procedures, increases exponentially. #[serde(with = "humantime_serde")] pub retry_delay: Duration, + /// `None` stands for no limit. 
+ pub max_metadata_value_size: Option, } impl Default for ProcedureConfig { @@ -33,6 +36,7 @@ impl Default for ProcedureConfig { ProcedureConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), + max_metadata_value_size: None, } } } diff --git a/src/common/procedure/src/store/util.rs b/src/common/procedure/src/store/util.rs index 924a3b069b..c8ed3bf50f 100644 --- a/src/common/procedure/src/store/util.rs +++ b/src/common/procedure/src/store/util.rs @@ -22,7 +22,7 @@ use super::state_store::KeySet; use crate::error; use crate::error::Result; -pub struct CollectingState { +struct CollectingState { pairs: Vec<(String, Vec)>, } @@ -40,11 +40,11 @@ fn parse_segments(segments: Vec<(String, Vec)>, prefix: &str) -> Result>>() } -/// Collects multiple values into a single key-value pair. +/// Merges multiple values into a single key-value pair. /// Returns an error if: /// - Part values are lost. /// - Failed to parse the key of segment. -fn multiple_values_collector( +fn merge_multiple_values( CollectingState { mut pairs }: CollectingState, ) -> Result<(KeySet, Vec)> { if pairs.len() == 1 { @@ -100,8 +100,10 @@ impl CollectingState { } } -pub type Upstream = dyn Stream)>> + Send; +type Upstream = dyn Stream)>> + Send; +/// Merges multiple values that have the same prefix of the key +/// from `upstream` into a single value. pub fn multiple_value_stream( mut upstream: Pin>, ) -> impl Stream)>> { @@ -117,14 +119,14 @@ pub fn multiple_value_stream( } else { // Starts to collect next key value pair. collecting = Some(CollectingState::new(key, value)); - yield multiple_values_collector(current)?; + yield merge_multiple_values(current)?; } } None => collecting = Some(CollectingState::new(key, value)), } } if let Some(current) = collecting.take() { - yield multiple_values_collector(current)? + yield merge_multiple_values(current)? 
} } } @@ -149,7 +151,7 @@ mod tests { } #[tokio::test] - async fn test_multiple_values_collector() { + async fn test_merge_multiple_values() { let upstream = stream::iter(vec![ Ok(("foo".to_string(), vec![0, 1, 2, 3])), Ok(("foo/0002".to_string(), vec![6, 7])), @@ -192,7 +194,7 @@ } #[tokio::test] - async fn test_multiple_values_collector_err() { + async fn test_multiple_values_stream_err() { let upstream = stream::iter(vec![ Err(error::UnexpectedSnafu { err_msg: "mock" }.build()), Ok(("foo".to_string(), vec![0, 1, 2, 3])), diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 68b3579298..fa76903694 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; @@ -115,6 +116,9 @@ impl Default for MetaSrvOptions { procedure: ProcedureConfig { max_retry_times: 12, retry_delay: Duration::from_millis(500), + // The maximum size of any etcd request is 1.5 MiB + // 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key) + max_metadata_value_size: Some(ReadableSize::kb(1500)), }, failure_detector: PhiAccrualFailureDetectorOptions::default(), datanode: DatanodeOptions::default(), diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index e7a561f87b..fe327bd589 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -376,8 +376,13 @@ fn build_procedure_manager( retry_delay: options.procedure.retry_delay, ..Default::default() }; - let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); - Arc::new(LocalManager::new(manager_config, state_store)) + let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size( + options + .procedure + .max_metadata_value_size 
+ .map(|v| v.as_bytes() as usize), + ); + Arc::new(LocalManager::new(manager_config, Arc::new(state_store))) } fn build_ddl_manager( diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 4ca617b2ab..b4eb412856 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -174,6 +174,7 @@ impl GreptimeDbClusterBuilder { // We only make max_retry_times and retry_delay large than the default in tests. max_retry_times: 5, retry_delay: Duration::from_secs(1), + max_metadata_value_size: None, }, wal: self.metasrv_wal_config.clone(), ..Default::default()