From 61461e6dee9307b04ba5061593773247985ebe15 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 21 Apr 2026 11:19:52 +0800 Subject: [PATCH] test: add repartition chaos target (#7924) * test: add repartition chaos fuzz target Signed-off-by: WenyXu * chore: use containerd runtime Signed-off-by: WenyXu * chore: enable logs Signed-off-by: WenyXu * chore: correct config Signed-off-by: WenyXu * fix: unit tests Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- .github/actions/setup-chaos/action.yml | 9 +- .../with-minio-repartition-gc.yaml | 5 + .github/workflows/develop.yml | 6 + tests-fuzz/Cargo.toml | 7 + tests-fuzz/src/context.rs | 2 + tests-fuzz/src/generator/repartition_expr.rs | 6 + tests-fuzz/src/ir/repartition_expr.rs | 8 + .../src/translator/mysql/repartition_expr.rs | 47 +- tests-fuzz/src/utils.rs | 1 + tests-fuzz/src/utils/cluster_info.rs | 39 ++ tests-fuzz/src/utils/crd.rs | 1 + tests-fuzz/src/utils/crd/network.rs | 291 ++++++++++++ tests-fuzz/src/utils/network_chaos.rs | 94 ++++ .../ddl/fuzz_repartition_table_chaos.rs | 447 ++++++++++++++++++ 14 files changed, 955 insertions(+), 8 deletions(-) create mode 100644 tests-fuzz/src/utils/crd/network.rs create mode 100644 tests-fuzz/src/utils/network_chaos.rs create mode 100644 tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs diff --git a/.github/actions/setup-chaos/action.yml b/.github/actions/setup-chaos/action.yml index 76eb48cf4a..4b3fb86744 100644 --- a/.github/actions/setup-chaos/action.yml +++ b/.github/actions/setup-chaos/action.yml @@ -1,15 +1,16 @@ -name: Setup Kind -description: Deploy Kind +name: Setup Chaos Mesh +description: Install and wait for Chaos Mesh runs: using: composite steps: - uses: actions/checkout@v4 - - name: Create kind cluster + - name: Install Chaos Mesh shell: bash run: | helm repo add chaos-mesh https://charts.chaos-mesh.org + helm repo update chaos-mesh kubectl create ns chaos-mesh - helm install chaos-mesh chaos-mesh/chaos-mesh -n=chaos-mesh --version 2.6.3 + helm install chaos-mesh chaos-mesh/chaos-mesh -n=chaos-mesh --set chaosDaemon.runtime=containerd --set chaosDaemon.socketPath=/run/containerd/containerd.sock --version 2.8.0 - name: Print Chaos-mesh if: always() shell: bash diff --git a/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml b/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml index b836606ad6..5b7f43d81b 100644 --- a/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml @@ -1,3 +1,8 @@ +logging: + level: "info" + format: "json" + filters: + - mito2::sst::file=debug meta: configData: |- [runtime] diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index b6ab0f8926..d0d2804c6a 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -506,6 +506,12 @@ jobs: minio: true kafka: false values: "with-minio.yaml" + - target: "fuzz_repartition_table_chaos" + mode: + name: "Local WAL repartition chaos" + minio: true + kafka: false + values: "with-minio-repartition-gc.yaml" steps: - name: Remove unused software run: | diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index bc687092c0..9fe051d8c1 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -100,6 +100,13 @@ test = false bench = false doc = false +[[bin]] +name = "fuzz_repartition_table_chaos" +path = "targets/ddl/fuzz_repartition_table_chaos.rs" +test = false +bench = false +doc = false + [[bin]] name = "fuzz_repartition_metric_table" path = "targets/ddl/fuzz_repartition_metric_table.rs" diff --git a/tests-fuzz/src/context.rs b/tests-fuzz/src/context.rs index 1180e8ac7f..2f65ad56aa 100644 --- a/tests-fuzz/src/context.rs +++ b/tests-fuzz/src/context.rs @@ -376,6 +376,7 @@ mod tests { table_name: expr.table_name.clone(), target: partitions.last().unwrap().clone(), into: vec![expected_exprs[2].clone(), expected_exprs[3].clone()], + wait: true, })) .unwrap(); let partition_def = table_ctx.partition.as_ref().unwrap(); @@ -417,6 +418,7 @@ mod tests { .repartition(RepartitionExpr::Merge(MergePartitionExpr { table_name: expr.table_name.clone(), targets: vec![partitions[1].clone(), partitions[2].clone()], + wait: true, })) .unwrap(); let partition_def = table_ctx.partition.as_ref().unwrap(); diff --git a/tests-fuzz/src/generator/repartition_expr.rs b/tests-fuzz/src/generator/repartition_expr.rs index bf2c98567c..a6e748a0c0 100644 --- a/tests-fuzz/src/generator/repartition_expr.rs +++ b/tests-fuzz/src/generator/repartition_expr.rs @@ -25,6 +25,8 @@ use crate::ir::repartition_expr::{MergePartitionExpr, SplitPartitionExpr}; #[builder(pattern = "owned")] pub struct SplitPartitionExprGenerator { table_ctx: TableContextRef, + #[builder(default = "true")] + wait: bool, } impl Generator for SplitPartitionExprGenerator { @@ -66,6 +68,7 @@ impl Generator for SplitPartitionExprGe table_name, target: from_expr, into: vec![left, right], + wait: self.wait, }) } } @@ -74,6 +77,8 @@ impl Generator for SplitPartitionExprGe #[builder(pattern = "owned")] pub struct MergePartitionExprGenerator { table_ctx: TableContextRef, + #[builder(default = "true")] + wait: bool, } impl Generator for MergePartitionExprGenerator { @@ -96,6 +101,7 @@ impl Generator for MergePartitionExprGe Ok(MergePartitionExpr { table_name, targets: vec![left, right], + wait: self.wait, }) } } diff --git a/tests-fuzz/src/ir/repartition_expr.rs b/tests-fuzz/src/ir/repartition_expr.rs index f048308bce..5c8b401c8d 100644 --- a/tests-fuzz/src/ir/repartition_expr.rs +++ b/tests-fuzz/src/ir/repartition_expr.rs @@ -22,12 +22,16 @@ pub struct SplitPartitionExpr { pub table_name: Ident, pub target: PartitionExpr, pub into: Vec, + #[serde(default = "default_wait")] + pub wait: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MergePartitionExpr { pub table_name: Ident, pub targets: Vec, + #[serde(default = "default_wait")] + pub wait: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -35,3 +39,7 @@ pub enum RepartitionExpr { Split(SplitPartitionExpr), Merge(MergePartitionExpr), } + +const fn default_wait() -> bool { + true +} diff --git a/tests-fuzz/src/translator/mysql/repartition_expr.rs b/tests-fuzz/src/translator/mysql/repartition_expr.rs index b620d38f8f..56c27594b3 100644 --- a/tests-fuzz/src/translator/mysql/repartition_expr.rs +++ b/tests-fuzz/src/translator/mysql/repartition_expr.rs @@ -29,6 +29,7 @@ impl DslTranslator for RepartitionExprTranslator { table_name, target, into, + wait, }) => { let target_expr = format_partition_expr_sql(target); let into_exprs = into @@ -36,23 +37,26 @@ impl DslTranslator for RepartitionExprTranslator { .map(format_partition_expr_sql) .collect::>() .join(",\n "); + let wait_clause = format_wait_clause(*wait); Ok(format!( - "ALTER TABLE {} SPLIT PARTITION (\n {}\n) INTO (\n {}\n);", - table_name, target_expr, into_exprs + "ALTER TABLE {} SPLIT PARTITION (\n {}\n) INTO (\n {}\n){};", + table_name, target_expr, into_exprs, wait_clause )) } RepartitionExpr::Merge(MergePartitionExpr { table_name, targets, + wait, }) => { let merge_exprs = targets .iter() .map(format_partition_expr_sql) .collect::>() .join(",\n "); + let wait_clause = format_wait_clause(*wait); Ok(format!( - "ALTER TABLE {} MERGE PARTITION (\n {}\n);", - table_name, merge_exprs + "ALTER TABLE {} MERGE PARTITION (\n {}\n){};", + table_name, merge_exprs, wait_clause )) } } @@ -63,6 +67,14 @@ fn format_partition_expr_sql(expr: &PartitionExpr) -> String { expr.to_parser_expr().to_string() } +fn format_wait_clause(wait: bool) -> String { + if wait { + String::new() + } else { + " WITH (\n WAIT = false\n)".to_string() + } +} + #[cfg(test)] mod tests { use datatypes::value::Value; @@ -83,6 +95,7 @@ mod tests { .gt_eq(Value::Int32(5)) .and(col("id").lt(Value::Int32(10))), ], + wait: true, }); let sql = RepartitionExprTranslator.translate(&expr).unwrap(); let expected = r#"ALTER TABLE demo SPLIT PARTITION ( @@ -102,11 +115,37 @@ mod tests { col("id").gt_eq(Value::Int32(10)), col("id").gt_eq(Value::Int32(20)), ], + wait: true, }); let sql = RepartitionExprTranslator.translate(&expr).unwrap(); let expected = r#"ALTER TABLE demo MERGE PARTITION ( id >= 10, id >= 20 +);"#; + assert_eq!(sql, expected); + } + + #[test] + fn test_translate_split_expr_wait_false() { + let expr = RepartitionExpr::Split(SplitPartitionExpr { + table_name: "demo".into(), + target: col("id").lt(Value::Int32(10)), + into: vec![ + col("id").lt(Value::Int32(5)), + col("id") + .gt_eq(Value::Int32(5)) + .and(col("id").lt(Value::Int32(10))), + ], + wait: false, + }); + let sql = RepartitionExprTranslator.translate(&expr).unwrap(); + let expected = r#"ALTER TABLE demo SPLIT PARTITION ( + id < 10 +) INTO ( + id < 5, + id >= 5 AND id < 10 +) WITH ( + WAIT = false );"#; assert_eq!(sql, expected); } diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index d55abab3c2..055663af24 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -19,6 +19,7 @@ pub mod crd; pub mod csv_dump_writer; pub mod health; pub mod migration; +pub mod network_chaos; pub mod partition; pub mod pod_failure; pub mod procedure; diff --git a/tests-fuzz/src/utils/cluster_info.rs b/tests-fuzz/src/utils/cluster_info.rs index 9f43711ca4..035f7908ac 100644 --- a/tests-fuzz/src/utils/cluster_info.rs +++ b/tests-fuzz/src/utils/cluster_info.rs @@ -77,3 +77,42 @@ pub async fn wait_for_all_datanode_online(greptime: MySqlPool, timeout: Duration ) .await } + +pub async fn wait_for_all_datanode_offline(greptime: MySqlPool, timeout: Duration) { + wait_condition_fn( + timeout, + || { + let greptime = greptime.clone(); + Box::pin(async move { + let nodes = fetch_nodes(&greptime) + .await + .unwrap() + .into_iter() + .flat_map(|node| { + if node.peer_type == PEER_TYPE_DATANODE { + Some(node) + } else { + None + } + }) + .collect::>(); + info!("Waits for datanode offline: {nodes:?}"); + nodes + }) + }, + |nodes| { + nodes + .into_iter() + .map(|node| { + info!( + "Waits for datanode {} offline, active_time: {:?}", + node.peer_id, node.active_time + ); + parse_duration(&node.active_time.unwrap()).unwrap() + }) + .all(|duration| duration >= Duration::from_secs(3)) + }, + Duration::from_secs(2), + ) + .await +} diff --git a/tests-fuzz/src/utils/crd.rs b/tests-fuzz/src/utils/crd.rs index c2fdcad9e0..df3a2cb86e 100644 --- a/tests-fuzz/src/utils/crd.rs +++ b/tests-fuzz/src/utils/crd.rs @@ -13,4 +13,5 @@ // limitations under the License. pub mod common; +pub mod network; pub mod pod; diff --git a/tests-fuzz/src/utils/crd/network.rs b/tests-fuzz/src/utils/crd/network.rs new file mode 100644 index 0000000000..cbae991a49 --- /dev/null +++ b/tests-fuzz/src/utils/crd/network.rs @@ -0,0 +1,291 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use derive_builder::Builder; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use super::common::{Mode, Selector}; + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Builder, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Target { + mode: Mode, + selector: Selector, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "kebab-case")] +pub enum Action { + Netem, + Delay, + Loss, + Corrupt, + Partition, + Bandwidth, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "kebab-case")] +pub enum Direction { + From, + To, + Both, +} + +#[derive( + CustomResource, Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Builder, JsonSchema, +)] +#[kube( + group = "chaos-mesh.org", + version = "v1alpha1", + namespaced, + kind = "NetworkChaos", + plural = "networkchaos", + singular = "networkchaos", + derive = "PartialEq" +)] +#[serde(rename_all = "camelCase")] +pub struct NetworkChaosSpec { + // Indicates the specific fault type. Available types include: + // netem, delay (network delay), loss (packet loss), duplicate (packet duplicating), + // corrupt (packet corrupt), partition (network partition), and bandwidth (network bandwidth limit). + // After you specify action field, + // refer to Description for action-related fields for other necessary field configuration. + action: Action, + // Used in combination with direction, making Chaos only effective for some packets. + #[builder(setter(into), default = "None")] + #[serde(skip_serializing_if = "Option::is_none")] + target: Option, + // Indicates the direction of target packets. Available value include `from` (the packets from target), + // `to` (the packets to target), and `both` ( the packets from or to target). + // This parameter makes Chaos only take effect for a specific direction of packets. + #[builder(setter(into), default = "None")] + #[serde(skip_serializing_if = "Option::is_none")] + direction: Option, + // Specifies the mode of the experiment. The mode options include one (selecting a random Pod), + // all (selecting all eligible Pods), fixed (selecting a specified number of eligible Pods), + // fixed-percent (selecting a specified percentage of Pods from the eligible Pods), + // and random-max-percent (selecting the maximum percentage of Pods from the eligible Pods). + mode: Mode, + // Provides a parameter for the mode configuration, depending on mode. + // For example, when mode is set to fixed-percent, value specifies the percentage of Pods. + #[builder(setter(into), default = "None")] + #[serde(skip_serializing_if = "Option::is_none")] + value: Option, + // Specifies the target Pod. + selector: Selector, + // Indicates the network targets except for Kubernetes, which can be IPv4 addresses or domains. + // This parameter only works with `direction: to`. + #[builder(setter(into), default = "Vec::new()")] + #[serde(skip_serializing_if = "Vec::is_empty", default)] + external_targets: Vec, + // Specifies the affected network interface + #[builder(setter(into), default = "None")] + #[serde(skip_serializing_if = "Option::is_none")] + device: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into), default = "None")] + delay: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into), default = "None")] + loss: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into), default = "None")] + duplicate: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into), default = "None")] + corrupt: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into), default = "None")] + bandwidth: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into), default = "None")] + duration: Option, +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize, Builder, JsonSchema)] +#[serde(default)] +#[builder(default)] +pub struct Delay { + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + // Indicates the network latency. + latency: Option, + // Indicates the correlation between the current latency and the previous one. + // Range of value: [0, 100] + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + correlation: Option, + // Indicates the range of the network latency. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + jitter: Option, + // Indicates the status of network packet reordering + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + reorder: Option, +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema, Builder)] +#[serde(default)] +#[builder(default)] +pub struct Reorder { + // Indicates the probability to reorder + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + reorder: Option, + // Indicates the correlation between this time's length of delay time + // and the previous time's length of delay time. Range of value: [0, 100] + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + correlation: Option, + // Indicates the gap before and after packet reordering. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + gap: Option, +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema, Builder)] +#[serde(default)] +#[builder(default, setter(into, strip_option))] +pub struct Loss { + // Indicates the probability of packet loss. + // Range of value: [0, 100]. + #[serde(skip_serializing_if = "Option::is_none")] + loss: Option, + // Indicates the correlation between the probability of current packet loss + // and the previous time's packet loss. Range of value: [0, 100]. + #[serde(skip_serializing_if = "Option::is_none")] + correlation: Option, +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema, Builder)] +#[serde(default)] +#[builder(default, setter(into, strip_option))] +pub struct Duplicate { + // Indicates the probability of packet duplicating. + // Range of value: [0, 100] + #[serde(skip_serializing_if = "Option::is_none")] + duplicate: Option, + // Indicates the correlation between the probability of current packet duplicating + // and the previous time's packet duplicating. Range of value: [0, 100] + #[serde(skip_serializing_if = "Option::is_none")] + correlation: Option, +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema, Builder)] +#[serde(default)] +#[builder(default, setter(into, strip_option))] +pub struct Corrupt { + // Indicates the probability of packet corruption. Range of value: [0, 100] + #[serde(skip_serializing_if = "Option::is_none")] + corrupt: Option, + // Indicates the correlation between the probability of current packet corruption + // and the previous time's packet corruption. Range of value: [0, 100] + #[serde(skip_serializing_if = "Option::is_none")] + correlation: Option, +} + +#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize, JsonSchema, Builder)] +#[serde(default)] +#[builder(default)] +pub struct Bandwidth { + // Indicates the rate of bandwidth limit. e.g., 1mbps. + rate: String, + // Indicates the number of bytes waiting in queue. + limit: u32, + // Indicates the maximum number of bytes that can be sent instantaneously. + buffer: u32, + // Indicates the maximum consumption of `bucket` (usually not set) + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + peakrate: Option, + // Indicates the size of peakrate bucket (usually not set). + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(into, strip_option))] + minburst: Option, +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use super::{Action, Direction, NetworkChaosSpecBuilder, TargetBuilder}; + use crate::utils::crd::common::{Mode, SelectorBuilder}; + use crate::utils::crd::network::{DelayBuilder, NetworkChaos}; + + #[test] + fn test_serde() { + let mut fs = BTreeMap::new(); + fs.insert("app".into(), "datanode".into()); + let selector = SelectorBuilder::default() + .namespaces(vec!["default".into()]) + .label_selectors(fs) + .build() + .unwrap(); + let target = TargetBuilder::default() + .mode(Mode::All) + .selector(selector.clone()) + .build() + .unwrap(); + let delay = DelayBuilder::default() + .latency("100ms") + .correlation("100") + .jitter("0ms") + .build() + .unwrap(); + let spec = NetworkChaosSpecBuilder::default() + .action(Action::Delay) + .target(target) + .direction(Direction::Both) + .mode(Mode::One) + .selector(selector) + .delay(delay) + .duration(Some("10s".to_string())) + .build() + .unwrap(); + let crd = NetworkChaos::new("my-delay", spec); + let ser = serde_yaml::to_string(&crd).unwrap(); + + let expected = r#"apiVersion: chaos-mesh.org/v1alpha1 +kind: NetworkChaos +metadata: + name: my-delay +spec: + action: delay + target: + mode: all + selector: + namespaces: + - default + labelSelectors: + app: datanode + direction: both + mode: one + selector: + namespaces: + - default + labelSelectors: + app: datanode + delay: + latency: 100ms + correlation: '100' + jitter: 0ms + duration: 10s +"#; + assert_eq!(expected, ser); + } +} diff --git a/tests-fuzz/src/utils/network_chaos.rs b/tests-fuzz/src/utils/network_chaos.rs new file mode 100644 index 0000000000..26d4becfe8 --- /dev/null +++ b/tests-fuzz/src/utils/network_chaos.rs @@ -0,0 +1,94 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use kube::Api; + +use crate::error::Result; +use crate::utils::crd::common::{Mode, Selector, SelectorBuilder}; +use crate::utils::crd::network::{ + Action, Direction, NetworkChaos, NetworkChaosSpecBuilder, TargetBuilder, +}; + +fn build_datanode_selector(namespace: &str, cluster_name: &str) -> Selector { + let mut selector = BTreeMap::new(); + selector.insert( + "app.greptime.io/component".into(), + format!("{cluster_name}-datanode"), + ); + + SelectorBuilder::default() + .namespaces(vec![namespace.to_string()]) + .label_selectors(selector) + .build() + .unwrap() +} + +fn build_metasrv_selector(namespace: &str, cluster_name: &str) -> Selector { + let mut selector = BTreeMap::new(); + selector.insert( + "app.greptime.io/component".into(), + format!("{cluster_name}-meta"), + ); + + SelectorBuilder::default() + .namespaces(vec![namespace.to_string()]) + .label_selectors(selector) + .build() + .unwrap() +} + +/// Injects a network partition between a datanode pod and metasrv. +pub async fn inject_datanode_metasrv_network_partition( + client: kube::Client, + namespace: &str, + cluster_name: &str, + duration_secs: usize, +) -> Result { + let selector = build_datanode_selector(namespace, cluster_name); + let target = TargetBuilder::default() + .mode(Mode::All) + .selector(build_metasrv_selector(namespace, cluster_name)) + .build() + .unwrap(); + + let spec = NetworkChaosSpecBuilder::default() + .action(Action::Partition) + .direction(Direction::Both) + .mode(Mode::All) + .selector(selector) + .target(target) + .duration(format!("{duration_secs}s")) + .build() + .unwrap(); + + let chaos_name = "datanode-metasrv-network-partition".to_string(); + let cr = NetworkChaos::new(&chaos_name, spec); + let api: Api = Api::namespaced(client, namespace); + api.create(&Default::default(), &cr).await.unwrap(); + + Ok(chaos_name) +} + +/// Recovers network chaos by deleting the associated NetworkChaos resource. +pub async fn recover_network_chaos( + client: kube::Client, + namespace: &str, + name: &str, +) -> Result<()> { + let api: Api = Api::namespaced(client, namespace); + api.delete(name, &Default::default()).await.unwrap(); + Ok(()) +} diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs b/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs new file mode 100644 index 0000000000..d3789b696c --- /dev/null +++ b/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs @@ -0,0 +1,447 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![no_main] + +use std::env; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use arbitrary::{Arbitrary, Unstructured}; +use common_telemetry::info; +use common_time::Timestamp; +use libfuzzer_sys::fuzz_target; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use snafu::ResultExt; +use sqlx::{Executor, MySql, Pool}; +use tests_fuzz::context::{TableContext, TableContextRef}; +use tests_fuzz::error::{self, Result}; +use tests_fuzz::fake::{ + MappedGenerator, WordGenerator, merge_two_word_map_fn, random_capitalize_map, + uppercase_and_keyword_backtick_map, +}; +use tests_fuzz::generator::Generator; +use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; +use tests_fuzz::generator::repartition_expr::{ + MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder, +}; +use tests_fuzz::ir::{ + CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue, + SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock, +}; +use tests_fuzz::translator::DslTranslator; +use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; +use tests_fuzz::translator::mysql::repartition_expr::RepartitionExprTranslator; +use tests_fuzz::utils::cluster_info::{ + wait_for_all_datanode_offline, wait_for_all_datanode_online, +}; +use tests_fuzz::utils::network_chaos::{ + inject_datanode_metasrv_network_partition, recover_network_chaos, +}; +use tests_fuzz::utils::procedure::procedure_state as fetch_procedure_state_json; +use tests_fuzz::utils::{ + Connections, GT_FUZZ_CLUSTER_NAME, GT_FUZZ_CLUSTER_NAMESPACE, get_fuzz_override, + get_gt_fuzz_input_max_rows, init_greptime_connections_via_env, +}; +use tests_fuzz::validator; +use tests_fuzz::validator::row::count_values; + +#[derive(Clone)] +struct FuzzContext { + greptime: Pool, + kube: kube::client::Client, + namespace: String, + cluster_name: String, +} + +const PROCEDURE_TIMEOUT: Duration = Duration::from_secs(300); +const NETWORK_CHAOS_DURATION_SECS: usize = 360; + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } +} + +#[derive(Clone, Debug)] +struct FuzzInput { + seed: u64, + rows: usize, + partitions: usize, + chaos_delay_ms: u64, + chaos_hold_secs: u64, +} + +impl Arbitrary<'_> for FuzzInput { + fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { + let seed = get_fuzz_override::("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?); + let mut rng = ChaChaRng::seed_from_u64(seed); + let rows = get_fuzz_override::("ROWS") + .unwrap_or_else(|| rng.random_range(2..get_gt_fuzz_input_max_rows())); + let partitions = + get_fuzz_override::("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8)); + let chaos_delay_ms = + get_fuzz_override::("CHAOS_DELAY_MS").unwrap_or_else(|| rng.random_range(0..5000)); + let chaos_hold_secs = + get_fuzz_override::("CHAOS_HOLD_SECS").unwrap_or_else(|| rng.random_range(10..20)); + + Ok(FuzzInput { + seed, + rows, + partitions, + chaos_delay_ms, + chaos_hold_secs, + }) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ProcedureTerminalState { + Done, + Failed, +} + +fn generate_create_expr( + input: &FuzzInput, + rng: &mut R, +) -> Result { + let create_table_generator = CreateTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .columns(5) + .partition(input.partitions) + .engine("mito") + .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator)) + .build() + .unwrap(); + create_table_generator.generate(rng) +} + +fn build_insert_expr( + table_ctx: &TableContextRef, + rng: &mut R, + partitions: &SimplePartitions, + clock: &Arc>, + rows: usize, +) -> InsertIntoExpr { + let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone()); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len(); + let moved_partitions = partitions.clone(); + let insert_generator = InsertExprGeneratorBuilder::default() + .table_ctx(table_ctx.clone()) + .omit_column_list(false) + .rows(rows.max(partition_len)) + .required_columns(vec![partitions.column_name.clone()]) + .value_overrides(Some(Box::new(move |column, rng| { + if column.name.value == moved_partitions.column_name.value { + let bound_idx = counter_clone.fetch_add(1, Ordering::Relaxed) % partition_len; + return Some(RowValue::Value(generate_partition_value( + rng, + &column.column_type, + &moved_partitions.bounds, + bound_idx, + ))); + } + None + }))) + .ts_value_generator(ts_value_generator) + .build() + .unwrap(); + insert_generator.generate(rng).unwrap() +} + +async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> { + let mut delay = Duration::from_millis(100); + let mut attempt = 0; + let max_attempts = 10; + loop { + match ctx.greptime.execute(sql).await { + Ok(_) => return Ok(()), + Err(err) => { + tokio::time::sleep(delay).await; + delay = std::cmp::min(delay * 2, Duration::from_secs(1)); + attempt += 1; + if attempt >= max_attempts { + return Err(err).context(error::ExecuteQuerySnafu { sql }); + } + } + } + } +} + +async fn create_table(ctx: &FuzzContext, expr: &CreateTableExpr) -> Result { + let translator = CreateTableExprTranslator; + let sql = translator.translate(expr)?; + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Create table: {sql}, result: {result:?}"); + Ok(Arc::new(TableContext::from(expr))) +} + +async fn insert_initial_rows( + ctx: &FuzzContext, + table_ctx: &TableContextRef, + rng: &mut R, + rows: usize, +) -> Result { + let partitions = SimplePartitions::from_table_ctx(table_ctx).unwrap(); + let clock = Arc::new(Mutex::new(Timestamp::current_millis())); + let insert_expr = build_insert_expr(table_ctx, rng, &partitions, &clock, rows); + let inserted_rows = insert_expr.values_list.len() as u64; + let translator = InsertIntoExprTranslator; + let sql = translator.translate(&insert_expr)?; + execute_insert_with_retry(ctx, &sql).await?; + info!("Insert initial rows: {sql}"); + Ok(inserted_rows) +} + +async fn validate_table_rows( + ctx: &FuzzContext, + table_ctx: &TableContextRef, + inserted_rows: u64, +) -> Result<()> { + let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name); + let count = count_values(&ctx.greptime, &count_sql).await?; + assert_eq!(count.count as u64, inserted_rows); + + let timestamp_column_name = table_ctx.timestamp_column().unwrap().name.clone(); + let distinct_count_sql = format!( + "SELECT COUNT(DISTINCT {}) AS count FROM {}", + timestamp_column_name, table_ctx.name + ); + let distinct_count = count_values(&ctx.greptime, &distinct_count_sql).await?; + assert_eq!(distinct_count.count as u64, inserted_rows); + Ok(()) +} + +fn repartition_operation( + table_ctx: &TableContextRef, + rng: &mut R, + wait: bool, +) -> Result { + let split = rng.random_bool(0.5); + if table_ctx.partition.as_ref().unwrap().exprs.len() <= 2 || split { + let expr = SplitPartitionExprGeneratorBuilder::default() + .table_ctx(table_ctx.clone()) + .wait(wait) + .build() + .unwrap() + .generate(rng)?; + Ok(RepartitionExpr::Split(expr)) + } else { + let expr = MergePartitionExprGeneratorBuilder::default() + .table_ctx(table_ctx.clone()) + .wait(wait) + .build() + .unwrap() + .generate(rng)?; + Ok(RepartitionExpr::Merge(expr)) + } +} + +async fn submit_repartition_procedure(ctx: &FuzzContext, expr: &RepartitionExpr) -> Result { + let translator = RepartitionExprTranslator; + let sql = translator.translate(expr)?; + let (procedure_id,) = sqlx::query_as::<_, (String,)>(&sql) + .fetch_one(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Submit repartition procedure: {sql}, procedure_id: {procedure_id}"); + Ok(procedure_id) +} + +async fn fetch_procedure_state( + ctx: &FuzzContext, + procedure_id: &str, +) -> Result> { + let output = fetch_procedure_state_json(&ctx.greptime, procedure_id).await; + + Ok(if output.contains("Done") { + Some(ProcedureTerminalState::Done) + } else if output.contains("Failed") { + Some(ProcedureTerminalState::Failed) + } else { + None + }) +} + +async fn wait_for_procedure_terminal_state( + ctx: &FuzzContext, + procedure_id: &str, + timeout: Duration, +) -> Result { + let deadline = tokio::time::Instant::now() + timeout; + loop { + if let Some(state) = fetch_procedure_state(ctx, procedure_id).await? { + info!("Procedure terminal state: {state:?}, procedure_id: {procedure_id}"); + return Ok(state); + } + + if tokio::time::Instant::now() >= deadline { + return error::AssertSnafu { + reason: format!( + "procedure {procedure_id} did not reach terminal state before timeout" + ), + } + .fail(); + } + + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn validate_terminal_metadata( + ctx: &FuzzContext, + table_name: &tests_fuzz::ir::Ident, + terminal: ProcedureTerminalState, + before_table_ctx: &TableContextRef, + after_table_ctx: &TableContextRef, +) -> Result<()> { + let partition_entries = validator::partition::fetch_partitions_info_schema( + &ctx.greptime, + "public".into(), + table_name, + ) + .await?; + + info!( + "Validate terminal metadata: procedure_terminal_state={terminal:?}, partition_entries={partition_entries:?}" + ); + + match terminal { + ProcedureTerminalState::Done => validator::partition::assert_partitions( + after_table_ctx.partition.as_ref().unwrap(), + &partition_entries, + )?, + ProcedureTerminalState::Failed => validator::partition::assert_partitions( + before_table_ctx.partition.as_ref().unwrap(), + &partition_entries, + )?, + } + + Ok(()) +} + +async fn drop_table(ctx: &FuzzContext, table_name: &tests_fuzz::ir::Ident) -> Result<()> { + let sql = format!("DROP TABLE {}", table_name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Drop table: {table_name}, result: {result:?}"); + Ok(()) +} + +async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result<()> { + info!("input: {input:?}"); + let mut rng = ChaChaRng::seed_from_u64(input.seed); + + let create_expr = generate_create_expr(&input, &mut rng)?; + let before_table_ctx = create_table(&ctx, &create_expr).await?; + let inserted_rows = insert_initial_rows(&ctx, &before_table_ctx, &mut rng, input.rows).await?; + validate_table_rows(&ctx, &before_table_ctx, inserted_rows).await?; + + let before_entries = validator::partition::fetch_partitions_info_schema( + &ctx.greptime, + "public".into(), + &before_table_ctx.name, + ) + .await?; + info!("Before repartition partition entries: {before_entries:?}"); + + let repartition_expr = repartition_operation(&before_table_ctx, &mut rng, false)?; + let after_table_ctx = Arc::new( + Arc::unwrap_or_clone(before_table_ctx.clone()) + .repartition(repartition_expr.clone()) + .unwrap(), + ); + + let namespace = ctx.namespace.clone(); + let cluster_name = ctx.cluster_name.clone(); + let kube_client = ctx.kube.clone(); + let handle = tokio::spawn(async move { + let chaos_name = inject_datanode_metasrv_network_partition( + kube_client.clone(), + &namespace, + &cluster_name, + NETWORK_CHAOS_DURATION_SECS, + ) + .await + .unwrap(); + info!( + "Injected network chaos: {chaos_name}, holding for {} seconds", + input.chaos_hold_secs + ); + + tokio::time::sleep(Duration::from_secs(input.chaos_hold_secs)).await; + recover_network_chaos(kube_client.clone(), &namespace, &chaos_name) + .await + .unwrap(); + }); + + wait_for_all_datanode_offline(ctx.greptime.clone(), Duration::from_secs(30)).await; + info!( + "Waiting for {} ms before submitting procedure", + input.chaos_delay_ms + ); + tokio::time::sleep(Duration::from_millis(input.chaos_delay_ms)).await; + + let procedure_id = submit_repartition_procedure(&ctx, &repartition_expr).await?; + let terminal = + wait_for_procedure_terminal_state(&ctx, &procedure_id, PROCEDURE_TIMEOUT).await?; + handle.await.unwrap(); + + wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(30)).await; + validate_terminal_metadata( + &ctx, + &before_table_ctx.name, + terminal, + &before_table_ctx, + &after_table_ctx, + ) + .await?; + validate_table_rows(&ctx, &before_table_ctx, inserted_rows).await?; + + drop_table(&ctx, &before_table_ctx.name).await?; + ctx.close().await; + Ok(()) +} + +fuzz_target!(|input: FuzzInput| { + common_telemetry::init_default_ut_logging(); + common_runtime::block_on_global(async { + let Connections { mysql } = init_greptime_connections_via_env().await; + let ctx = FuzzContext { + greptime: mysql.expect("mysql connection init must be succeed"), + kube: kube::client::Client::try_default() + .await + .expect("init kube client"), + namespace: env::var(GT_FUZZ_CLUSTER_NAMESPACE).unwrap_or("my-greptimedb".to_string()), + cluster_name: env::var(GT_FUZZ_CLUSTER_NAME).unwrap_or("my-greptimedb".to_string()), + }; + execute_repartition_chaos(ctx, input) + .await + .unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}")); + }) +});