test: reproduce bugs

This commit is contained in:
WenyXu
2024-04-18 13:23:42 +00:00
parent 7e1eed4b18
commit bf07dd275a
13 changed files with 688 additions and 13 deletions

21
Cargo.lock generated
View File

@@ -10061,6 +10061,27 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-chaos"
version = "0.7.2"
dependencies = [
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"nix 0.26.4",
"rand",
"rand_chacha",
"reqwest",
"serde",
"serde_json",
"snafu",
"sqlx",
"tests-fuzz",
"tinytemplate",
"tokio",
]
[[package]]
name = "tests-fuzz"
version = "0.7.2"

View File

@@ -55,6 +55,7 @@ members = [
"src/store-api",
"src/table",
"src/index",
"tests-chaos",
"tests-fuzz",
"tests-integration",
"tests/runner",
@@ -212,6 +213,7 @@ sql = { path = "src/sql" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
tests-fuzz = { path = "tests-fuzz" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"

27
tests-chaos/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
name = "tests-chaos"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time = { workspace = true }
nix = { version = "0.26", features = ["process"] }
rand = { workspace = true }
rand_chacha = "0.3.1"
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
sqlx = { version = "0.6", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
tests-fuzz.workspace = true
tinytemplate = "1.2"
tokio.workspace = true

View File

@@ -0,0 +1,82 @@
mode = "standalone"
enable_memory_catalog = false
skip_write = false
[http_options]
addr = "127.0.0.1:4000"
timeout = "30s"
body_limit = "64MB"
[grpc_options]
addr = "127.0.0.1:4001"
runtime_size = 8
[mysql_options]
addr = "127.0.0.1:4002"
runtime_size = 2
[mysql_options.tls]
mode = "disable"
cert_path = ""
key_path = ""
[postgres_options]
addr = "127.0.0.1:4003"
runtime_size = 2
[postgres_options.tls]
mode = "disable"
cert_path = ""
key_path = ""
[opentsdb_options]
addr = "127.0.0.1:4242"
runtime_size = 2
[influxdb_options]
enable = true
[prometheus_options]
enable = true
[prom_options]
addr = "127.0.0.1:4004"
[wal]
file_size = "256MB"
purge_threshold = "4GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[storage]
type = "File"
data_home = '{data_home}'
global_ttl = "15m"
skip_wal = true
sst_compression = "lz4raw"
memtable_type = "time_series"
[storage.compaction]
max_inflight_tasks = 4
max_files_in_level0 = 5
max_purge_tasks = 4
purge_expired_only = true
[storage.manifest]
checkpoint_margin = 128
gc_duration = '10m'
checkpoint_on_startup = false
[storage.flush]
max_flush_tasks = 2
region_write_buffer_size = "1MB"
picker_schedule_interval = "5m"
global_write_buffer_size = "150MB"
[procedure]
max_retry_times = 3
retry_delay = "500ms"
[logging]
enable_logcat = false

15
tests-chaos/src/bare.rs Normal file
View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod process;

View File

@@ -0,0 +1,128 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::process::{ExitStatus, Stdio};
use std::sync::{Arc, Mutex};
use common_telemetry::{info, warn};
use nix::sys::signal::Signal;
use snafu::ResultExt;
use tokio::process::Child;
use crate::error::{self, Result};
pub(crate) type Pid = u32;
/// The state of a process.
#[derive(Debug, Clone)]
pub(crate) struct Process {
exit_status: Option<ExitStatus>,
exited: bool,
}
/// ProcessManager provides the ability to spawn/wait/kill a child process.
#[derive(Debug, Clone)]
pub(crate) struct ProcessManager {
processes: Arc<Mutex<HashMap<Pid, Process>>>,
}
/// The callback while the child process exits.
pub type OnChildExitResult = std::result::Result<ExitStatus, std::io::Error>;
impl ProcessManager {
pub fn new() -> Self {
Self {
processes: Arc::new(Default::default()),
}
}
fn wait<F>(&self, mut child: Child, f: F)
where
F: FnOnce(Pid, OnChildExitResult) + Send + 'static,
{
let processes = self.processes.clone();
tokio::spawn(async move {
// Safety: caller checked
let pid = child.id().unwrap();
let result = child.wait().await;
match result {
Ok(code) => {
warn!("pid: {pid} exited with status: {}", code);
f(pid, Ok(code));
processes.lock().unwrap().entry(pid).and_modify(|process| {
process.exit_status = Some(code);
process.exited = true;
});
}
Err(err) => {
warn!("pid: {pid} exited with error: {}", err);
f(pid, Err(err));
processes.lock().unwrap().entry(pid).and_modify(|process| {
process.exited = true;
});
}
}
});
}
/// Spawns a new process.
pub fn spawn<T: Into<Stdio>, F>(
&self,
binary: &str,
args: &[String],
stdout: T,
stderr: T,
on_exit: F,
) -> Result<Pid>
where
F: FnOnce(Pid, OnChildExitResult) + Send + 'static,
{
info!("starting {} with {:?}", binary, args);
let child = tokio::process::Command::new(binary)
.args(args)
.stdout(stdout)
.stderr(stderr)
.spawn()
.context(error::SpawnChildSnafu)?;
let pid = child.id();
if let Some(pid) = pid {
self.processes.lock().unwrap().insert(
pid,
Process {
exit_status: None,
exited: false,
},
);
self.wait(child, on_exit);
Ok(pid)
} else {
error::UnexpectedExitedSnafu {}.fail()
}
}
/// Kills a process via [Pid].
pub fn kill<T: Into<Option<Signal>>>(pid: Pid, signal: T) -> Result<()> {
let signal: Option<Signal> = signal.into();
info!("kill pid :{} siganl: {:?}", pid, signal);
// Safety: checked.
nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid as i32), signal)
.context(error::KillProcessSnafu)?;
Ok(())
}
}

60
tests-chaos/src/error.rs Normal file
View File

@@ -0,0 +1,60 @@
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to spawn a child process"))]
SpawnChild {
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Child process exited unexpected"))]
UnexpectedExited { location: Location },
#[snafu(display("Unexpected: {err_msg}"))]
Unexpected { err_msg: String, location: Location },
#[snafu(display("Failed to kill a process"))]
KillProcess {
location: Location,
#[snafu(source)]
error: nix::Error,
},
#[snafu(display("Failed to create a file: {}", path))]
CreateFile {
path: String,
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to create dir all"))]
CreateDirAll {
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to write a file: {}", path))]
WriteFile {
path: String,
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to execute query: {}", sql))]
ExecuteQuery {
sql: String,
#[snafu(source)]
error: sqlx::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;

238
tests-chaos/src/main.rs Normal file
View File

@@ -0,0 +1,238 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use bare::process::{Pid, ProcessManager};
use common_telemetry::{info, warn};
use nix::sys::signal::Signal;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySql, Pool};
use tests_fuzz::context::TableContext;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::validator;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
mod bare;
mod error;
mod utils;
use crate::error::Result;
use crate::utils::{generate_create_table_expr, get_conf_path, path_to_stdio, render_config_file};
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
#[tokio::main]
async fn main() {
common_telemetry::init_default_ut_logging();
let state = Arc::new(TestState {
killed: AtomicBool::new(false),
});
let moved_state = state.clone();
tokio::spawn(async move {
let mut rng = ChaChaRng::seed_from_u64(0);
loop {
warn!("Staring");
let pid = start_database().await.expect("Failed to start database");
let secs = rng.gen_range(1..2);
moved_state.killed.store(false, Ordering::Relaxed);
tokio::time::sleep(Duration::from_secs(secs)).await;
warn!("After {secs}s, Killing pid: {pid}");
moved_state.killed.store(true, Ordering::Relaxed);
ProcessManager::kill(pid, Signal::SIGKILL).expect("Failed to kill");
}
});
let mut rng = ChaChaRng::seed_from_u64(0);
let client = connect_db("127.0.0.1:4002").await;
let mut created_table = HashSet::new();
loop {
run_test(&client, &mut created_table, &state, &mut rng)
.await
.unwrap();
}
}
async fn connect_db(addr: &str) -> Pool<MySql> {
MySqlPoolOptions::new()
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap()
}
struct TestState {
killed: AtomicBool,
}
async fn run_test<R: Rng + 'static>(
client: &Pool<MySql>,
created_table: &mut HashSet<String>,
state: &Arc<TestState>,
rng: &mut R,
) -> Result<()> {
let expr = generate_create_table_expr(rng);
let table_name = expr.table_name.to_string();
if created_table.contains(&table_name) {
warn!("ignores same name table: {table_name}");
// ignores.
return Ok(());
}
let table_ctx = TableContext::from(&expr);
let translator = CreateTableExprTranslator;
let sql = translator.translate(&expr).unwrap();
let result = sqlx::query(&sql).execute(client).await;
match result {
Ok(result) => {
created_table.insert(table_name);
validate_mysql(client, state, &table_ctx).await;
info!("Create table: {sql}, result: {result:?}");
}
Err(err) => {
ensure!(
state.killed.load(Ordering::Relaxed),
error::UnexpectedSnafu {
err_msg: err.to_string(),
}
);
created_table.insert(table_name);
}
}
Ok(())
}
async fn validate_mysql(client: &Pool<MySql>, _state: &Arc<TestState>, table_ctx: &TableContext) {
loop {
match validator::column::fetch_columns_via_mysql(
client,
"public".into(),
table_ctx.name.clone(),
)
.await
{
Ok(mut column_entries) => {
column_entries.sort_by(|a, b| a.column_name.cmp(&b.column_name));
let mut columns = table_ctx.columns.clone();
columns.sort_by(|a, b| a.name.value.cmp(&b.name.value));
validator::column::assert_eq(&column_entries, &columns).unwrap();
return;
}
Err(err) => warn!(
"Failed to fetch table '{}' columns, error: {}",
table_ctx.name, err
),
}
}
}
async fn start_database() -> Result<Pid> {
let binary_path = "/home/weny/Projects/greptimedb-cuckoo/target/debug/greptime";
let test_dir = "/tmp/greptimedb-cuckoo/";
let template_filename = "standalone-v0.3.2.toml.template";
let health_url = "http://127.0.0.1:4000/health";
let process_manager = ProcessManager::new();
let pid = start_process(&process_manager, binary_path, test_dir, template_filename)
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(10), health_check(health_url))
.await
.expect("Failed to start GreptimeDB process");
info!("GreptimeDB started, pid: {pid}");
Ok(pid)
}
async fn start_process(
process_manager: &ProcessManager,
binary: &str,
test_dir: &str,
template_filename: &str,
) -> Result<Pid> {
tokio::fs::create_dir_all(test_dir)
.await
.context(error::CreateDirAllSnafu)?;
let data_home = format!("{test_dir}data_home");
info!("data home: {}", data_home);
// Prepares the config file
let mut conf_path = get_conf_path();
conf_path.push(template_filename);
let template_path = conf_path.to_str().unwrap().to_string();
let conf_path = format!("{test_dir}config.toml");
info!("conf path: {}", conf_path);
#[derive(Serialize)]
struct Context {
data_home: String,
}
let conf_content = render_config_file(&template_path, &Context { data_home });
let mut config_file = File::create(&conf_path)
.await
.context(error::CreateFileSnafu { path: &conf_path })?;
config_file
.write_all(conf_content.as_bytes())
.await
.context(error::WriteFileSnafu { path: &conf_path })?;
let args = vec![
DEFAULT_LOG_LEVEL.to_string(),
"standalone".to_string(),
"start".to_string(),
format!("--config-file={conf_path}"),
];
let now = common_time::util::current_time_millis();
let stdout = format!("{test_dir}stdout-{}", now);
let stderr = format!("{test_dir}stderr-{}", now);
info!("stdout: {}, stderr: {}", stdout, stderr);
let stdout = path_to_stdio(&stdout).await?;
let stderr = path_to_stdio(&stderr).await?;
let on_exit = move |pid, result| {
info!("The pid: {pid} exited, result: {result:?}");
};
process_manager.spawn(binary, &args, stdout, stderr, on_exit)
}
async fn health_check(url: &str) {
loop {
match reqwest::get(url).await {
Ok(resp) => {
if resp.status() == 200 {
info!("health checked!");
return;
}
info!("failed to health, status: {}", resp.status());
}
Err(err) => {
info!("failed to health, err: {err:?}");
}
}
info!("checking health later...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

105
tests-chaos/src/utils.rs Normal file
View File

@@ -0,0 +1,105 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use rand::Rng;
use serde::Serialize;
use snafu::ResultExt;
use tests_fuzz::context::TableContextRef;
use tests_fuzz::fake::WordGenerator;
use tests_fuzz::generator::alter_expr::{
AlterExprAddColumnGeneratorBuilder, AlterExprDropColumnGeneratorBuilder,
AlterExprRenameGeneratorBuilder,
};
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{droppable_columns, AlterTableExpr, CreateTableExpr};
use tinytemplate::TinyTemplate;
use tokio::fs::OpenOptions;
use crate::error::{self, Result};
/// Creates an file
pub(crate) async fn path_to_stdio(path: &str) -> Result<std::fs::File> {
Ok(OpenOptions::new()
.append(true)
.create(true)
.read(true)
.write(true)
.open(path)
.await
.context(error::CreateFileSnafu { path })?
.into_std()
.await)
}
/// Get the path of config dir `tests/conf`.
pub(crate) fn get_conf_path() -> PathBuf {
let mut root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
root_path.push("conf");
root_path
}
/// Returns rendered config file.
pub(crate) fn render_config_file<C: Serialize>(template_path: &str, context: &C) -> String {
let mut tt = TinyTemplate::new();
let template = std::fs::read_to_string(template_path).unwrap();
tt.add_template(template_path, &template).unwrap();
tt.render(template_path, context).unwrap()
}
pub(crate) fn generate_create_table_expr<R: Rng + 'static>(rng: &mut R) -> CreateTableExpr {
let columns = rng.gen_range(2..30);
let create_table_generator = CreateTableExprGeneratorBuilder::default()
.name_generator(Box::new(WordGenerator))
.columns(columns)
.engine("mito")
.build()
.unwrap();
create_table_generator.generate(rng).unwrap()
}
#[allow(dead_code)]
pub fn generate_alter_table_expr<R: Rng + 'static>(
table_ctx: TableContextRef,
rng: &mut R,
) -> AlterTableExpr {
let rename = rng.gen_bool(0.2);
if rename {
let expr_generator = AlterExprRenameGeneratorBuilder::default()
.table_ctx(table_ctx)
.name_generator(Box::new(WordGenerator))
.build()
.unwrap();
expr_generator.generate(rng).unwrap()
} else {
let drop_column = rng.gen_bool(0.5) && !droppable_columns(&table_ctx.columns).is_empty();
if drop_column {
let expr_generator = AlterExprDropColumnGeneratorBuilder::default()
.table_ctx(table_ctx)
.build()
.unwrap();
expr_generator.generate(rng).unwrap()
} else {
let location = rng.gen_bool(0.5);
let expr_generator = AlterExprAddColumnGeneratorBuilder::default()
.table_ctx(table_ctx)
.location(location)
.build()
.unwrap();
expr_generator.generate(rng).unwrap()
}
}
}

View File

@@ -27,8 +27,7 @@ use crate::generator::{ColumnOptionGenerator, ConcreteDataTypeGenerator, Generat
use crate::ir::alter_expr::{AlterTableExpr, AlterTableOperation};
use crate::ir::create_expr::ColumnOption;
use crate::ir::{
droppable_columns, generate_columns, generate_random_value, generate_random_value_abs,
ColumnTypeGenerator, Ident,
droppable_columns, generate_columns, generate_random_value_abs, ColumnTypeGenerator, Ident,
};
fn add_column_options_generator<R: Rng>(

View File

@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[allow(dead_code)]
pub mod column;

View File

@@ -217,13 +217,13 @@ where
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
// let sql = format!("DESC TABLE {table_name}");
// let rows = sqlx::query(&sql)
// .fetch_all(e)
// .await
// .context(error::ExecuteQuerySnafu { sql })?;
todo!()
let sql = "SELECT table_schema, table_name, column_name, greptime_data_type as data_type, semantic_type, column_default, is_nullable FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
sqlx::query_as::<_, ColumnEntry>(sql)
.bind(schema_name.value.to_string())
.bind(table_name.value.to_string())
.fetch_all(e)
.await
.context(error::ExecuteQuerySnafu { sql })
}
pub async fn fetch_columns_via_mysql(

View File

@@ -25,10 +25,7 @@ use snafu::{ensure, ResultExt};
use sqlx::{Executor, MySql, Pool};
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
MappedGenerator, WordGenerator,
};
use tests_fuzz::fake::WordGenerator;
use tests_fuzz::generator::alter_expr::{
AlterExprAddColumnGeneratorBuilder, AlterExprDropColumnGeneratorBuilder,
AlterExprRenameGeneratorBuilder,