test: add e2e test for region failover (#4188)

* test: add e2e test for region failover

* chore: add ci cfg

* chore: reduce parallelism to 8

* fix(ci): enable region failure

* chore: set sqlx LogLevel to Off

* refactor: move helper functions to utils
Author: Weny Xu
Date: 2024-06-28 14:49:41 +08:00 (committed by GitHub)
parent b6585e3581
commit 352cc9ddde
18 changed files with 898 additions and 23 deletions

tests-fuzz/Cargo.toml

@@ -27,6 +27,8 @@ common-time = { workspace = true }
datatypes = { workspace = true }
derive_builder = { workspace = true }
dotenv = "0.15"
+futures = { workspace = true }
+humantime = { workspace = true }
k8s-openapi = { version = "0.22", features = ["v1_30"] }
kube = { version = "0.92", features = [
"runtime",
@@ -54,6 +56,7 @@ sqlx = { version = "0.6", features = [
"postgres",
"chrono",
] }
+store-api = { workspace = true }
strum.workspace = true
tinytemplate = "1.2"
tokio = { workspace = true }
@@ -117,3 +120,10 @@ test = false
bench = false
doc = false
required-features = ["unstable"]
+[[bin]]
+name = "fuzz_failover_mito_regions"
+path = "targets/failover/fuzz_failover_mito_regions.rs"
+test = false
+bench = false
+doc = false
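
Each such [[bin]] entry registers a libfuzzer harness. For orientation, a minimal skeleton of such a target looks like the following (illustrative only; the real target added by this commit is shown at the bottom of the page):

#![no_main]

use libfuzzer_sys::fuzz_target;

fuzz_target!(|data: &[u8]| {
    // A real harness decodes `data` into a structured input and drives the
    // scenario; see fuzz_failover_mito_regions.rs below.
    let _ = data.len();
});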

tests-fuzz/src/generator/create_expr.rs

@@ -44,7 +44,7 @@ pub struct CreateTableExprGenerator<R: Rng + 'static> {
partition: usize,
if_not_exists: bool,
#[builder(setter(into))]
-name: String,
+name: Ident,
#[builder(setter(into))]
with_clause: HashMap<String, String>,
name_generator: Box<dyn Random<Ident, R>>,
@@ -65,7 +65,7 @@ impl<R: Rng + 'static> Default for CreateTableExprGenerator<R> {
engine: DEFAULT_ENGINE.to_string(),
if_not_exists: false,
partition: 0,
-name: String::new(),
+name: Ident::new(""),
with_clause: HashMap::default(),
name_generator: Box::new(MappedGenerator::new(WordGenerator, random_capitalize_map)),
ts_column_type_generator: Box::new(TsColumnTypeGenerator),
@@ -190,7 +190,7 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
if self.name.is_empty() {
builder.table_name(self.name_generator.gen(rng));
} else {
-builder.table_name(self.name.to_string());
+builder.table_name(self.name.clone());
}
if !self.with_clause.is_empty() {
let mut options = HashMap::new();

tests-fuzz/src/generator/insert_expr.rs

@@ -103,7 +103,7 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R> {
}
Ok(InsertIntoExpr {
-table_name: self.table_ctx.name.to_string(),
+table_name: self.table_ctx.name.clone(),
omit_column_list: self.omit_column_list,
columns: values_columns,
values_list,

tests-fuzz/src/ir.rs

@@ -39,7 +39,7 @@ use serde::{Deserialize, Serialize};
use self::insert_expr::{RowValue, RowValues};
use crate::context::TableContextRef;
-use crate::generator::Random;
+use crate::generator::{Random, TsValueGenerator};
use crate::impl_random;
use crate::ir::create_expr::ColumnOption;
@@ -127,12 +127,10 @@ pub fn generate_random_value<R: Rng>(
}
/// Generate monotonically increasing timestamps for MySQL.
-pub fn generate_unique_timestamp_for_mysql<R: Rng>(
-base: i64,
-) -> impl Fn(&mut R, TimestampType) -> Value {
+pub fn generate_unique_timestamp_for_mysql<R: Rng>(base: i64) -> TsValueGenerator<R> {
let base = Arc::new(AtomicI64::new(base));
-move |_rng, ts_type| -> Value {
+Box::new(move |_rng, ts_type| -> Value {
let value = base.fetch_add(1, Ordering::Relaxed);
let v = match ts_type {
TimestampType::Second(_) => Timestamp::new_second(1 + value),
@@ -141,7 +139,7 @@ pub fn generate_unique_timestamp_for_mysql<R: Rng>(
TimestampType::Nanosecond(_) => Timestamp::new_nanosecond(1_000_000_000 + value),
};
Value::from(v)
-}
+})
}
/// Generate random timestamps.
@@ -253,6 +251,10 @@ impl Ident {
quote_style: Some(quote),
}
}
+pub fn is_empty(&self) -> bool {
+self.value.is_empty()
+}
}
impl From<&str> for Ident {
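
Returning the boxed TsValueGenerator alias instead of an anonymous impl Fn is what lets the insert-expression builder store the timestamp generator as a trait object. A minimal sketch of the underlying pattern, with a simplified stand-in for the alias (the alias below is an assumption for illustration, not the crate's exact definition):

use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

// Simplified stand-in: the real alias is generic over the RNG and yields `Value`s.
type BoxedTsGen = Box<dyn Fn() -> i64 + Send + Sync>;

fn unique_timestamps(base: i64) -> BoxedTsGen {
    let counter = Arc::new(AtomicI64::new(base));
    // Every call hands out the next tick, so generated rows never collide
    // on the time index.
    Box::new(move || counter.fetch_add(1, Ordering::Relaxed))
}

fn main() {
    let ts_gen = unique_timestamps(1_000);
    assert_eq!(ts_gen(), 1_000);
    assert_eq!(ts_gen(), 1_001);
}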

tests-fuzz/src/ir/insert_expr.rs

@@ -16,10 +16,10 @@ use std::fmt::{Debug, Display};
use datatypes::value::Value;
-use crate::ir::Column;
+use crate::ir::{Column, Ident};
pub struct InsertIntoExpr {
-pub table_name: String,
+pub table_name: Ident,
pub omit_column_list: bool,
pub columns: Vec<Column>,
pub values_list: Vec<RowValues>,

tests-fuzz/src/utils.rs

@@ -12,18 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub mod cluster_info;
pub mod config;
pub mod crd;
pub mod health;
+pub mod partition;
+pub mod pod_failure;
#[cfg(feature = "unstable")]
pub mod process;
+pub mod wait;
use std::env;
use common_telemetry::info;
+use common_telemetry::tracing::log::LevelFilter;
use snafu::ResultExt;
-use sqlx::mysql::MySqlPoolOptions;
-use sqlx::{MySql, Pool};
+use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
+use sqlx::{ConnectOptions, MySql, Pool};
use crate::error::{self, Result};
use crate::ir::Ident;
@@ -51,12 +56,9 @@ pub async fn init_greptime_connections_via_env() -> Connections {
/// Connects to GreptimeDB.
pub async fn init_greptime_connections(mysql: Option<String>) -> Connections {
let mysql = if let Some(addr) = mysql {
-Some(
-MySqlPoolOptions::new()
-.connect(&format!("mysql://{addr}/public"))
-.await
-.unwrap(),
-)
+let mut opts: MySqlConnectOptions = format!("mysql://{addr}/public").parse().unwrap();
+opts.log_statements(LevelFilter::Off);
+Some(MySqlPoolOptions::new().connect_with(opts).await.unwrap())
} else {
None
};
@@ -89,6 +91,9 @@ pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
}
}
+pub const GT_FUZZ_CLUSTER_NAMESPACE: &str = "GT_FUZZ_CLUSTER_NAMESPACE";
+pub const GT_FUZZ_CLUSTER_NAME: &str = "GT_FUZZ_CLUSTER_NAME";
/// Flushes memtable to SST file.
pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT flush_table(\"{}\")", table_name);
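
Silencing statement logging is the point of the MySqlConnectOptions change above: a fuzz run issues thousands of generated statements, and sqlx echoes executed statements to the logs by default. In isolation, the options-based connect looks like this (the address is illustrative):

use common_telemetry::tracing::log::LevelFilter;
use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
use sqlx::{ConnectOptions, MySql, Pool};

async fn connect(addr: &str) -> Pool<MySql> {
    let mut opts: MySqlConnectOptions = format!("mysql://{addr}/public").parse().unwrap();
    // Without this, every generated statement would be echoed to the test logs.
    opts.log_statements(LevelFilter::Off);
    MySqlPoolOptions::new().connect_with(opts).await.unwrap()
}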

tests-fuzz/src/utils/cluster_info.rs

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_telemetry::info;
use humantime::parse_duration;
use snafu::ResultExt;
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Type};
use super::wait::wait_condition_fn;
use crate::error::{self, Result};
pub const PEER_TYPE_DATANODE: &str = "DATANODE";
#[derive(Debug, sqlx::FromRow)]
pub struct NodeInfo {
pub peer_id: i64,
pub peer_addr: String,
pub peer_type: String,
pub active_time: Option<String>,
}
/// Returns all [NodeInfo] in the cluster.
pub async fn fetch_nodes<'a, DB, E>(e: E) -> Result<Vec<NodeInfo>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select * from information_schema.cluster_info;";
sqlx::query_as::<_, NodeInfo>(sql)
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })
}
/// Waits until all datanodes are online within a specified timeout period.
///
/// This function repeatedly checks the status of all datanodes and waits until all of them are online
/// or the timeout period elapses. A datanode is considered online if its `active_time` is less than 3 seconds.
pub async fn wait_for_all_datanode_online(greptime: Pool<MySql>, timeout: Duration) {
wait_condition_fn(
timeout,
|| {
let greptime = greptime.clone();
Box::pin(async move {
let nodes = fetch_nodes(&greptime)
.await
.unwrap()
.into_iter()
.flat_map(|node| {
if node.peer_type == PEER_TYPE_DATANODE {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
info!("Waits for all datanode online: {nodes:?}");
nodes
})
},
|nodes| {
nodes
.into_iter()
.map(|node| parse_duration(&node.active_time.unwrap()).unwrap())
.all(|duration| duration < Duration::from_secs(3))
},
Duration::from_secs(5),
)
.await
}
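
A hedged usage sketch of the two helpers above (the endpoint address is illustrative):

use std::time::Duration;

use sqlx::mysql::MySqlPoolOptions;

async fn example() {
    let pool = MySqlPoolOptions::new()
        .connect("mysql://127.0.0.1:4002/public")
        .await
        .unwrap();
    // List every node the cluster reports, then block until all datanodes
    // are back online.
    let nodes = fetch_nodes(&pool).await.unwrap();
    println!("cluster members: {nodes:?}");
    wait_for_all_datanode_online(pool, Duration::from_secs(60)).await;
}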

tests-fuzz/src/utils/crd.rs

@@ -12,5 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-mod common;
-mod pod;
+pub mod common;
+pub mod pod;

tests-fuzz/src/utils/partition.rs

@@ -0,0 +1,131 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::time::Duration;
use common_telemetry::info;
use snafu::ResultExt;
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Type};
use store_api::storage::RegionId;
use super::wait::wait_condition_fn;
use crate::error::{self, Result};
use crate::ir::Ident;
#[derive(Debug, sqlx::FromRow)]
pub struct Partition {
pub datanode_id: u64,
pub region_id: u64,
}
#[derive(Debug, sqlx::FromRow)]
pub struct PartitionCount {
pub count: i64,
}
pub async fn count_partitions<'a, DB, E>(e: E, datanode_id: u64) -> Result<PartitionCount>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> u64: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select count(1) as count from information_schema.region_peers where peer_id == ?";
Ok(sqlx::query_as::<_, PartitionCount>(sql)
.bind(datanode_id)
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })?
.remove(0))
}
/// Returns all [Partition]s of the specified `table_name`.
pub async fn fetch_partitions<'a, DB, E>(e: E, table_name: Ident) -> Result<Vec<Partition>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> u64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select b.peer_id as datanode_id, a.greptime_partition_id as region_id
from information_schema.partitions a left join information_schema.region_peers b
on a.greptime_partition_id = b.region_id where a.table_name= ? order by datanode_id asc;";
sqlx::query_as::<_, Partition>(sql)
.bind(table_name.value.to_string())
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })
}
/// Creates a distribution map of regions to datanodes based on the provided partitions.
///
/// This function iterates over the provided partitions and groups the regions by their associated datanode IDs.
pub fn region_distribution(partitions: Vec<Partition>) -> BTreeMap<u64, Vec<RegionId>> {
let mut distribution: BTreeMap<u64, Vec<RegionId>> = BTreeMap::new();
for partition in partitions {
distribution
.entry(partition.datanode_id)
.or_default()
.push(RegionId::from_u64(partition.region_id));
}
distribution
}
/// Pretty prints the region distribution for each datanode.
///
/// This function logs the number of regions for each datanode in the distribution map.
pub fn pretty_print_region_distribution(distribution: &BTreeMap<u64, Vec<RegionId>>) {
for (node, regions) in distribution {
info!("Datanode: {node}, num of regions: {}", regions.len());
}
}
/// Waits until all regions are evicted from the specified datanode.
///
/// This function repeatedly checks the number of partitions on the specified datanode and waits until
/// the count reaches zero or the timeout period elapses. It logs the number of partitions on each check.
pub async fn wait_for_all_regions_evicted(
greptime: Pool<MySql>,
selected_datanode: u64,
timeout: Duration,
) {
wait_condition_fn(
timeout,
|| {
let greptime = greptime.clone();
Box::pin(async move {
let partition = count_partitions(&greptime, selected_datanode)
.await
.unwrap();
info!(
"Datanode: {selected_datanode}, num of partitions: {}",
partition.count
);
partition.count
})
},
|count| count == 0,
Duration::from_secs(5),
)
.await;
}
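
A worked example of the grouping that region_distribution performs (the IDs are made up; a RegionId packs a table id and a region number into one u64):

use store_api::storage::RegionId;

fn main() {
    // Two regions live on datanode 1, one on datanode 2.
    let partitions = vec![
        Partition { datanode_id: 1, region_id: RegionId::new(1024, 0).as_u64() },
        Partition { datanode_id: 2, region_id: RegionId::new(1024, 1).as_u64() },
        Partition { datanode_id: 1, region_id: RegionId::new(1024, 2).as_u64() },
    ];
    let distribution = region_distribution(partitions);
    assert_eq!(distribution[&1].len(), 2);
    assert_eq!(distribution[&2].len(), 1);
}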

tests-fuzz/src/utils/pod_failure.rs

@@ -0,0 +1,68 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use kube::Api;
use crate::error::Result;
use crate::utils::crd::common::{Mode, SelectorBuilder};
use crate::utils::crd::pod::{Action, PodChaos, PodChaosSpecBuilder};
/// Injects a pod failure into a specific datanode within a Kubernetes cluster.
///
/// This function constructs a `PodChaos` custom resource to simulate a pod failure for the specified
/// datanode, and creates this resource in the Kubernetes cluster using the provided client.
pub async fn inject_datanode_pod_failure(
client: kube::Client,
namespace: &str,
cluster_name: &str,
datanode_id: u64,
duration_secs: usize,
) -> Result<String> {
let mut selector = BTreeMap::new();
let pod_name = format!("{}-datanode-{}", cluster_name, datanode_id);
selector.insert(
"statefulset.kubernetes.io/pod-name".into(),
pod_name.clone(),
);
let selector = SelectorBuilder::default()
.label_selectors(selector)
.build()
.unwrap();
let spec = PodChaosSpecBuilder::default()
.duration(format!("{duration_secs}s"))
.selector(selector)
.action(Action::PodFailure)
.mode(Mode::One)
.build()
.unwrap();
let chaos_name = format!("{pod_name}-pod-failure");
let cr = PodChaos::new(&chaos_name, spec);
let api: Api<PodChaos> = Api::namespaced(client, namespace);
api.create(&Default::default(), &cr).await.unwrap();
Ok(chaos_name)
}
/// Recovers a pod from a failure by deleting the associated PodChaos resource.
///
/// This function deletes the PodChaos custom resource with the specified name, effectively
/// recovering the pod from the injected failure.
pub async fn recover_pod_failure(client: kube::Client, namespace: &str, name: &str) -> Result<()> {
let api: Api<PodChaos> = Api::namespaced(client, namespace);
api.delete(name, &Default::default()).await.unwrap();
Ok(())
}
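
Putting the pair together: a failover scenario injects the chaos, runs its assertions while the pod is down, and then lifts it. A hedged sketch (namespace and cluster name mirror the defaults the fuzz target falls back to):

async fn example(client: kube::Client) -> Result<()> {
    // Knock out datanode 1 of cluster `my-greptimedb` for up to 360 seconds.
    let chaos_name =
        inject_datanode_pod_failure(client.clone(), "my-greptimedb", "my-greptimedb", 1, 360)
            .await?;
    // ... assert that regions are migrated off the failed datanode ...
    recover_pod_failure(client, "my-greptimedb", &chaos_name).await?;
    Ok(())
}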

tests-fuzz/src/utils/wait.rs

@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use futures::future::BoxFuture;
/// Waits for a condition to be met by repeatedly checking it within a given timeout period.
///
/// This function repeatedly calls the `check` function and applies the `condition` function to the result.
/// If the condition is met within the timeout period, the function returns. Otherwise, it waits for the retry
/// interval and checks again, until the timeout period elapses.
pub async fn wait_condition_fn<F, T, U>(
timeout: Duration,
check: F,
condition: U,
retry_interval: Duration,
) where
F: Fn() -> BoxFuture<'static, T>,
U: Fn(T) -> bool,
{
tokio::time::timeout(timeout, async move {
loop {
if condition(check().await) {
break;
}
tokio::time::sleep(retry_interval).await
}
})
.await
.unwrap();
}
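
For example, waiting until a check has succeeded a few times might look like this (a minimal sketch; the counter stands in for a real query):

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let polls = Arc::new(AtomicU32::new(0));
    wait_condition_fn(
        Duration::from_secs(10),
        || {
            let polls = polls.clone();
            // A real caller would run a database query here instead of
            // bumping a counter.
            Box::pin(async move { polls.fetch_add(1, Ordering::Relaxed) + 1 })
        },
        |n| n >= 3,
        Duration::from_millis(50),
    )
    .await;
}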

tests-fuzz/src/validator/row.rs

@@ -146,6 +146,27 @@ where
Ok(())
}
+#[derive(Debug, sqlx::FromRow)]
+pub struct ValueCount {
+pub count: i64,
+}
+pub async fn count_values<'a, DB, E>(e: E, sql: &'a str) -> Result<ValueCount>
+where
+DB: Database,
+<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
+for<'c> E: 'a + Executor<'c, Database = DB>,
+for<'c> i64: Decode<'c, DB> + Type<DB>,
+for<'c> String: Encode<'c, DB> + Type<DB>,
+for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
+{
+Ok(sqlx::query_as::<_, ValueCount>(sql)
+.fetch_all(e)
+.await
+.context(error::ExecuteQuerySnafu { sql })?
+.remove(0))
+}
/// Returns all [RowEntry] of the `table_name`.
pub async fn fetch_values<'a, DB, E>(e: E, sql: &'a str) -> Result<Vec<<DB as Database>::Row>>
where

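A hedged usage sketch of count_values (the table name and expected count are illustrative):

use sqlx::{MySql, Pool};

async fn verify_rows(pool: &Pool<MySql>, expected: i64) {
    // Count rows in a table and compare against what the test inserted.
    let sql = "select count(1) as count from my_table";
    let ValueCount { count } = count_values(pool, sql).await.unwrap();
    assert_eq!(count, expected);
}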
tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs

@@ -0,0 +1,354 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![no_main]
use std::env;
use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_telemetry::info;
use common_time::util::current_time_millis;
use futures::future::try_join_all;
use libfuzzer_sys::fuzz_target;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use rand_chacha::{ChaCha20Rng, ChaChaRng};
use snafu::{ensure, ResultExt};
use sqlx::{Executor, MySql, Pool};
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
MappedGenerator, WordGenerator,
};
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::{Generator, Random};
use tests_fuzz::ir::{
generate_random_value, generate_unique_timestamp_for_mysql, CreateTableExpr, Ident,
InsertIntoExpr, MySQLTsColumnTypeGenerator,
};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::cluster_info::wait_for_all_datanode_online;
use tests_fuzz::utils::partition::{
fetch_partitions, pretty_print_region_distribution, region_distribution,
wait_for_all_regions_evicted, Partition,
};
use tests_fuzz::utils::pod_failure::{inject_datanode_pod_failure, recover_pod_failure};
use tests_fuzz::utils::{
compact_table, flush_memtable, init_greptime_connections_via_env, Connections,
GT_FUZZ_CLUSTER_NAME, GT_FUZZ_CLUSTER_NAMESPACE,
};
use tests_fuzz::validator::row::count_values;
use tokio::sync::Semaphore;
struct FuzzContext {
greptime: Pool<MySql>,
kube: kube::client::Client,
namespace: String,
cluster_name: String,
}
impl FuzzContext {
async fn close(self) {
self.greptime.close().await;
}
}
#[derive(Copy, Clone, Debug)]
struct FuzzInput {
seed: u64,
columns: usize,
rows: usize,
tables: usize,
inserts: usize,
}
impl Arbitrary<'_> for FuzzInput {
fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
let mut rng = ChaChaRng::seed_from_u64(seed);
let columns = rng.gen_range(2..64);
let rows = rng.gen_range(2..4096);
let tables = rng.gen_range(1..64);
let inserts = rng.gen_range(2..16);
Ok(FuzzInput {
columns,
rows,
seed,
tables,
inserts,
})
}
}
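
Illustrative only: the same libfuzzer bytes always decode to the same plan, which is what makes a failing case replayable from the stored input:

#[test]
fn replay_is_deterministic() {
    let bytes = [0x42u8; 16];
    let a = FuzzInput::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
    let b = FuzzInput::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
    // Everything downstream is derived from these fields via the seeded RNG.
    assert_eq!(a.seed, b.seed);
    assert_eq!(a.tables, b.tables);
}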
fn generate_create_exprs<R: Rng + 'static>(
tables: usize,
columns: usize,
rng: &mut R,
) -> Result<Vec<CreateTableExpr>> {
let name_generator = MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
);
let base_table_name = name_generator.gen(rng);
let min_column = columns / 2;
let columns = rng.gen_range(min_column..columns);
let mut exprs = Vec::with_capacity(tables);
for i in 0..tables {
let table_name = Ident {
value: format!("{}_{i}", base_table_name.value),
quote_style: base_table_name.quote_style,
};
let create_table_generator = CreateTableExprGeneratorBuilder::default()
.name_generator(Box::new(MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
)))
.name(table_name)
.columns(columns)
.engine("mito")
.ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
.build()
.unwrap();
let expr = create_table_generator.generate(rng)?;
exprs.push(expr)
}
Ok(exprs)
}
async fn execute_create_exprs(
ctx: &FuzzContext,
exprs: Vec<CreateTableExpr>,
parallelism: usize,
) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let tasks = exprs.iter().map(|expr| {
let semaphore = semaphore.clone();
let greptime = ctx.greptime.clone();
async move {
let _permit = semaphore.acquire().await.unwrap();
let translator = CreateTableExprTranslator;
let sql = translator.translate(expr).unwrap();
sqlx::query(&sql)
.execute(&greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })
}
});
let _ = try_join_all(tasks).await?;
Ok(())
}
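
The semaphore-plus-try_join_all shape above (and in execute_insert_exprs below) is the commit's standard bounded-parallelism pattern; stripped to its core it is (a generic sketch):

use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

async fn run_bounded(jobs: Vec<u64>, parallelism: usize) -> Result<Vec<u64>, String> {
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let tasks = jobs.into_iter().map(|job| {
        let semaphore = semaphore.clone();
        async move {
            // At most `parallelism` jobs hold a permit at any moment.
            let _permit = semaphore.acquire().await.unwrap();
            Ok(job * 2) // stand-in for translating and executing a statement
        }
    });
    try_join_all(tasks).await
}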
fn generate_insert_exprs<R: Rng + 'static>(
tables: &[TableContextRef],
rng: &mut R,
rows: usize,
inserts: usize,
) -> Result<Vec<Vec<InsertIntoExpr>>> {
let mut exprs = Vec::with_capacity(tables.len());
for table_ctx in tables {
let omit_column_list = rng.gen_bool(0.2);
let min_rows = rows / 2;
let rows = rng.gen_range(min_rows..rows);
let min_inserts = inserts / 2;
let inserts = rng.gen_range(min_inserts..inserts);
let insert_generator = InsertExprGeneratorBuilder::default()
.table_ctx(table_ctx.clone())
.omit_column_list(omit_column_list)
.rows(rows)
.ts_value_generator(generate_unique_timestamp_for_mysql(current_time_millis()))
.value_generator(Box::new(generate_random_value))
.build()
.unwrap();
let inserts = (0..inserts)
.map(|_| insert_generator.generate(rng))
.collect::<Result<Vec<_>>>()?;
exprs.push(inserts);
}
Ok(exprs)
}
async fn execute_insert_exprs<R: Rng + 'static>(
ctx: &FuzzContext,
inserts: Vec<Vec<InsertIntoExpr>>,
parallelism: usize,
rng: &mut R,
) -> Result<Vec<u64>> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let tasks = inserts.into_iter().map(|inserts| {
let flush_probability = rng.gen_range(0.0..1.0);
let compact_probability = rng.gen_range(0.0..1.0);
let seed: u64 = rng.gen();
let semaphore = semaphore.clone();
let greptime = ctx.greptime.clone();
async move {
let _permit = semaphore.acquire().await.unwrap();
let mut total_affected = 0;
let mut rng = ChaCha20Rng::seed_from_u64(seed);
for insert_expr in inserts {
let translator = InsertIntoExprTranslator;
let sql = translator.translate(&insert_expr)?;
let result = greptime
// unprepared query, see <https://github.com/GreptimeTeam/greptimedb/issues/3500>
.execute(sql.as_str())
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;
ensure!(
result.rows_affected() == insert_expr.values_list.len() as u64,
error::AssertSnafu {
reason: format!(
"expected rows affected: {}, actual: {}",
insert_expr.values_list.len(),
result.rows_affected(),
)
}
);
if rng.gen_bool(flush_probability) {
flush_memtable(&greptime, &insert_expr.table_name).await?;
}
if rng.gen_bool(compact_probability) {
compact_table(&greptime, &insert_expr.table_name).await?;
}
total_affected += result.rows_affected();
}
Ok(total_affected)
}
});
try_join_all(tasks).await
}
async fn collect_table_partitions(
ctx: &FuzzContext,
table_ctxs: &[TableContextRef],
) -> Result<Vec<Partition>> {
let mut partitions = Vec::with_capacity(table_ctxs.len());
for table_ctx in table_ctxs {
let table_partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
info!(
"table: {}, partitions: {:?}",
table_ctx.name, table_partitions
);
partitions.extend(table_partitions)
}
Ok(partitions)
}
async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
let mut rng = ChaCha20Rng::seed_from_u64(input.seed);
info!("Generates {} tables", input.tables);
let exprs = generate_create_exprs(input.tables, input.columns, &mut rng)?;
let parallelism = 8;
let table_ctxs = exprs
.iter()
.map(|expr| Arc::new(TableContext::from(expr)))
.collect::<Vec<_>>();
info!("Creates tables");
execute_create_exprs(&ctx, exprs, parallelism).await?;
let insert_exprs = generate_insert_exprs(&table_ctxs, &mut rng, input.rows, input.inserts)?;
info!("Inserts value into tables");
let affected_rows = execute_insert_exprs(&ctx, insert_exprs, parallelism, &mut rng).await?;
let partitions = collect_table_partitions(&ctx, &table_ctxs).await?;
let region_distribution = region_distribution(partitions);
// Ensures the number of datanodes > 1.
assert!(region_distribution.len() > 1);
pretty_print_region_distribution(&region_distribution);
let nodes = region_distribution.keys().cloned().collect::<Vec<_>>();
let selected_datanode = nodes
.choose_multiple(&mut rng, 1)
.cloned()
.collect::<Vec<_>>()
.remove(0);
let selected_regions = region_distribution
.get(&selected_datanode)
.cloned()
.unwrap();
// Injects pod failures
info!("Injects pod failures to datanode: {selected_datanode}, regions: {selected_regions:?}");
let chaos_name = inject_datanode_pod_failure(
ctx.kube.clone(),
&ctx.namespace,
&ctx.cluster_name,
selected_datanode,
360,
)
.await?;
// Waits for the number of regions on `selected_datanode` to become 0.
wait_for_all_regions_evicted(
ctx.greptime.clone(),
selected_datanode,
Duration::from_secs(300),
)
.await;
// Recovers pod failures
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
// Validates value rows
info!("Validates num of values");
for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
assert_eq!(values.count as u64, expected_rows);
}
for table_ctx in table_ctxs {
let sql = format!("DROP TABLE {}", table_ctx.name);
let result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql })?;
info!("Drop table: {}\n\nResult: {result:?}\n\n", table_ctx.name);
}
ctx.close().await;
Ok(())
}
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must succeed"),
kube: kube::client::Client::try_default()
.await
.expect("init kube client"),
namespace: env::var(GT_FUZZ_CLUSTER_NAMESPACE).unwrap_or("my-greptimedb".to_string()),
cluster_name: env::var(GT_FUZZ_CLUSTER_NAME).unwrap_or("my-greptimedb".to_string()),
};
execute_failover(ctx, input)
.await
.unwrap_or_else(|err| panic!("fuzz test must succeed: {err:?}"));
})
});