mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat(procedure): enable auto split large value (#3628)
* chore: add comments * chore: remove `pub` * chore: rename to `merge_multiple_values` * chore: fix typo * feat(procedure): enable auto split large value * chore: apply suggestions from CR * chore: rename to `max_metadata_value_size` * chore: remove the NoneAsEmptyString * chore: set default max_metadata_value_size to 1500KiB
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1937,6 +1937,7 @@ dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"backon",
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-runtime",
|
||||
|
||||
@@ -29,6 +29,12 @@ store_key_prefix = ""
|
||||
max_retry_times = 12
|
||||
# Initial retry delay of procedures, increases exponentially
|
||||
retry_delay = "500ms"
|
||||
# Auto split large value
|
||||
# GreptimeDB procedure uses etcd as the default metadata storage backend.
|
||||
# The maximum size of any etcd request is 1.5 MiB.
|
||||
# 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
|
||||
# Comment out `max_metadata_value_size` to disable splitting of large values (no limit).
|
||||
max_metadata_value_size = "1500KiB"
|
||||
|
||||
# Failure detectors options.
|
||||
[failure_detector]
|
||||
|
||||
@@ -218,6 +218,7 @@ impl StartCommand {
|
||||
mod tests {
|
||||
use std::io::Write;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_test_util::temp_dir::create_named_temp_file;
|
||||
use meta_srv::selector::SelectorType;
|
||||
|
||||
@@ -297,6 +298,10 @@ mod tests {
|
||||
.first_heartbeat_estimate
|
||||
.as_millis()
|
||||
);
|
||||
assert_eq!(
|
||||
options.procedure.max_metadata_value_size,
|
||||
Some(ReadableSize::kb(1500))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -45,21 +45,30 @@ fn strip_prefix(key: &str) -> String {
|
||||
pub struct KvStateStore {
|
||||
kv_backend: KvBackendRef,
|
||||
// The max num of keys to be returned in a range scan request
|
||||
// `None` stands no limit.
|
||||
max_num_per_range: Option<usize>,
|
||||
// `None` stands for no limit.
|
||||
max_num_per_range_request: Option<usize>,
|
||||
// The max bytes of value.
|
||||
// `None` stands no limit.
|
||||
max_size_per_value: Option<usize>,
|
||||
// `None` stands for no limit.
|
||||
max_value_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl KvStateStore {
|
||||
pub fn new(kv_backend: KvBackendRef) -> Self {
|
||||
Self {
|
||||
kv_backend,
|
||||
max_num_per_range: None,
|
||||
max_size_per_value: None,
|
||||
max_num_per_range_request: None,
|
||||
max_value_size: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the `max_value_size`. `None` stands for no limit.
|
||||
///
|
||||
/// If a value is larger than the `max_value_size`,
|
||||
/// the [`KvStateStore`] will automatically split the large value into multiple values.
|
||||
pub fn with_max_value_size(mut self, max_value_size: Option<usize>) -> Self {
|
||||
self.max_value_size = max_value_size;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
|
||||
@@ -75,12 +84,12 @@ enum SplitValue<'a> {
|
||||
Multiple(Vec<&'a [u8]>),
|
||||
}
|
||||
|
||||
fn split_value(value: &[u8], max_size_per_value: Option<usize>) -> SplitValue<'_> {
|
||||
if let Some(max_size_per_value) = max_size_per_value {
|
||||
if value.len() <= max_size_per_value {
|
||||
fn split_value(value: &[u8], max_value_size: Option<usize>) -> SplitValue<'_> {
|
||||
if let Some(max_value_size) = max_value_size {
|
||||
if value.len() <= max_value_size {
|
||||
SplitValue::Single(value)
|
||||
} else {
|
||||
SplitValue::Multiple(value.chunks(max_size_per_value).collect::<Vec<_>>())
|
||||
SplitValue::Multiple(value.chunks(max_value_size).collect::<Vec<_>>())
|
||||
}
|
||||
} else {
|
||||
SplitValue::Single(value)
|
||||
@@ -90,7 +99,7 @@ fn split_value(value: &[u8], max_size_per_value: Option<usize>) -> SplitValue<'_
|
||||
#[async_trait]
|
||||
impl StateStore for KvStateStore {
|
||||
async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
|
||||
let split = split_value(&value, self.max_size_per_value);
|
||||
let split = split_value(&value, self.max_value_size);
|
||||
let key = with_prefix(key);
|
||||
match split {
|
||||
SplitValue::Single(_) => {
|
||||
@@ -156,7 +165,7 @@ impl StateStore for KvStateStore {
|
||||
let stream = PaginationStream::new(
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
self.max_num_per_range.unwrap_or_default(),
|
||||
self.max_num_per_range_request.unwrap_or_default(),
|
||||
Arc::new(decode_kv),
|
||||
);
|
||||
|
||||
@@ -214,8 +223,8 @@ mod tests {
|
||||
async fn test_meta_state_store() {
|
||||
let store = &KvStateStore {
|
||||
kv_backend: Arc::new(MemoryKvBackend::new()),
|
||||
max_num_per_range: Some(1), // for testing "more" in range
|
||||
max_size_per_value: None,
|
||||
max_num_per_range_request: Some(1), // for testing "more" in range
|
||||
max_value_size: None,
|
||||
};
|
||||
|
||||
let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
|
||||
@@ -294,8 +303,8 @@ mod tests {
|
||||
}
|
||||
let store = &KvStateStore {
|
||||
kv_backend: kv_backend.clone(),
|
||||
max_num_per_range: Some(num_per_range as usize), // for testing "more" in range
|
||||
max_size_per_value: Some(size_limit as usize),
|
||||
max_num_per_range_request: Some(num_per_range as usize), // for testing "more" in range
|
||||
max_value_size: Some(size_limit as usize),
|
||||
};
|
||||
let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
|
||||
let mut data = store
|
||||
@@ -366,11 +375,11 @@ mod tests {
|
||||
Arc::new(ChrootKvBackend::new(chroot.into(), backend))
|
||||
};
|
||||
|
||||
let key_preserve_size = 1024;
|
||||
let key_size = 1024;
|
||||
// The etcd default size limit of any requests is 1.5MiB.
|
||||
// However, some KvBackends, e.g. the `ChrootKvBackend`, add a prefix to the `key`;
|
||||
// we don't know the exact size of the key.
|
||||
let size_limit = 1536 * 1024 - key_preserve_size;
|
||||
let size_limit = 1536 * 1024 - key_size;
|
||||
let page_size = rand::thread_rng().gen_range(1..10);
|
||||
test_meta_state_store_split_value_with_size_limit(
|
||||
kv_backend,
|
||||
|
||||
@@ -14,6 +14,7 @@ workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
backon = "0.4"
|
||||
common-base.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-runtime.workspace = true
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -26,6 +27,8 @@ pub struct ProcedureConfig {
|
||||
/// Initial retry delay of procedures, increases exponentially.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub retry_delay: Duration,
|
||||
/// `None` stands for no limit.
|
||||
pub max_metadata_value_size: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
impl Default for ProcedureConfig {
|
||||
@@ -33,6 +36,7 @@ impl Default for ProcedureConfig {
|
||||
ProcedureConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
max_metadata_value_size: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use super::state_store::KeySet;
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
|
||||
pub struct CollectingState {
|
||||
struct CollectingState {
|
||||
pairs: Vec<(String, Vec<u8>)>,
|
||||
}
|
||||
|
||||
@@ -40,11 +40,11 @@ fn parse_segments(segments: Vec<(String, Vec<u8>)>, prefix: &str) -> Result<Vec<
|
||||
.collect::<Result<Vec<_>>>()
|
||||
}
|
||||
|
||||
/// Collects multiple values into a single key-value pair.
|
||||
/// Merges multiple values into a single key-value pair.
|
||||
/// Returns an error if:
|
||||
/// - Part values are lost.
|
||||
/// - Failed to parse the key of segment.
|
||||
fn multiple_values_collector(
|
||||
fn merge_multiple_values(
|
||||
CollectingState { mut pairs }: CollectingState,
|
||||
) -> Result<(KeySet, Vec<u8>)> {
|
||||
if pairs.len() == 1 {
|
||||
@@ -100,8 +100,10 @@ impl CollectingState {
|
||||
}
|
||||
}
|
||||
|
||||
pub type Upstream = dyn Stream<Item = Result<(String, Vec<u8>)>> + Send;
|
||||
type Upstream = dyn Stream<Item = Result<(String, Vec<u8>)>> + Send;
|
||||
|
||||
/// Merges multiple values that have the same prefix of the key
|
||||
/// from `upstream` into a single value.
|
||||
pub fn multiple_value_stream(
|
||||
mut upstream: Pin<Box<Upstream>>,
|
||||
) -> impl Stream<Item = Result<(KeySet, Vec<u8>)>> {
|
||||
@@ -117,14 +119,14 @@ pub fn multiple_value_stream(
|
||||
} else {
|
||||
// Starts to collect next key value pair.
|
||||
collecting = Some(CollectingState::new(key, value));
|
||||
yield multiple_values_collector(current)?;
|
||||
yield merge_multiple_values(current)?;
|
||||
}
|
||||
}
|
||||
None => collecting = Some(CollectingState::new(key, value)),
|
||||
}
|
||||
}
|
||||
if let Some(current) = collecting.take() {
|
||||
yield multiple_values_collector(current)?
|
||||
yield merge_multiple_values(current)?
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -149,7 +151,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_values_collector() {
|
||||
async fn test_merge_multiple_values() {
|
||||
let upstream = stream::iter(vec![
|
||||
Ok(("foo".to_string(), vec![0, 1, 2, 3])),
|
||||
Ok(("foo/0002".to_string(), vec![6, 7])),
|
||||
@@ -192,7 +194,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_values_collector_err() {
|
||||
async fn test_multiple_values_stream_err() {
|
||||
let upstream = stream::iter(vec![
|
||||
Err(error::UnexpectedSnafu { err_msg: "mock" }.build()),
|
||||
Ok(("foo".to_string(), vec![0, 1, 2, 3])),
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_base::Plugins;
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use common_grpc::channel_manager;
|
||||
@@ -115,6 +116,9 @@ impl Default for MetaSrvOptions {
|
||||
procedure: ProcedureConfig {
|
||||
max_retry_times: 12,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
// The maximum size of any etcd request is 1.5 MiB.
|
||||
// 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
|
||||
max_metadata_value_size: Some(ReadableSize::kb(1500)),
|
||||
},
|
||||
failure_detector: PhiAccrualFailureDetectorOptions::default(),
|
||||
datanode: DatanodeOptions::default(),
|
||||
|
||||
@@ -376,8 +376,13 @@ fn build_procedure_manager(
|
||||
retry_delay: options.procedure.retry_delay,
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
|
||||
Arc::new(LocalManager::new(manager_config, state_store))
|
||||
let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size(
|
||||
options
|
||||
.procedure
|
||||
.max_metadata_value_size
|
||||
.map(|v| v.as_bytes() as usize),
|
||||
);
|
||||
Arc::new(LocalManager::new(manager_config, Arc::new(state_store)))
|
||||
}
|
||||
|
||||
fn build_ddl_manager(
|
||||
|
||||
@@ -174,6 +174,7 @@ impl GreptimeDbClusterBuilder {
|
||||
// We only make max_retry_times and retry_delay larger than the default in tests.
|
||||
max_retry_times: 5,
|
||||
retry_delay: Duration::from_secs(1),
|
||||
max_metadata_value_size: None,
|
||||
},
|
||||
wal: self.metasrv_wal_config.clone(),
|
||||
..Default::default()
|
||||
|
||||
Reference in New Issue
Block a user