diff --git a/.github/actions/setup-chaos/action.yml b/.github/actions/setup-chaos/action.yml
new file mode 100644
index 0000000000..76eb48cf4a
--- /dev/null
+++ b/.github/actions/setup-chaos/action.yml
@@ -0,0 +1,17 @@
+name: Setup Chaos Mesh
+description: Deploy Chaos Mesh
+runs:
+  using: composite
+  steps:
+  - uses: actions/checkout@v4
+  - name: Install Chaos Mesh
+    shell: bash
+    run: |
+      helm repo add chaos-mesh https://charts.chaos-mesh.org
+      kubectl create ns chaos-mesh
+      helm install chaos-mesh chaos-mesh/chaos-mesh -n=chaos-mesh --version 2.6.3
+  - name: Print Chaos Mesh pods
+    if: always()
+    shell: bash
+    run: |
+      kubectl get po -n chaos-mesh
diff --git a/.github/actions/setup-greptimedb-cluster/action.yml b/.github/actions/setup-greptimedb-cluster/action.yml
index 8fc5acf782..e25faf7b47 100644
--- a/.github/actions/setup-greptimedb-cluster/action.yml
+++ b/.github/actions/setup-greptimedb-cluster/action.yml
@@ -24,7 +24,8 @@ inputs:
     description: "Etcd endpoints"
   values-filename:
     default: "with-minio.yaml"
-
+  enable-region-failover:
+    default: false
 runs:
   using: composite
@@ -47,6 +48,7 @@ runs:
       helm upgrade \
         --install my-greptimedb \
         --set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
+        --set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
         --set image.registry=${{ inputs.image-registry }} \
         --set image.repository=${{ inputs.image-repository }} \
         --set image.tag=${{ inputs.image-tag }} \
diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index 6e1f06d807..7d109bed1b 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -409,6 +409,132 @@ jobs:
           docker rm $(docker ps -a -q)
           docker system prune -f
 
+  distributed-fuzztest-with-chaos:
+    name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
+    runs-on: ubuntu-latest
+    needs: build-greptime-ci
+    strategy:
+      matrix:
+        target: ["fuzz_failover_mito_regions"]
+        mode:
+          - name: "Remote WAL"
+            minio: true
+            kafka: true
+            values: "with-remote-wal.yaml"
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Kind
+        uses: ./.github/actions/setup-kind
+      - name: Setup Chaos Mesh
+        uses: ./.github/actions/setup-chaos
+      - if: matrix.mode.minio
+        name: Setup Minio
+        uses: ./.github/actions/setup-minio
+      - if: matrix.mode.kafka
+        name: Setup Kafka cluster
+        uses: ./.github/actions/setup-kafka-cluster
+      - name: Setup Etcd cluster
+        uses: ./.github/actions/setup-etcd-cluster
+      # Prepares for fuzz tests
+      - uses: arduino/setup-protoc@v3
+        with:
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+      - uses: dtolnay/rust-toolchain@master
+        with:
+          toolchain: ${{ env.RUST_TOOLCHAIN }}
+      - name: Rust Cache
+        uses: Swatinem/rust-cache@v2
+        with:
+          # Shares across multiple jobs
+          shared-key: "fuzz-test-targets"
+      - name: Set Rust Fuzz
+        shell: bash
+        run: |
+          sudo apt-get install -y libfuzzer-14-dev
+          rustup install nightly
+          cargo +nightly install cargo-fuzz cargo-gc-bin
+      # Downloads ci image
+      - name: Download pre-built binary
+        uses: actions/download-artifact@v4
+        with:
+          name: bin
+          path: .
+      - name: Unzip binary
+        run: |
+          tar -xvf ./bin.tar.gz
+          rm ./bin.tar.gz
+      - name: Build and push GreptimeDB image
+        uses: ./.github/actions/build-and-push-ci-image
+      - name: Wait for etcd
+        run: |
+          kubectl wait \
+            --for=condition=Ready \
+            pod -l app.kubernetes.io/instance=etcd \
+            --timeout=120s \
+            -n etcd-cluster
+      - if: matrix.mode.minio
+        name: Wait for minio
+        run: |
+          kubectl wait \
+            --for=condition=Ready \
+            pod -l app=minio \
+            --timeout=120s \
+            -n minio
+      - if: matrix.mode.kafka
+        name: Wait for kafka
+        run: |
+          kubectl wait \
+            --for=condition=Ready \
+            pod -l app.kubernetes.io/instance=kafka \
+            --timeout=120s \
+            -n kafka-cluster
+      - name: Print etcd info
+        shell: bash
+        run: kubectl get all --show-labels -n etcd-cluster
+      # Setup cluster for test
+      - name: Setup GreptimeDB cluster
+        uses: ./.github/actions/setup-greptimedb-cluster
+        with:
+          image-registry: localhost:5001
+          values-filename: ${{ matrix.mode.values }}
+          enable-region-failover: true
+      - name: Port forward (mysql)
+        run: |
+          kubectl port-forward service/my-greptimedb-frontend 4002:4002 -n my-greptimedb &
+      - name: Fuzz Test
+        uses: ./.github/actions/fuzz-test
+        env:
+          CUSTOM_LIBFUZZER_PATH: /usr/lib/llvm-14/lib/libFuzzer.a
+          GT_MYSQL_ADDR: 127.0.0.1:4002
+        with:
+          target: ${{ matrix.target }}
+          max-total-time: 120
+      - name: Describe Nodes
+        if: failure()
+        shell: bash
+        run: |
+          kubectl describe nodes
+      - name: Export kind logs
+        if: failure()
+        shell: bash
+        run: |
+          kind export logs /tmp/kind
+      - name: Upload logs
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: fuzz-tests-kind-logs-${{ matrix.mode.name }}-${{ matrix.target }}
+          path: /tmp/kind
+          retention-days: 3
+      - name: Delete cluster
+        if: success()
+        shell: bash
+        run: |
+          kind delete cluster
+          docker stop $(docker ps -a -q)
+          docker rm $(docker ps -a -q)
+          docker system prune -f
+
   sqlness:
     name: Sqlness Test (${{ matrix.mode.name }})
     needs: build
diff --git a/Cargo.lock b/Cargo.lock
index d6fca8241d..68663ae65c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11428,6 +11428,8 @@ dependencies = [
 "datatypes",
 "derive_builder 0.12.0",
 "dotenv",
+ "futures",
+ "humantime",
 "k8s-openapi",
 "kube",
 "lazy_static",
@@ -11445,6 +11447,7 @@ dependencies = [
 "sql",
 "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
 "sqlx",
+ "store-api",
 "strum 0.25.0",
 "tinytemplate",
 "tokio",
diff --git a/Makefile b/Makefile
index a9b7444d87..4c36f5c86b 100644
--- a/Makefile
+++ b/Makefile
@@ -170,6 +170,10 @@ FUZZ_TARGET ?= fuzz_alter_table
 fuzz:
 	cargo fuzz run ${FUZZ_TARGET} --fuzz-dir tests-fuzz -D -s none -- -runs=${RUNS}
 
+.PHONY: fuzz-ls
+fuzz-ls:
+	cargo fuzz list --fuzz-dir tests-fuzz
+
 .PHONY: check
 check: ## Cargo check all the targets.
 	cargo check --workspace --all-targets --all-features
diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml
index 3bf4f677f9..0dd1b05d7d 100644
--- a/tests-fuzz/Cargo.toml
+++ b/tests-fuzz/Cargo.toml
@@ -27,6 +27,8 @@ common-time = { workspace = true }
 datatypes = { workspace = true }
 derive_builder = { workspace = true }
 dotenv = "0.15"
+futures = { workspace = true }
+humantime = { workspace = true }
 k8s-openapi = { version = "0.22", features = ["v1_30"] }
 kube = { version = "0.92", features = [
     "runtime",
@@ -54,6 +56,7 @@ sqlx = { version = "0.6", features = [
     "postgres",
     "chrono",
 ] }
+store-api = { workspace = true }
 strum.workspace = true
 tinytemplate = "1.2"
 tokio = { workspace = true }
@@ -117,3 +120,10 @@ test = false
 bench = false
 doc = false
 required-features = ["unstable"]
+
+[[bin]]
+name = "fuzz_failover_mito_regions"
+path = "targets/failover/fuzz_failover_mito_regions.rs"
+test = false
+bench = false
+doc = false
diff --git a/tests-fuzz/src/generator/create_expr.rs b/tests-fuzz/src/generator/create_expr.rs
index 57f8966844..b274f0b997 100644
--- a/tests-fuzz/src/generator/create_expr.rs
+++ b/tests-fuzz/src/generator/create_expr.rs
@@ -44,7 +44,7 @@ pub struct CreateTableExprGenerator<R: Rng + 'static> {
     partition: usize,
     if_not_exists: bool,
     #[builder(setter(into))]
-    name: String,
+    name: Ident,
     #[builder(setter(into))]
     with_clause: HashMap<String, Ident>,
     name_generator: Box<dyn Random<Ident, R>>,
@@ -65,7 +65,7 @@ impl<R: Rng + 'static> Default for CreateTableExprGenerator<R> {
             engine: DEFAULT_ENGINE.to_string(),
             if_not_exists: false,
             partition: 0,
-            name: String::new(),
+            name: Ident::new(""),
             with_clause: HashMap::default(),
             name_generator: Box::new(MappedGenerator::new(WordGenerator, random_capitalize_map)),
             ts_column_type_generator: Box::new(TsColumnTypeGenerator),
@@ -190,7 +190,7 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerator<R>
         if self.name.is_empty() {
             builder.table_name(self.name_generator.gen(rng));
         } else {
-            builder.table_name(self.name.to_string());
+            builder.table_name(self.name.clone());
         }
         if !self.with_clause.is_empty() {
             let mut options = HashMap::new();
diff --git a/tests-fuzz/src/generator/insert_expr.rs b/tests-fuzz/src/generator/insert_expr.rs
index 286a11d753..a0d5bfefc8 100644
--- a/tests-fuzz/src/generator/insert_expr.rs
+++ b/tests-fuzz/src/generator/insert_expr.rs
@@ -103,7 +103,7 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R>
         }
 
         Ok(InsertIntoExpr {
-            table_name: self.table_ctx.name.to_string(),
+            table_name: self.table_ctx.name.clone(),
             omit_column_list: self.omit_column_list,
             columns: values_columns,
             values_list,
diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs
index 190217858f..4777eb8166 100644
--- a/tests-fuzz/src/ir.rs
+++ b/tests-fuzz/src/ir.rs
@@ -39,7 +39,7 @@ use serde::{Deserialize, Serialize};
 
 use self::insert_expr::{RowValue, RowValues};
 use crate::context::TableContextRef;
-use crate::generator::Random;
+use crate::generator::{Random, TsValueGenerator};
 use crate::impl_random;
 use crate::ir::create_expr::ColumnOption;
@@ -127,12 +127,10 @@ pub fn generate_random_value(
     }
 }
 
 /// Generate monotonically increasing timestamps for MySQL.
-pub fn generate_unique_timestamp_for_mysql<R: Rng + 'static>(
-    base: i64,
-) -> impl Fn(&mut R, TimestampType) -> Value {
+pub fn generate_unique_timestamp_for_mysql<R: Rng + 'static>(base: i64) -> TsValueGenerator<R> {
     let base = Arc::new(AtomicI64::new(base));
 
-    move |_rng, ts_type| -> Value {
+    Box::new(move |_rng, ts_type| -> Value {
         let value = base.fetch_add(1, Ordering::Relaxed);
         let v = match ts_type {
             TimestampType::Second(_) => Timestamp::new_second(1 + value),
@@ -141,7 +139,7 @@ pub fn generate_unique_timestamp_for_mysql<R: Rng + 'static>(
             TimestampType::Nanosecond(_) => Timestamp::new_nanosecond(1_000_000_000 + value),
         };
         Value::from(v)
-    }
+    })
 }
 
 /// Generate random timestamps.
@@ -253,6 +251,10 @@ impl Ident {
             quote_style: Some(quote),
         }
     }
+
+    pub fn is_empty(&self) -> bool {
+        self.value.is_empty()
+    }
 }
 
 impl From<&str> for Ident {
diff --git a/tests-fuzz/src/ir/insert_expr.rs b/tests-fuzz/src/ir/insert_expr.rs
index 639f125cbc..1ecdb7eb58 100644
--- a/tests-fuzz/src/ir/insert_expr.rs
+++ b/tests-fuzz/src/ir/insert_expr.rs
@@ -16,10 +16,10 @@ use std::fmt::{Debug, Display};
 
 use datatypes::value::Value;
 
-use crate::ir::Column;
+use crate::ir::{Column, Ident};
 
 pub struct InsertIntoExpr {
-    pub table_name: String,
+    pub table_name: Ident,
     pub omit_column_list: bool,
     pub columns: Vec<Column>,
     pub values_list: Vec<RowValues>,
diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs
index fd984e83a5..8849b45c4d 100644
--- a/tests-fuzz/src/utils.rs
+++ b/tests-fuzz/src/utils.rs
@@ -12,18 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod cluster_info;
 pub mod config;
 pub mod crd;
 pub mod health;
+pub mod partition;
+pub mod pod_failure;
 #[cfg(feature = "unstable")]
 pub mod process;
+pub mod wait;
 
 use std::env;
 
 use common_telemetry::info;
+use common_telemetry::tracing::log::LevelFilter;
 use snafu::ResultExt;
-use sqlx::mysql::MySqlPoolOptions;
-use sqlx::{MySql, Pool};
+use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
+use sqlx::{ConnectOptions, MySql, Pool};
 
 use crate::error::{self, Result};
 use crate::ir::Ident;
@@ -51,12 +56,9 @@ pub async fn init_greptime_connections_via_env() -> Connections {
 /// Connects to GreptimeDB.
 pub async fn init_greptime_connections(mysql: Option<String>) -> Connections {
     let mysql = if let Some(addr) = mysql {
-        Some(
-            MySqlPoolOptions::new()
-                .connect(&format!("mysql://{addr}/public"))
-                .await
-                .unwrap(),
-        )
+        let mut opts: MySqlConnectOptions = format!("mysql://{addr}/public").parse().unwrap();
+        opts.log_statements(LevelFilter::Off);
+        Some(MySqlPoolOptions::new().connect_with(opts).await.unwrap())
     } else {
         None
     };
@@ -89,6 +91,9 @@ pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
     }
 }
 
+pub const GT_FUZZ_CLUSTER_NAMESPACE: &str = "GT_FUZZ_CLUSTER_NAMESPACE";
+pub const GT_FUZZ_CLUSTER_NAME: &str = "GT_FUZZ_CLUSTER_NAME";
+
 /// Flushes memtable to SST file.
 pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
     let sql = format!("SELECT flush_table(\"{}\")", table_name);
diff --git a/tests-fuzz/src/utils/cluster_info.rs b/tests-fuzz/src/utils/cluster_info.rs
new file mode 100644
index 0000000000..fa4bbbc540
--- /dev/null
+++ b/tests-fuzz/src/utils/cluster_info.rs
@@ -0,0 +1,89 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use common_telemetry::info;
+use humantime::parse_duration;
+use snafu::ResultExt;
+use sqlx::database::HasArguments;
+use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Type};
+
+use super::wait::wait_condition_fn;
+use crate::error::{self, Result};
+
+pub const PEER_TYPE_DATANODE: &str = "DATANODE";
+
+#[derive(Debug, sqlx::FromRow)]
+pub struct NodeInfo {
+    pub peer_id: i64,
+    pub peer_addr: String,
+    pub peer_type: String,
+    pub active_time: Option<String>,
+}
+
+/// Returns all [NodeInfo] in the cluster.
+pub async fn fetch_nodes<'a, DB, E>(e: E) -> Result<Vec<NodeInfo>>
+where
+    DB: Database,
+    <DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
+    for<'c> E: 'a + Executor<'c, Database = DB>,
+    for<'c> i64: Decode<'c, DB> + Type<DB>,
+    for<'c> String: Decode<'c, DB> + Type<DB>,
+    for<'c> String: Encode<'c, DB> + Type<DB>,
+    for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
+{
+    let sql = "select * from information_schema.cluster_info;";
+    sqlx::query_as::<_, NodeInfo>(sql)
+        .fetch_all(e)
+        .await
+        .context(error::ExecuteQuerySnafu { sql })
+}
+
+/// Waits until all datanodes are online within a specified timeout period.
+///
+/// This function repeatedly checks the status of all datanodes and waits until all of them are online
+/// or the timeout period elapses. A datanode is considered online if its `active_time` is less than 3 seconds.
+pub async fn wait_for_all_datanode_online(greptime: Pool<MySql>, timeout: Duration) {
+    wait_condition_fn(
+        timeout,
+        || {
+            let greptime = greptime.clone();
+            Box::pin(async move {
+                let nodes = fetch_nodes(&greptime)
+                    .await
+                    .unwrap()
+                    .into_iter()
+                    .filter(|node| node.peer_type == PEER_TYPE_DATANODE)
+                    .collect::<Vec<_>>();
+                info!("Waiting for all datanodes to come online: {nodes:?}");
+                nodes
+            })
+        },
+        |nodes| {
+            nodes
+                .into_iter()
+                .map(|node| parse_duration(&node.active_time.unwrap()).unwrap())
+                .all(|duration| duration < Duration::from_secs(3))
+        },
+        Duration::from_secs(5),
+    )
+    .await
+}
diff --git a/tests-fuzz/src/utils/crd.rs b/tests-fuzz/src/utils/crd.rs
index 89d947a34c..c2fdcad9e0 100644
--- a/tests-fuzz/src/utils/crd.rs
+++ b/tests-fuzz/src/utils/crd.rs
@@ -12,5 +12,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod common;
-mod pod;
+pub mod common;
+pub mod pod;
diff --git a/tests-fuzz/src/utils/partition.rs b/tests-fuzz/src/utils/partition.rs
new file mode 100644
index 0000000000..db55aec7c1
--- /dev/null
+++ b/tests-fuzz/src/utils/partition.rs
@@ -0,0 +1,131 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+use std::time::Duration;
+
+use common_telemetry::info;
+use snafu::ResultExt;
+use sqlx::database::HasArguments;
+use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Type};
+use store_api::storage::RegionId;
+
+use super::wait::wait_condition_fn;
+use crate::error::{self, Result};
+use crate::ir::Ident;
+
+#[derive(Debug, sqlx::FromRow)]
+pub struct Partition {
+    pub datanode_id: u64,
+    pub region_id: u64,
+}
+
+#[derive(Debug, sqlx::FromRow)]
+pub struct PartitionCount {
+    pub count: i64,
+}
+
+pub async fn count_partitions<'a, DB, E>(e: E, datanode_id: u64) -> Result<PartitionCount>
+where
+    DB: Database,
+    <DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
+    for<'c> E: 'a + Executor<'c, Database = DB>,
+    for<'c> i64: Decode<'c, DB> + Type<DB>,
+    for<'c> String: Decode<'c, DB> + Type<DB>,
+    for<'c> u64: Encode<'c, DB> + Type<DB>,
+    for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
+{
+    let sql = "select count(1) as count from information_schema.region_peers where peer_id = ?";
+    Ok(sqlx::query_as::<_, PartitionCount>(sql)
+        .bind(datanode_id)
+        .fetch_all(e)
+        .await
+        .context(error::ExecuteQuerySnafu { sql })?
+        .remove(0))
+}
+
+/// Returns all [Partition] of the specified `table_name`.
+pub async fn fetch_partitions<'a, DB, E>(e: E, table_name: Ident) -> Result<Vec<Partition>>
+where
+    DB: Database,
+    <DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
+    for<'c> E: 'a + Executor<'c, Database = DB>,
+    for<'c> u64: Decode<'c, DB> + Type<DB>,
+    for<'c> String: Decode<'c, DB> + Type<DB>,
+    for<'c> String: Encode<'c, DB> + Type<DB>,
+    for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
+{
+    let sql = "select b.peer_id as datanode_id, a.greptime_partition_id as region_id
+from information_schema.partitions a left join information_schema.region_peers b
+on a.greptime_partition_id = b.region_id where a.table_name = ? order by datanode_id asc;";
+    sqlx::query_as::<_, Partition>(sql)
+        .bind(table_name.value.to_string())
+        .fetch_all(e)
+        .await
+        .context(error::ExecuteQuerySnafu { sql })
+}
+
+/// Creates a distribution map of regions to datanodes based on the provided partitions.
+///
+/// This function iterates over the provided partitions and groups the regions by their associated datanode IDs.
+pub fn region_distribution(partitions: Vec<Partition>) -> BTreeMap<u64, Vec<RegionId>> {
+    let mut distribution: BTreeMap<u64, Vec<RegionId>> = BTreeMap::new();
+    for partition in partitions {
+        distribution
+            .entry(partition.datanode_id)
+            .or_default()
+            .push(RegionId::from_u64(partition.region_id));
+    }
+
+    distribution
+}
+
+/// Pretty prints the region distribution for each datanode.
+///
+/// This function logs the number of regions for each datanode in the distribution map.
+pub fn pretty_print_region_distribution(distribution: &BTreeMap<u64, Vec<RegionId>>) {
+    for (node, regions) in distribution {
+        info!("Datanode: {node}, num of regions: {}", regions.len());
+    }
+}
+
+/// Waits until all regions are evicted from the specified datanode.
+///
+/// This function repeatedly checks the number of partitions on the specified datanode and waits until
+/// the count reaches zero or the timeout period elapses. It logs the number of partitions on each check.
+pub async fn wait_for_all_regions_evicted(
+    greptime: Pool<MySql>,
+    selected_datanode: u64,
+    timeout: Duration,
+) {
+    wait_condition_fn(
+        timeout,
+        || {
+            let greptime = greptime.clone();
+            Box::pin(async move {
+                let partition = count_partitions(&greptime, selected_datanode)
+                    .await
+                    .unwrap();
+                info!(
+                    "Datanode: {selected_datanode}, num of partitions: {}",
+                    partition.count
+                );
+                partition.count
+            })
+        },
+        |count| count == 0,
+        Duration::from_secs(5),
+    )
+    .await;
+}
diff --git a/tests-fuzz/src/utils/pod_failure.rs b/tests-fuzz/src/utils/pod_failure.rs
new file mode 100644
index 0000000000..03e5ff6085
--- /dev/null
+++ b/tests-fuzz/src/utils/pod_failure.rs
@@ -0,0 +1,68 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+
+use kube::Api;
+
+use crate::error::Result;
+use crate::utils::crd::common::{Mode, SelectorBuilder};
+use crate::utils::crd::pod::{Action, PodChaos, PodChaosSpecBuilder};
+
+/// Injects a pod failure into a specific datanode within a Kubernetes cluster.
+///
+/// This function constructs a `PodChaos` custom resource to simulate a pod failure for the specified
+/// datanode, and creates this resource in the Kubernetes cluster using the provided client.
+pub async fn inject_datanode_pod_failure(
+    client: kube::Client,
+    namespace: &str,
+    cluster_name: &str,
+    datanode_id: u64,
+    duration_secs: usize,
+) -> Result<String> {
+    let mut selector = BTreeMap::new();
+    let pod_name = format!("{}-datanode-{}", cluster_name, datanode_id);
+    selector.insert(
+        "statefulset.kubernetes.io/pod-name".into(),
+        pod_name.clone(),
+    );
+    let selector = SelectorBuilder::default()
+        .label_selectors(selector)
+        .build()
+        .unwrap();
+
+    let spec = PodChaosSpecBuilder::default()
+        .duration(format!("{duration_secs}s"))
+        .selector(selector)
+        .action(Action::PodFailure)
+        .mode(Mode::One)
+        .build()
+        .unwrap();
+    let chaos_name = format!("{pod_name}-pod-failure");
+    let cr = PodChaos::new(&chaos_name, spec);
+    let api: Api<PodChaos> = Api::namespaced(client, namespace);
+    api.create(&Default::default(), &cr).await.unwrap();
+
+    Ok(chaos_name)
+}
+
+/// Recovers a pod from a failure by deleting the associated PodChaos resource.
+///
+/// This function deletes the PodChaos custom resource with the specified name, effectively
+/// recovering the pod from the injected failure.
+pub async fn recover_pod_failure(client: kube::Client, namespace: &str, name: &str) -> Result<()> {
+    let api: Api<PodChaos> = Api::namespaced(client, namespace);
+    api.delete(name, &Default::default()).await.unwrap();
+    Ok(())
+}
diff --git a/tests-fuzz/src/utils/wait.rs b/tests-fuzz/src/utils/wait.rs
new file mode 100644
index 0000000000..813b4b7201
--- /dev/null
+++ b/tests-fuzz/src/utils/wait.rs
@@ -0,0 +1,43 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use futures::future::BoxFuture;
+
+/// Waits for a condition to be met by repeatedly checking it within a given timeout period.
+///
+/// This function repeatedly calls the `check` function and applies the `condition` function to the result.
+/// If the condition is met within the timeout period, the function returns. Otherwise, it waits for the retry
+/// interval and checks again, until the timeout period elapses.
+pub async fn wait_condition_fn<F, T, U>(
+    timeout: Duration,
+    check: F,
+    condition: U,
+    retry_interval: Duration,
+) where
+    F: Fn() -> BoxFuture<'static, T>,
+    U: Fn(T) -> bool,
+{
+    tokio::time::timeout(timeout, async move {
+        loop {
+            if condition(check().await) {
+                break;
+            }
+            tokio::time::sleep(retry_interval).await
+        }
+    })
+    .await
+    .unwrap();
+}
diff --git a/tests-fuzz/src/validator/row.rs b/tests-fuzz/src/validator/row.rs
index b17ea1dd67..1e9535d667 100644
--- a/tests-fuzz/src/validator/row.rs
+++ b/tests-fuzz/src/validator/row.rs
@@ -146,6 +146,27 @@ where
     Ok(())
 }
 
+#[derive(Debug, sqlx::FromRow)]
+pub struct ValueCount {
+    pub count: i64,
+}
+
+pub async fn count_values<'a, DB, E>(e: E, sql: &'a str) -> Result<ValueCount>
+where
+    DB: Database,
+    <DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
+    for<'c> E: 'a + Executor<'c, Database = DB>,
+    for<'c> i64: Decode<'c, DB> + Type<DB>,
+    for<'c> String: Encode<'c, DB> + Type<DB>,
+    for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
+{
+    Ok(sqlx::query_as::<_, ValueCount>(sql)
+        .fetch_all(e)
+        .await
+        .context(error::ExecuteQuerySnafu { sql })?
+        .remove(0))
+}
+
 /// Returns all [RowEntry] of the `table_name`.
 pub async fn fetch_values<'a, DB, E>(e: E, sql: &'a str) -> Result<Vec<<DB as Database>::Row>>
 where
diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs
new file mode 100644
index 0000000000..65b600ea41
--- /dev/null
+++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs
@@ -0,0 +1,354 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![no_main]
+
+use std::env;
+use std::sync::Arc;
+use std::time::Duration;
+
+use arbitrary::{Arbitrary, Unstructured};
+use common_telemetry::info;
+use common_time::util::current_time_millis;
+use futures::future::try_join_all;
+use libfuzzer_sys::fuzz_target;
+use rand::seq::SliceRandom;
+use rand::{Rng, SeedableRng};
+use rand_chacha::{ChaCha20Rng, ChaChaRng};
+use snafu::{ensure, ResultExt};
+use sqlx::{Executor, MySql, Pool};
+use tests_fuzz::context::{TableContext, TableContextRef};
+use tests_fuzz::error::{self, Result};
+use tests_fuzz::fake::{
+    merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
+    MappedGenerator, WordGenerator,
+};
+use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
+use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
+use tests_fuzz::generator::{Generator, Random};
+use tests_fuzz::ir::{
+    generate_random_value, generate_unique_timestamp_for_mysql, CreateTableExpr, Ident,
+    InsertIntoExpr, MySQLTsColumnTypeGenerator,
+};
+use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
+use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
+use tests_fuzz::translator::DslTranslator;
+use tests_fuzz::utils::cluster_info::wait_for_all_datanode_online;
+use tests_fuzz::utils::partition::{
+    fetch_partitions, pretty_print_region_distribution, region_distribution,
+    wait_for_all_regions_evicted, Partition,
+};
+use tests_fuzz::utils::pod_failure::{inject_datanode_pod_failure, recover_pod_failure};
+use tests_fuzz::utils::{
+    compact_table, flush_memtable, init_greptime_connections_via_env, Connections,
+    GT_FUZZ_CLUSTER_NAME, GT_FUZZ_CLUSTER_NAMESPACE,
+};
+use tests_fuzz::validator::row::count_values;
+use tokio::sync::Semaphore;
+
+struct FuzzContext {
+    greptime: Pool<MySql>,
+    kube: kube::client::Client,
+    namespace: String,
+    cluster_name: String,
+}
+
+impl FuzzContext {
+    async fn close(self) {
+        self.greptime.close().await;
+    }
+}
+
+#[derive(Copy, Clone, Debug)]
+struct FuzzInput {
+    seed: u64,
+    columns: usize,
+    rows: usize,
+    tables: usize,
+    inserts: usize,
+}
+
+impl Arbitrary<'_> for FuzzInput {
+    fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
+        let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
+        let mut rng = ChaChaRng::seed_from_u64(seed);
+        let columns = rng.gen_range(2..64);
+        let rows = rng.gen_range(2..4096);
+        let tables = rng.gen_range(1..64);
+        let inserts = rng.gen_range(2..16);
+        Ok(FuzzInput {
+            columns,
+            rows,
+            seed,
+            tables,
+            inserts,
+        })
+    }
+}
+
+fn generate_create_exprs<R: Rng + 'static>(
+    tables: usize,
+    columns: usize,
+    rng: &mut R,
+) -> Result<Vec<CreateTableExpr>> {
+    let name_generator = MappedGenerator::new(
+        WordGenerator,
+        merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
+    );
+
+    let base_table_name = name_generator.gen(rng);
+    let min_column = columns / 2;
+    let columns = rng.gen_range(min_column..columns);
+    let mut exprs = Vec::with_capacity(tables);
+    for i in 0..tables {
+        let table_name = Ident {
+            value: format!("{}_{i}", base_table_name.value),
+            quote_style: base_table_name.quote_style,
+        };
+        let create_table_generator = CreateTableExprGeneratorBuilder::default()
+            .name_generator(Box::new(MappedGenerator::new(
+                WordGenerator,
+                merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
+            )))
+            .name(table_name)
+            .columns(columns)
+            .engine("mito")
+            .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
+            .build()
+            .unwrap();
+        let expr = create_table_generator.generate(rng)?;
+        exprs.push(expr)
+    }
+
+    Ok(exprs)
+}
+
+async fn execute_create_exprs(
+    ctx: &FuzzContext,
+    exprs: Vec<CreateTableExpr>,
+    parallelism: usize,
+) -> Result<()> {
+    let semaphore = Arc::new(Semaphore::new(parallelism));
+
+    let tasks = exprs.iter().map(|expr| {
+        let semaphore = semaphore.clone();
+        let greptime = ctx.greptime.clone();
+        async move {
+            let _permit = semaphore.acquire().await.unwrap();
+            let translator = CreateTableExprTranslator;
+            let sql = translator.translate(expr).unwrap();
+            sqlx::query(&sql)
+                .execute(&greptime)
+                .await
+                .context(error::ExecuteQuerySnafu { sql: &sql })
+        }
+    });
+
+    let _ = try_join_all(tasks).await?;
+    Ok(())
+}
+
+fn generate_insert_exprs<R: Rng + 'static>(
+    tables: &[TableContextRef],
+    rng: &mut R,
+    rows: usize,
+    inserts: usize,
+) -> Result<Vec<Vec<InsertIntoExpr>>> {
+    let mut exprs = Vec::with_capacity(tables.len());
+    for table_ctx in tables {
+        let omit_column_list = rng.gen_bool(0.2);
+        let min_rows = rows / 2;
+        let rows = rng.gen_range(min_rows..rows);
+        let min_inserts = inserts / 2;
+        let inserts = rng.gen_range(min_inserts..inserts);
+
+        let insert_generator = InsertExprGeneratorBuilder::default()
+            .table_ctx(table_ctx.clone())
+            .omit_column_list(omit_column_list)
+            .rows(rows)
+            .ts_value_generator(generate_unique_timestamp_for_mysql(current_time_millis()))
+            .value_generator(Box::new(generate_random_value))
+            .build()
+            .unwrap();
+
+        let inserts = (0..inserts)
+            .map(|_| insert_generator.generate(rng))
+            .collect::<Result<Vec<_>>>()?;
+        exprs.push(inserts);
+    }
+
+    Ok(exprs)
+}
+
+async fn execute_insert_exprs<R: Rng + 'static>(
+    ctx: &FuzzContext,
+    inserts: Vec<Vec<InsertIntoExpr>>,
+    parallelism: usize,
+    rng: &mut R,
+) -> Result<Vec<u64>> {
+    let semaphore = Arc::new(Semaphore::new(parallelism));
+
+    let tasks = inserts.into_iter().map(|inserts| {
+        let flush_probability = rng.gen_range(0.0..1.0);
+        let compact_probability = rng.gen_range(0.0..1.0);
+        let seed: u64 = rng.gen();
+
+        let semaphore = semaphore.clone();
+        let greptime = ctx.greptime.clone();
+        async move {
+            let _permit = semaphore.acquire().await.unwrap();
+            let mut total_affected = 0;
+            let mut rng = ChaCha20Rng::seed_from_u64(seed);
+            for insert_expr in inserts {
+                let translator = InsertIntoExprTranslator;
+                let sql = translator.translate(&insert_expr)?;
+                let result = greptime
+                    // unprepared query, see
+                    .execute(sql.as_str())
+                    .await
+                    .context(error::ExecuteQuerySnafu { sql: &sql })?;
+                ensure!(
+                    result.rows_affected() == insert_expr.values_list.len() as u64,
+                    error::AssertSnafu {
+                        reason: format!(
+                            "expected rows affected: {}, actual: {}",
+                            insert_expr.values_list.len(),
+                            result.rows_affected(),
+                        )
+                    }
+                );
+                if rng.gen_bool(flush_probability) {
+                    flush_memtable(&greptime, &insert_expr.table_name).await?;
+                }
+                if rng.gen_bool(compact_probability) {
+                    compact_table(&greptime, &insert_expr.table_name).await?;
+                }
+                total_affected += result.rows_affected();
+            }
+
+            Ok(total_affected)
+        }
+    });
+
+    try_join_all(tasks).await
+}
+
+async fn collect_table_partitions(
+    ctx: &FuzzContext,
+    table_ctxs: &[TableContextRef],
+) -> Result<Vec<Partition>> {
+    let mut partitions = Vec::with_capacity(table_ctxs.len());
+    for table_ctx in table_ctxs {
+        let table_partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
+        info!(
+            "table: {}, partitions: {:?}",
+            table_ctx.name, table_partitions
+        );
+        partitions.extend(table_partitions)
+    }
+    Ok(partitions)
+}
+
+async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
+    let mut rng = ChaCha20Rng::seed_from_u64(input.seed);
+    info!("Generates {} tables", input.tables);
+    let exprs = generate_create_exprs(input.tables, input.columns, &mut rng)?;
+    let parallelism = 8;
+    let table_ctxs = exprs
+        .iter()
+        .map(|expr| Arc::new(TableContext::from(expr)))
+        .collect::<Vec<_>>();
+    info!("Creates tables");
+    execute_create_exprs(&ctx, exprs, parallelism).await?;
+    let insert_exprs = generate_insert_exprs(&table_ctxs, &mut rng, input.rows, input.inserts)?;
+    info!("Inserts values into tables");
+    let affected_rows = execute_insert_exprs(&ctx, insert_exprs, parallelism, &mut rng).await?;
+    let partitions = collect_table_partitions(&ctx, &table_ctxs).await?;
+    let region_distribution = region_distribution(partitions);
+
+    // Ensures the regions are spread across more than one datanode.
+    assert!(region_distribution.len() > 1);
+    pretty_print_region_distribution(&region_distribution);
+    let nodes = region_distribution.keys().cloned().collect::<Vec<_>>();
+    let selected_datanode = nodes
+        .choose_multiple(&mut rng, 1)
+        .cloned()
+        .collect::<Vec<_>>()
+        .remove(0);
+    let selected_regions = region_distribution
+        .get(&selected_datanode)
+        .cloned()
+        .unwrap();
+
+    // Injects a pod failure into the selected datanode.
+    info!("Injects pod failure to datanode: {selected_datanode}, regions: {selected_regions:?}");
+    let chaos_name = inject_datanode_pod_failure(
+        ctx.kube.clone(),
+        &ctx.namespace,
+        &ctx.cluster_name,
+        selected_datanode,
+        360,
+    )
+    .await?;
+
+    // Waits for the number of regions on `selected_datanode` to become 0.
+    wait_for_all_regions_evicted(
+        ctx.greptime.clone(),
+        selected_datanode,
+        Duration::from_secs(300),
+    )
+    .await;
+
+    // Recovers from the pod failure.
+    recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
+    wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
+
+    // Validates the number of rows.
+    info!("Validates num of values");
+    for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {
+        let sql = format!("select count(1) as count from {}", table_ctx.name);
+        let values = count_values(&ctx.greptime, &sql).await?;
+        assert_eq!(values.count as u64, expected_rows);
+    }
+
+    for table_ctx in table_ctxs {
+        let sql = format!("DROP TABLE {}", table_ctx.name);
+        let result = sqlx::query(&sql)
+            .execute(&ctx.greptime)
+            .await
+            .context(error::ExecuteQuerySnafu { sql })?;
+        info!("Drop table: {}\n\nResult: {result:?}\n\n", table_ctx.name);
+    }
+
+    ctx.close().await;
+    Ok(())
+}
+
+fuzz_target!(|input: FuzzInput| {
+    common_telemetry::init_default_ut_logging();
+    common_runtime::block_on_write(async {
+        let Connections { mysql } = init_greptime_connections_via_env().await;
+        let ctx = FuzzContext {
+            greptime: mysql.expect("mysql connection init must succeed"),
+            kube: kube::client::Client::try_default()
+                .await
+                .expect("init kube client"),
+            namespace: env::var(GT_FUZZ_CLUSTER_NAMESPACE).unwrap_or("my-greptimedb".to_string()),
+            cluster_name: env::var(GT_FUZZ_CLUSTER_NAME).unwrap_or("my-greptimedb".to_string()),
+        };
+        execute_failover(ctx, input)
+            .await
+            .unwrap_or_else(|err| panic!("fuzz test must succeed: {err:?}"));
+    })
+});
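
Note: the `ir.rs` change above replaces the `impl Fn` return type with the boxed `TsValueGenerator` alias so the closure can be stored in `InsertExprGeneratorBuilder`. A minimal standalone sketch of the same pattern, capturing a shared `AtomicI64` in a boxed closure so every call yields a strictly larger timestamp; the `TimestampType` enum and the `i64` return type here are simplified stand-ins, not the crate's real types:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use rand::Rng;

/// Hypothetical stand-in for the crate's `TimestampType`.
#[derive(Clone, Copy, Debug)]
enum TimestampType {
    Second,
    Millisecond,
}

/// Simplified `TsValueGenerator`: a boxed closure the builder can own,
/// unlike an unnameable `impl Fn` return type.
type TsValueGenerator<R> = Box<dyn Fn(&mut R, TimestampType) -> i64 + Send + Sync>;

/// Returns a generator whose outputs strictly increase, so concurrent
/// inserts never collide on the time index.
fn generate_unique_timestamp<R: Rng>(base: i64) -> TsValueGenerator<R> {
    let counter = Arc::new(AtomicI64::new(base));
    Box::new(move |_rng, ts_type| {
        // fetch_add hands out a fresh value per call, regardless of the RNG.
        let v = counter.fetch_add(1, Ordering::Relaxed);
        match ts_type {
            TimestampType::Second => v,
            TimestampType::Millisecond => v * 1_000,
        }
    })
}

fn main() {
    let mut rng = rand::thread_rng();
    let ts_gen: TsValueGenerator<rand::rngs::ThreadRng> = generate_unique_timestamp(100);
    let a = ts_gen(&mut rng, TimestampType::Second);
    let b = ts_gen(&mut rng, TimestampType::Second);
    assert!(b > a); // monotonic even though the RNG input is ignored
    println!("{a} {b}");
}
```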
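The failover check hinges on `region_distribution` in `partition.rs` grouping `(datanode_id, region_id)` rows into a `BTreeMap`, so the target can pick a victim datanode and assert that the map has more than one key. A self-contained sketch of that grouping with plain `u64` region ids (the real code wraps them in `store_api::storage::RegionId`):

```rust
use std::collections::BTreeMap;

/// Mirrors tests-fuzz's `Partition` row: one region pinned to one datanode.
struct Partition {
    datanode_id: u64,
    region_id: u64,
}

/// Groups regions by datanode; BTreeMap keeps datanode ids sorted, which makes
/// the pretty-printed distribution stable across runs.
fn region_distribution(partitions: Vec<Partition>) -> BTreeMap<u64, Vec<u64>> {
    let mut distribution: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for p in partitions {
        distribution.entry(p.datanode_id).or_default().push(p.region_id);
    }
    distribution
}

fn main() {
    let partitions = vec![
        Partition { datanode_id: 1, region_id: 10 },
        Partition { datanode_id: 0, region_id: 11 },
        Partition { datanode_id: 1, region_id: 12 },
    ];
    let dist = region_distribution(partitions);
    // The fuzz target asserts len() > 1 so failover has somewhere to move regions.
    assert!(dist.len() > 1);
    for (node, regions) in &dist {
        println!("Datanode: {node}, num of regions: {}", regions.len());
    }
}
```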
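`wait_condition_fn` in `wait.rs` is the polling primitive that both `wait_for_all_regions_evicted` and `wait_for_all_datanode_online` build on: an async probe paired with a predicate, retried until a timeout. A runnable sketch of that probe/predicate split, with an atomic counter standing in for the region count (the counter and thresholds are illustrative only, not part of the PR):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;

/// Polls `check` until `condition` holds or `timeout` elapses (panics on
/// timeout), mirroring `wait_condition_fn` from tests-fuzz/src/utils/wait.rs.
async fn wait_condition_fn<F, T, U>(timeout: Duration, check: F, condition: U, retry_interval: Duration)
where
    F: Fn() -> BoxFuture<'static, T>,
    U: Fn(T) -> bool,
{
    tokio::time::timeout(timeout, async move {
        loop {
            if condition(check().await) {
                break;
            }
            tokio::time::sleep(retry_interval).await;
        }
    })
    .await
    .unwrap();
}

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for "regions left on the chaos-injected datanode".
    let remaining = Arc::new(AtomicU64::new(3));
    let probe = remaining.clone();
    wait_condition_fn(
        Duration::from_secs(5),
        move || {
            let probe = probe.clone();
            Box::pin(async move {
                // Each poll observes one fewer region, simulating eviction.
                probe.fetch_sub(1, Ordering::Relaxed)
            })
        },
        // fetch_sub returns the previous value, so 1 means the counter just hit 0.
        |prev| prev <= 1,
        Duration::from_millis(100),
    )
    .await;
    println!("all regions evicted");
}
```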
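The target's `Arbitrary` impl consumes only a `u64` seed from libFuzzer's byte stream and derives every other knob from a seeded ChaCha RNG, so a failing case replays deterministically from the seed alone. Condensed from the target above, with a small determinism check added in `main` (the check itself is not part of the PR):

```rust
use arbitrary::{Arbitrary, Unstructured};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;

#[derive(Copy, Clone, Debug)]
struct FuzzInput {
    seed: u64,
    columns: usize,
    rows: usize,
    tables: usize,
    inserts: usize,
}

impl Arbitrary<'_> for FuzzInput {
    fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
        // Only the seed comes from the fuzzer's bytes...
        let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
        // ...everything else is derived, with the same ranges as the target.
        let mut rng = ChaCha20Rng::seed_from_u64(seed);
        Ok(FuzzInput {
            seed,
            columns: rng.gen_range(2..64),
            rows: rng.gen_range(2..4096),
            tables: rng.gen_range(1..64),
            inserts: rng.gen_range(2..16),
        })
    }
}

fn main() {
    // Identical fuzzer bytes must yield identical inputs.
    let bytes = [42u8; 8];
    let a = FuzzInput::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
    let b = FuzzInput::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
    assert_eq!(a.seed, b.seed);
    assert_eq!(a.tables, b.tables);
    println!("{a:?}");
}
```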