Compare commits

...

9 Commits

Author SHA1 Message Date
luofucong
ee67ce10c9 insert some rows and query them across DDL to test the integrity of data 2024-05-14 20:41:47 +08:00
WenyXu
2ba721cc82 fix: fix bug 2024-05-13 08:25:18 +00:00
WenyXu
de468ee595 fix: test program 2024-05-13 07:46:30 +00:00
WenyXu
bb9bdf74ec feat: export metric endpoint 2024-05-13 02:44:34 +00:00
WenyXu
be5574fdb3 fuzz alter 2024-04-19 14:14:34 +00:00
WenyXu
f9afc5dbbf feat: start_database auto retry 2024-04-19 02:40:15 +00:00
WenyXu
c7400a4182 chore: reduce sleep time 2024-04-18 13:59:20 +00:00
WenyXu
bf07dd275a test: reproduce bugs 2024-04-18 13:48:11 +00:00
WenyXu
7e1eed4b18 feat: adapt for cuckoo 2024-04-16 14:52:06 +00:00
20 changed files with 1556 additions and 117 deletions

159
Cargo.lock generated
View File

@@ -1074,6 +1074,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bufstream"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8"
[[package]]
name = "build-data"
version = "0.1.5"
@@ -2957,6 +2963,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "derive_utils"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61bb5a1014ce6dfc2a378578509abe775a5aa06bff584a547555d9efdb81b926"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.43",
]
[[package]]
name = "diff"
version = "0.1.13"
@@ -3425,6 +3442,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@@ -4414,6 +4446,15 @@ version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8573b2b1fb643a372c73b23f4da5f888677feef3305146d68a539250a9bccc7"
[[package]]
name = "io-enum"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53b53d712d99a73eec59ee5e4fe6057f8052142d38eeafbbffcb06b36d738a6e"
dependencies = [
"derive_utils",
]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
@@ -5427,6 +5468,32 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b"
[[package]]
name = "mysql"
version = "25.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4cc09a8118051e4617886c9c6e693c61444c2eeb5f9a792dc5d631501706565"
dependencies = [
"bufstream",
"bytes",
"crossbeam",
"flate2",
"io-enum",
"libc",
"lru",
"mysql_common 0.32.0",
"named_pipe",
"native-tls",
"once_cell",
"pem",
"percent-encoding",
"serde",
"serde_json",
"socket2 0.5.5",
"twox-hash",
"url",
]
[[package]]
name = "mysql-common-derive"
version = "0.30.2"
@@ -5608,6 +5675,33 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "named_pipe"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad9c443cce91fc3e12f017290db75dde490d685cdaaf508d7159d7cf41f0eb2b"
dependencies = [
"winapi",
]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "ndk-context"
version = "0.1.1"
@@ -6012,12 +6106,50 @@ dependencies = [
"tokio-rustls 0.25.0",
]
[[package]]
name = "openssl"
version = "0.10.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f"
dependencies = [
"bitflags 2.4.1",
"cfg-if 1.0.0",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.43",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.21.0"
@@ -10061,6 +10193,32 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-chaos"
version = "0.7.2"
dependencies = [
"axum",
"axum-macros",
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"lazy_static",
"mysql",
"nix 0.26.4",
"prometheus",
"rand",
"rand_chacha",
"reqwest",
"serde",
"serde_json",
"snafu",
"sqlx",
"tests-fuzz",
"tinytemplate",
"tokio",
]
[[package]]
name = "tests-fuzz"
version = "0.7.2"
@@ -10078,6 +10236,7 @@ dependencies = [
"dotenv",
"lazy_static",
"libfuzzer-sys",
"mysql",
"partition",
"rand",
"rand_chacha",

View File

@@ -55,6 +55,7 @@ members = [
"src/store-api",
"src/table",
"src/index",
"tests-chaos",
"tests-fuzz",
"tests-integration",
"tests/runner",
@@ -212,6 +213,7 @@ sql = { path = "src/sql" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
tests-fuzz = { path = "tests-fuzz" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"

32
tests-chaos/Cargo.toml Normal file
View File

@@ -0,0 +1,32 @@
[package]
name = "tests-chaos"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
axum-macros = "0.3.8"
axum.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time = { workspace = true }
lazy_static.workspace = true
mysql = "25.0"
nix = { version = "0.26", features = ["process"] }
prometheus.workspace = true
rand = { workspace = true }
rand_chacha = "0.3.1"
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
sqlx = { version = "0.6", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
tests-fuzz.workspace = true
tinytemplate = "1.2"
tokio.workspace = true

View File

@@ -0,0 +1,82 @@
mode = "standalone"
enable_memory_catalog = false
skip_write = false
[http_options]
addr = "127.0.0.1:4000"
timeout = "30s"
body_limit = "64MB"
[grpc_options]
addr = "127.0.0.1:4001"
runtime_size = 8
[mysql_options]
addr = "127.0.0.1:4002"
runtime_size = 2
[mysql_options.tls]
mode = "disable"
cert_path = ""
key_path = ""
[postgres_options]
addr = "127.0.0.1:4003"
runtime_size = 2
[postgres_options.tls]
mode = "disable"
cert_path = ""
key_path = ""
[opentsdb_options]
addr = "127.0.0.1:4242"
runtime_size = 2
[influxdb_options]
enable = true
[prometheus_options]
enable = true
[prom_options]
addr = "127.0.0.1:4004"
[wal]
file_size = "256MB"
purge_threshold = "4GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[storage]
type = "File"
data_home = '{data_home}'
global_ttl = "15m"
skip_wal = true
sst_compression = "lz4raw"
memtable_type = "time_series"
[storage.compaction]
max_inflight_tasks = 4
max_files_in_level0 = 5
max_purge_tasks = 4
purge_expired_only = true
[storage.manifest]
checkpoint_margin = 128
gc_duration = '10m'
checkpoint_on_startup = false
[storage.flush]
max_flush_tasks = 2
region_write_buffer_size = "1MB"
picker_schedule_interval = "5m"
global_write_buffer_size = "150MB"
[procedure]
max_retry_times = 3
retry_delay = "500ms"
[logging]
enable_logcat = false

15
tests-chaos/src/bare.rs Normal file
View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod process;

View File

@@ -0,0 +1,132 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::process::{ExitStatus, Stdio};
use std::sync::{Arc, Mutex};
use common_telemetry::{info, warn};
use nix::sys::signal::Signal;
use snafu::ResultExt;
use tokio::process::Child;
use crate::error::{self, Result};
pub(crate) type Pid = u32;
/// The state of a process.
#[derive(Debug, Clone)]
pub(crate) struct Process {
pub(crate) exit_status: Option<ExitStatus>,
pub(crate) exited: bool,
}
/// ProcessManager provides the ability to spawn/wait/kill a child process.
#[derive(Debug, Clone)]
pub(crate) struct ProcessManager {
processes: Arc<Mutex<HashMap<Pid, Process>>>,
}
/// The callback while the child process exits.
pub type OnChildExitResult = std::result::Result<ExitStatus, std::io::Error>;
impl ProcessManager {
pub fn new() -> Self {
Self {
processes: Arc::new(Default::default()),
}
}
pub(crate) fn get(&self, pid: Pid) -> Option<Process> {
self.processes.lock().unwrap().get(&pid).cloned()
}
fn wait<F>(&self, mut child: Child, f: F)
where
F: FnOnce(Pid, OnChildExitResult) + Send + 'static,
{
let processes = self.processes.clone();
tokio::spawn(async move {
// Safety: caller checked
let pid = child.id().unwrap();
let result = child.wait().await;
match result {
Ok(code) => {
warn!("pid: {pid} exited with status: {}", code);
f(pid, Ok(code));
processes.lock().unwrap().entry(pid).and_modify(|process| {
process.exit_status = Some(code);
process.exited = true;
});
}
Err(err) => {
warn!("pid: {pid} exited with error: {}", err);
f(pid, Err(err));
processes.lock().unwrap().entry(pid).and_modify(|process| {
process.exited = true;
});
}
}
});
}
/// Spawns a new process.
pub fn spawn<T: Into<Stdio>, F>(
&self,
binary: &str,
args: &[String],
stdout: T,
stderr: T,
on_exit: F,
) -> Result<Pid>
where
F: FnOnce(Pid, OnChildExitResult) + Send + 'static,
{
info!("starting {} with {:?}", binary, args);
let child = tokio::process::Command::new(binary)
.args(args)
.stdout(stdout)
.stderr(stderr)
.spawn()
.context(error::SpawnChildSnafu)?;
let pid = child.id();
if let Some(pid) = pid {
self.processes.lock().unwrap().insert(
pid,
Process {
exit_status: None,
exited: false,
},
);
self.wait(child, on_exit);
Ok(pid)
} else {
error::UnexpectedExitedSnafu {}.fail()
}
}
/// Kills a process via [Pid].
pub fn kill<T: Into<Option<Signal>>>(pid: Pid, signal: T) -> Result<()> {
let signal: Option<Signal> = signal.into();
info!("kill pid :{} siganl: {:?}", pid, signal);
// Safety: checked.
nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid as i32), signal)
.context(error::KillProcessSnafu)?;
Ok(())
}
}

77
tests-chaos/src/error.rs Normal file
View File

@@ -0,0 +1,77 @@
use common_macro::stack_trace_debug;
use snafu::{location, Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to spawn a child process"))]
SpawnChild {
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Child process exited unexpected"))]
UnexpectedExited { location: Location },
#[snafu(display("Unexpected: {err_msg}"))]
Unexpected { err_msg: String, location: Location },
#[snafu(display("Failed to kill a process"))]
KillProcess {
location: Location,
#[snafu(source)]
error: nix::Error,
},
#[snafu(display("Failed to create a file: {}", path))]
CreateFile {
path: String,
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to create dir all"))]
CreateDirAll {
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to write a file: {}", path))]
WriteFile {
path: String,
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to execute query: {}", sql))]
ExecuteQuery {
sql: String,
#[snafu(source)]
error: sqlx::error::Error,
location: Location,
},
#[snafu(display("Failed to request mysql, error: {}", err_msg))]
RequestMysql {
err_msg: String,
#[snafu(source)]
error: mysql::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<tests_fuzz::error::Error> for Error {
fn from(e: tests_fuzz::error::Error) -> Self {
Self::Unexpected {
err_msg: e.to_string(),
location: location!(),
}
}
}

505
tests-chaos/src/main.rs Normal file
View File

@@ -0,0 +1,505 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Query, State};
use axum::Router;
use bare::process::{Pid, ProcessManager};
use common_telemetry::{info, warn};
use mysql::prelude::Queryable;
use nix::sys::signal::Signal;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySql, Pool};
use tests_fuzz::context::{Rows, TableContext};
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::select_expr::{Direction, SelectExpr};
use tests_fuzz::ir::AlterTableOperation;
use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::mysql::select_expr::SelectExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::validator;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use utils::generate_alter_table_expr;
mod bare;
mod error;
mod utils;
use axum::routing::get;
use prometheus::{register_int_counter, Encoder, IntCounter, TextEncoder};
use crate::error::{Error, RequestMysqlSnafu, Result};
use crate::utils::{generate_create_table_expr, get_conf_path, path_to_stdio, render_config_file};
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
#[derive(Copy, Clone)]
pub struct MetricsHandler;
impl MetricsHandler {
pub fn render(&self) -> String {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
// Gather the metrics.
let metric_families = prometheus::gather();
// Encode them to send.
match encoder.encode(&metric_families, &mut buffer) {
Ok(_) => match String::from_utf8(buffer) {
Ok(s) => s,
Err(e) => e.to_string(),
},
Err(e) => e.to_string(),
}
}
}
#[axum_macros::debug_handler]
pub async fn metrics(
State(state): State<MetricsHandler>,
Query(_params): Query<HashMap<String, String>>,
) -> String {
state.render()
}
lazy_static::lazy_static! {
static ref UP_COUNTER: IntCounter = register_int_counter!("up", "up counter").unwrap();
}
// cargo run --package tests-chaos --bin tests-chaos
#[tokio::main]
async fn main() {
common_telemetry::init_default_ut_logging();
tokio::spawn(async move {
let app = Router::new()
.route("/metric", get(metrics))
.with_state(MetricsHandler);
let addr = SocketAddr::from(([0, 0, 0, 0], 30000));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
});
let test_dir = "/home/lfc/test-cuckoo/";
// Remove everything in the test directory, to make sure we have a clean start.
match std::fs::remove_dir_all(test_dir) {
Err(e) if e.kind() != std::io::ErrorKind::NotFound => panic!("{e:?}"),
_ => {}
}
std::fs::create_dir_all(test_dir).unwrap();
let state = Arc::new(TestState {
killed: AtomicBool::new(false),
});
let moved_state = state.clone();
tokio::spawn(async move {
let mut rng = ChaChaRng::seed_from_u64(0);
loop {
warn!("Staring");
UP_COUNTER.inc();
let pid = start_database(test_dir).await.unwrap();
let secs = rng.gen_range(100..300);
moved_state.killed.store(false, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(secs)).await;
warn!("After {secs}ms, Killing pid: {pid}");
moved_state.killed.store(true, Ordering::Relaxed);
// Flush the database before restarting it. Because cuckoo does not enable WAL,
// data may not survive the restart if not flush them.
flush_db().await;
ProcessManager::kill(pid, Signal::SIGKILL).expect("Failed to kill");
}
});
let mut rng = ChaChaRng::seed_from_u64(0);
let mut sqlx = sqlx_connections().await;
let mut mysql = mysql_connections().await;
let mut created_table = HashSet::new();
// Runs maximum 10000 times.
for _i in 0..10000 {
if let Err(e) = run_test(&sqlx, &mysql, &mut created_table, &state, &mut rng).await {
if matches!(e, Error::ExecuteQuery { .. } | Error::RequestMysql { .. })
&& state.killed.load(Ordering::Relaxed)
{
// If the query error is caused by restarting the database
// (which is an intended action), reconnect.
sqlx = sqlx_connections().await;
mysql = mysql_connections().await;
} else {
panic!("{e:?}");
}
}
}
info!("Successfully runs DDL chaos testing for cuckoo!");
}
async fn flush_db() {
info!("Start flushing the database ...");
let _ = reqwest::get("http://127.0.0.1:4000/v1/admin/flush?db=public")
.await
.unwrap()
.bytes()
.await
.unwrap();
}
async fn mysql_connections() -> mysql::Pool {
let mut max_retry = 10;
loop {
match mysql::Pool::new("mysql://127.0.0.1:4002/public") {
Ok(x) => return x,
Err(e) => {
max_retry -= 1;
if max_retry == 0 {
panic!("{e:?}")
} else {
info!("GreptimeDB is not connectable, maybe during restart. Wait 1 second to retry");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
}
async fn sqlx_connections() -> Pool<MySql> {
let mut max_retry = 10;
loop {
match MySqlPoolOptions::new()
.connect("mysql://127.0.0.1:4002/public")
.await
{
Ok(x) => return x,
Err(e) => {
max_retry -= 1;
if max_retry == 0 {
panic!("{e:?}")
} else {
info!("GreptimeDB is not connectable, maybe during restart. Wait 1 second to retry");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
}
struct TestState {
killed: AtomicBool,
}
async fn run_test<R: Rng + 'static>(
client: &Pool<MySql>,
pool: &mysql::Pool,
created_table: &mut HashSet<String>,
state: &Arc<TestState>,
rng: &mut R,
) -> Result<()> {
let expr = generate_create_table_expr(rng);
let table_name = expr.table_name.to_string();
if !created_table.insert(table_name.clone()) {
warn!("ignores same name table: {table_name}");
// ignores.
return Ok(());
}
let mut table_ctx = TableContext::from(&expr);
let translator = CreateTableExprTranslator;
let sql = translator.translate(&expr).unwrap();
let result = sqlx::query(&sql).execute(client).await;
match result {
Ok(result) => {
validate_mysql(client, state, &table_ctx).await;
info!("Create table: {sql}, result: {result:?}");
}
Err(err) => {
ensure!(
state.killed.load(Ordering::Relaxed),
error::UnexpectedSnafu {
err_msg: err.to_string(),
}
);
created_table.insert(table_name);
return Ok(());
}
}
let actions = rng.gen_range(1..20);
for _ in 0..actions {
let expr = generate_alter_table_expr(Arc::new(table_ctx.clone()), rng);
if let AlterTableOperation::RenameTable { new_table_name } = &expr.alter_options {
let table_name = new_table_name.to_string();
if created_table.contains(&table_name) {
warn!("ignores altering to same name table: {table_name}");
continue;
}
};
insert_rows(pool, &mut table_ctx, rng)?;
let translator = AlterTableExprTranslator;
let sql = translator.translate(&expr).unwrap();
let result = sqlx::query(&sql).execute(client).await;
match result {
Ok(result) => {
info!("alter table: {sql}, result: {result:?}");
let table_name = table_ctx.name.to_string();
created_table.remove(&table_name);
table_ctx.alter(expr)?;
validate_mysql(client, state, &table_ctx).await;
let table_name = table_ctx.name.to_string();
created_table.insert(table_name);
}
Err(err) => {
table_ctx.alter(expr)?;
let table_name = table_ctx.name.to_string();
created_table.insert(table_name);
ensure!(
state.killed.load(Ordering::Relaxed),
error::UnexpectedSnafu {
err_msg: err.to_string(),
}
);
break;
}
}
let actual_rows = fetch_all_rows(&table_ctx, pool)?;
info!("fetch all rows after alter: {actual_rows}");
if actual_rows.empty() && state.killed.load(Ordering::Relaxed) {
// Cuckoo does not have WAL enabled; therefore the data could be cleared across restart.
// When that happened, clear the saved data that are used for comparison, too.
table_ctx.clear_data();
} else {
assert_eq!(
&table_ctx.rows, &actual_rows,
r#"rows not equal:
expect: {}
actual: {}"#,
&table_ctx.rows, &actual_rows
)
}
}
Ok(())
}
fn insert_rows<R: Rng + 'static>(
pool: &mysql::Pool,
table_ctx: &mut TableContext,
rng: &mut R,
) -> Result<()> {
let insert_expr = InsertExprGeneratorBuilder::default()
.table_ctx(Arc::new(table_ctx.clone()))
.build()
.unwrap()
.generate(rng)
.unwrap();
let sql = InsertIntoExprTranslator.translate(&insert_expr).unwrap();
info!("executing insertion: {sql}");
let mut conn = pool.get_conn().context(RequestMysqlSnafu {
err_msg: "get connection",
})?;
conn.query_drop(&sql).with_context(|_| RequestMysqlSnafu {
err_msg: format!("executing sql '{}'", sql),
})?;
if conn.affected_rows() > 0 {
table_ctx.insert(insert_expr)?;
}
Ok(())
}
// Sqlx treats all queries as prepared, we have to switch to another mysql client library here.
// There's a error when trying to query GreptimeDB with prepared statement:
// "tried to use [50, 48, ..., 56] as MYSQL_TYPE_TIMESTAMP"
// Besides, sqlx is suited for cases where table schema is known (and representable in codes),
// definitely not here.
fn fetch_all_rows(table_ctx: &TableContext, pool: &mysql::Pool) -> Result<Rows> {
let select_expr = SelectExpr {
table_name: table_ctx.name.to_string(),
columns: table_ctx.columns.clone(),
order_by: vec![table_ctx
.columns
.iter()
.find_map(|c| {
if c.is_time_index() {
Some(c.name.to_string())
} else {
None
}
})
.unwrap()],
direction: Direction::Asc,
limit: usize::MAX,
};
let sql = SelectExprTranslator.translate(&select_expr).unwrap();
info!("executing selection: {sql}");
let mut conn = pool.get_conn().context(RequestMysqlSnafu {
err_msg: "get connection",
})?;
let rows: Vec<mysql::Row> = conn.query(&sql).with_context(|_| RequestMysqlSnafu {
err_msg: format!("executing sql: {}", sql),
})?;
Ok(Rows::fill(rows))
}
async fn validate_mysql(client: &Pool<MySql>, _state: &Arc<TestState>, table_ctx: &TableContext) {
loop {
match validator::column::fetch_columns_via_mysql(
client,
"public".into(),
table_ctx.name.clone(),
)
.await
{
Ok(mut column_entries) => {
column_entries.sort_by(|a, b| a.column_name.cmp(&b.column_name));
let mut columns = table_ctx.columns.clone();
columns.sort_by(|a, b| a.name.value.cmp(&b.name.value));
validator::column::assert_eq(&column_entries, &columns).unwrap();
return;
}
Err(err) => warn!(
"Failed to fetch table '{}' columns, error: {}",
table_ctx.name, err
),
}
}
}
async fn start_database(test_dir: &str) -> Result<Pid> {
let binary_path = "/home/lfc/greptimedb-cuckoo/target/debug/greptime";
let template_filename = "standalone-v0.3.2.toml.template";
let health_url = "http://127.0.0.1:4000/health";
let process_manager = ProcessManager::new();
for _ in 0..3 {
let pid = start_process(&process_manager, binary_path, test_dir, template_filename)
.await
.unwrap();
match tokio::time::timeout(Duration::from_secs(10), health_check(health_url)).await {
Ok(_) => {
info!("GreptimeDB started, pid: {pid}");
return Ok(pid);
}
Err(_) => {
ensure!(
process_manager.get(pid).unwrap().exited,
error::UnexpectedSnafu {
err_msg: format!("Failed to start database: pid: {pid}")
}
);
// retry alter
warn!("Wait for staring timeout, retry later...");
}
};
}
error::UnexpectedSnafu {
err_msg: "Failed to start datanode",
}
.fail()
}
async fn start_process(
process_manager: &ProcessManager,
binary: &str,
test_dir: &str,
template_filename: &str,
) -> Result<Pid> {
tokio::fs::create_dir_all(test_dir)
.await
.context(error::CreateDirAllSnafu)?;
let data_home = format!("{test_dir}data_home");
info!("data home: {}", data_home);
// Prepares the config file
let mut conf_path = get_conf_path();
conf_path.push(template_filename);
let template_path = conf_path.to_str().unwrap().to_string();
let conf_path = format!("{test_dir}config.toml");
info!("conf path: {}", conf_path);
#[derive(Serialize)]
struct Context {
data_home: String,
}
let conf_content = render_config_file(&template_path, &Context { data_home });
let mut config_file = File::create(&conf_path)
.await
.context(error::CreateFileSnafu { path: &conf_path })?;
config_file
.write_all(conf_content.as_bytes())
.await
.context(error::WriteFileSnafu { path: &conf_path })?;
let args = vec![
DEFAULT_LOG_LEVEL.to_string(),
"standalone".to_string(),
"start".to_string(),
format!("--config-file={conf_path}"),
];
let now = common_time::util::current_time_millis();
let stdout = format!("{test_dir}stdout-{}", now);
let stderr = format!("{test_dir}stderr-{}", now);
info!("stdout: {}, stderr: {}", stdout, stderr);
let stdout = path_to_stdio(&stdout).await?;
let stderr = path_to_stdio(&stderr).await?;
let on_exit = move |pid, result| {
info!("The pid: {pid} exited, result: {result:?}");
};
process_manager.spawn(binary, &args, stdout, stderr, on_exit)
}
async fn health_check(url: &str) {
loop {
match reqwest::get(url).await {
Ok(resp) => {
if resp.status() == 200 {
info!("health checked!");
return;
}
info!("failed to health, status: {}", resp.status());
}
Err(err) => {
info!("failed to health, err: {err:?}");
}
}
info!("checking health later...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

111
tests-chaos/src/utils.rs Normal file
View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use rand::Rng;
use serde::Serialize;
use snafu::ResultExt;
use tests_fuzz::context::TableContextRef;
use tests_fuzz::fake::{
merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
MappedGenerator, WordGenerator,
};
use tests_fuzz::generator::alter_expr::{
AlterExprAddColumnGeneratorBuilder, AlterExprDropColumnGeneratorBuilder,
AlterExprRenameGeneratorBuilder,
};
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{droppable_columns, AlterTableExpr, CreateTableExpr};
use tinytemplate::TinyTemplate;
use tokio::fs::OpenOptions;
use crate::error::{self, Result};
/// Creates an file
pub(crate) async fn path_to_stdio(path: &str) -> Result<std::fs::File> {
Ok(OpenOptions::new()
.append(true)
.create(true)
.read(true)
.write(true)
.open(path)
.await
.context(error::CreateFileSnafu { path })?
.into_std()
.await)
}
/// Get the path of config dir `tests/conf`.
pub(crate) fn get_conf_path() -> PathBuf {
let mut root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
root_path.push("conf");
root_path
}
/// Returns rendered config file.
pub(crate) fn render_config_file<C: Serialize>(template_path: &str, context: &C) -> String {
let mut tt = TinyTemplate::new();
let template = std::fs::read_to_string(template_path).unwrap();
tt.add_template(template_path, &template).unwrap();
tt.render(template_path, context).unwrap()
}
pub(crate) fn generate_create_table_expr<R: Rng + 'static>(rng: &mut R) -> CreateTableExpr {
let columns = rng.gen_range(2..30);
let create_table_generator = CreateTableExprGeneratorBuilder::default()
.name_generator(Box::new(MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
)))
.columns(columns)
.engine("mito")
.build()
.unwrap();
create_table_generator.generate(rng).unwrap()
}
#[allow(dead_code)]
pub fn generate_alter_table_expr<R: Rng + 'static>(
table_ctx: TableContextRef,
rng: &mut R,
) -> AlterTableExpr {
let rename = rng.gen_bool(0.2);
if rename {
let expr_generator = AlterExprRenameGeneratorBuilder::default()
.table_ctx(table_ctx)
.name_generator(Box::new(WordGenerator))
.build()
.unwrap();
expr_generator.generate(rng).unwrap()
} else {
let drop_column = rng.gen_bool(0.5) && !droppable_columns(&table_ctx.columns).is_empty();
if drop_column {
let expr_generator = AlterExprDropColumnGeneratorBuilder::default()
.table_ctx(table_ctx)
.build()
.unwrap();
expr_generator.generate(rng).unwrap()
} else {
let location = rng.gen_bool(0.5);
let expr_generator = AlterExprAddColumnGeneratorBuilder::default()
.table_ctx(table_ctx)
.location(location)
.build()
.unwrap();
expr_generator.generate(rng).unwrap()
}
}
}

View File

@@ -24,6 +24,7 @@ derive_builder = { workspace = true }
dotenv = "0.15"
lazy_static = { workspace = true }
libfuzzer-sys = "0.4"
mysql = "25.0"
partition = { workspace = true }
rand = { workspace = true }
rand_chacha = "0.3.1"

View File

@@ -12,9 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use std::vec;
use common_query::AddColumnLocation;
use common_telemetry::info;
use common_time::timezone::parse_timezone;
use datatypes::value::Value;
use partition::partition::PartitionDef;
use rand::Rng;
use snafu::{ensure, OptionExt};
@@ -22,15 +27,171 @@ use snafu::{ensure, OptionExt};
use crate::error::{self, Result};
use crate::generator::Random;
use crate::ir::alter_expr::AlterTableOperation;
use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident};
use crate::ir::create_expr::ColumnOption;
use crate::ir::insert_expr::RowValue;
use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident, InsertIntoExpr};
pub type TableContextRef = Arc<TableContext>;
#[derive(Debug, Clone, PartialEq, PartialOrd)]
struct Cell {
value: mysql::Value,
}
impl Cell {
fn null() -> Self {
Self {
value: mysql::Value::NULL,
}
}
}
impl Display for Cell {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value.as_sql(true))
}
}
impl From<Value> for Cell {
fn from(value: Value) -> Self {
if value.is_null() {
Self::null()
} else {
let s = match value {
Value::Timestamp(t) => {
t.to_timezone_aware_string(Some(&parse_timezone(Some("+08:00"))))
}
Value::Boolean(b) => if b { "1" } else { "0" }.to_string(),
_ => value.to_string(),
};
Self {
value: mysql::Value::Bytes(s.into_bytes()),
}
}
}
}
impl From<mysql::Value> for Cell {
fn from(value: mysql::Value) -> Self {
Self { value }
}
}
#[derive(Debug, Clone, PartialEq)]
struct Row {
cells: Vec<Cell>,
}
impl Row {
fn with(columns_len: usize) -> Self {
Self {
cells: vec![Cell::null(); columns_len],
}
}
fn set_cell(&mut self, at: usize, cell: Cell) {
self.cells[at] = cell;
}
fn add_column(&mut self, at: usize, cell: Cell) {
self.cells.insert(at, cell);
}
fn drop_column(&mut self, at: usize) {
self.cells.remove(at);
}
}
impl Display for Row {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for (i, v) in self.cells.iter().enumerate() {
write!(f, "{}\t", v)?;
if i + 1 < self.cells.len() {
write!(f, "\t")?;
}
}
write!(f, "]")
}
}
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Rows {
rows: Vec<Row>,
}
impl Rows {
pub fn empty(&self) -> bool {
self.rows.is_empty()
}
pub fn new() -> Self {
Self { rows: vec![] }
}
pub fn fill(rows: Vec<mysql::Row>) -> Self {
Self {
rows: rows
.into_iter()
.map(|r| Row {
cells: r.unwrap().into_iter().map(Into::into).collect(),
})
.collect(),
}
}
fn with(rows: usize) -> Self {
Self {
rows: Vec::with_capacity(rows),
}
}
fn add_column(&mut self, at: usize, column: &Column) {
let cell = column
.options
.iter()
.find_map(|x| {
if let ColumnOption::DefaultValue(v) = x {
Some(v.clone().into())
} else {
None
}
})
.unwrap_or(Cell::null());
self.rows
.iter_mut()
.for_each(|x| x.add_column(at, cell.clone()))
}
fn drop_column(&mut self, at: usize) {
self.rows.iter_mut().for_each(|x| x.drop_column(at))
}
fn add_row(&mut self, row: Row) {
self.rows.push(row)
}
fn extend(&mut self, rows: Rows) {
self.rows.extend(rows.rows)
}
}
impl Display for Rows {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Rows")?;
for r in &self.rows {
writeln!(f, "{}", r)?;
}
Ok(())
}
}
/// TableContext stores table info.
#[derive(Debug, Clone)]
pub struct TableContext {
pub name: Ident,
pub columns: Vec<Column>,
pub rows: Rows,
// GreptimeDB specific options
pub partition: Option<PartitionDef>,
@@ -50,6 +211,7 @@ impl From<&CreateTableExpr> for TableContext {
Self {
name: name.clone(),
columns: columns.clone(),
rows: Rows::new(),
partition: partition.clone(),
primary_keys: primary_keys.clone(),
}
@@ -57,8 +219,72 @@ impl From<&CreateTableExpr> for TableContext {
}
impl TableContext {
pub fn clear_data(&mut self) {
self.rows = Rows::new();
}
pub fn insert(&mut self, expr: InsertIntoExpr) -> Result<()> {
fn find_default_value(column: &Column) -> Cell {
column
.options
.iter()
.find_map(|opt| {
if let ColumnOption::DefaultValue(v) = opt {
Some(v.clone().into())
} else {
None
}
})
.unwrap_or(Cell::null())
}
let mut rows = Rows::with(expr.values_list.len());
for insert_values in expr.values_list.into_iter() {
let mut row = Row::with(self.columns.len());
for (i, column) in self.columns.iter().enumerate() {
let cell = if let Some(v) = expr
.columns
.iter()
.zip(insert_values.iter())
.find_map(|(x, y)| if x.name == column.name { Some(y) } else { None })
{
match v {
RowValue::Value(v) => v.clone().into(),
RowValue::Default => find_default_value(column),
}
} else {
find_default_value(column)
};
row.set_cell(i, cell);
}
rows.add_row(row);
}
self.rows.extend(rows);
let time_index = self.columns.iter().position(|x| x.is_time_index()).unwrap();
self.rows.rows.sort_by(|x, y| {
x.cells[time_index]
.partial_cmp(&y.cells[time_index])
.unwrap_or(std::cmp::Ordering::Equal)
});
info!("after insertion, expected rows: {}", self.rows);
Ok(())
}
fn find_column_index(&self, column_name: &str) -> Result<usize> {
self.columns
.iter()
.position(|col| col.name.to_string() == column_name)
.context(error::UnexpectedSnafu {
violated: format!("Column: {column_name} not found"),
})
}
/// Applies the [AlterTableExpr].
pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
pub fn alter(&mut self, expr: AlterTableExpr) -> Result<()> {
match expr.alter_options {
AlterTableOperation::AddColumn { column, location } => {
ensure!(
@@ -69,23 +295,18 @@ impl TableContext {
);
match location {
Some(AddColumnLocation::First) => {
let mut columns = Vec::with_capacity(self.columns.len() + 1);
columns.push(column);
columns.extend(self.columns);
self.columns = columns;
self.rows.add_column(0, &column);
self.columns.insert(0, column);
}
Some(AddColumnLocation::After { column_name }) => {
let index = self
.columns
.iter()
// TODO(weny): find a better way?
.position(|col| col.name.to_string() == column_name)
.context(error::UnexpectedSnafu {
violated: format!("Column: {column_name} not found"),
})?;
let index = self.find_column_index(&column_name)?;
self.rows.add_column(index + 1, &column);
self.columns.insert(index + 1, column);
}
None => self.columns.push(column),
None => {
self.rows.add_column(self.columns.len(), &column);
self.columns.push(column);
}
}
// Re-generates the primary_keys
self.primary_keys = self
@@ -100,10 +321,12 @@ impl TableContext {
}
})
.collect();
Ok(self)
}
AlterTableOperation::DropColumn { name } => {
self.columns.retain(|col| col.name != name);
let at = self.find_column_index(&name.to_string())?;
self.columns.remove(at);
self.rows.drop_column(at);
// Re-generates the primary_keys
self.primary_keys = self
.columns
@@ -117,7 +340,6 @@ impl TableContext {
}
})
.collect();
Ok(self)
}
AlterTableOperation::RenameTable { new_table_name } => {
ensure!(
@@ -127,9 +349,9 @@ impl TableContext {
}
);
self.name = new_table_name;
Ok(self)
}
}
Ok(())
}
pub fn generate_unique_column_name<R: Rng>(
@@ -162,16 +384,17 @@ mod tests {
use common_query::AddColumnLocation;
use datatypes::data_type::ConcreteDataType;
use super::TableContext;
use super::*;
use crate::ir::alter_expr::AlterTableOperation;
use crate::ir::create_expr::ColumnOption;
use crate::ir::{AlterTableExpr, Column, Ident};
#[test]
fn test_table_context_alter() {
let table_ctx = TableContext {
let mut table_ctx = TableContext {
name: "foo".into(),
columns: vec![],
rows: Rows::new(),
partition: None,
primary_keys: vec![],
};
@@ -187,7 +410,7 @@ mod tests {
location: None,
},
};
let table_ctx = table_ctx.alter(expr).unwrap();
table_ctx.alter(expr).unwrap();
assert_eq!(table_ctx.columns[0].name, Ident::new("a"));
assert_eq!(table_ctx.primary_keys, vec![0]);
@@ -203,7 +426,7 @@ mod tests {
location: Some(AddColumnLocation::First),
},
};
let table_ctx = table_ctx.alter(expr).unwrap();
table_ctx.alter(expr).unwrap();
assert_eq!(table_ctx.columns[0].name, Ident::new("b"));
assert_eq!(table_ctx.primary_keys, vec![0, 1]);
@@ -221,7 +444,7 @@ mod tests {
}),
},
};
let table_ctx = table_ctx.alter(expr).unwrap();
table_ctx.alter(expr).unwrap();
assert_eq!(table_ctx.columns[1].name, Ident::new("c"));
assert_eq!(table_ctx.primary_keys, vec![0, 1, 2]);
@@ -230,7 +453,7 @@ mod tests {
table_name: "foo".into(),
alter_options: AlterTableOperation::DropColumn { name: "b".into() },
};
let table_ctx = table_ctx.alter(expr).unwrap();
table_ctx.alter(expr).unwrap();
assert_eq!(table_ctx.columns[1].name, Ident::new("a"));
assert_eq!(table_ctx.primary_keys, vec![0, 1]);
}

View File

@@ -97,7 +97,6 @@ eum
iure
reprehenderit
qui
in
ea
voluptate
velit
@@ -138,10 +137,8 @@ unde
omnis
iste
natus
error
similique
sunt
in
culpa
qui
officia
@@ -210,7 +207,6 @@ quo
voluptas
nulla
pariatur
at
vero
eos
et

View File

@@ -27,7 +27,7 @@ use crate::generator::{ColumnOptionGenerator, ConcreteDataTypeGenerator, Generat
use crate::ir::alter_expr::{AlterTableExpr, AlterTableOperation};
use crate::ir::create_expr::ColumnOption;
use crate::ir::{
droppable_columns, generate_columns, generate_random_value, ColumnTypeGenerator, Ident,
droppable_columns, generate_columns, generate_random_value_abs, ColumnTypeGenerator, Ident,
};
fn add_column_options_generator<R: Rng>(
@@ -41,7 +41,7 @@ fn add_column_options_generator<R: Rng>(
match idx {
0 => vec![ColumnOption::Null],
1 => {
vec![ColumnOption::DefaultValue(generate_random_value(
vec![ColumnOption::DefaultValue(generate_random_value_abs(
rng,
column_type,
None,
@@ -50,7 +50,7 @@ fn add_column_options_generator<R: Rng>(
2 => {
vec![
ColumnOption::PrimaryKey,
ColumnOption::DefaultValue(generate_random_value(rng, column_type, None)),
ColumnOption::DefaultValue(generate_random_value_abs(rng, column_type, None)),
]
}
_ => unreachable!(),

View File

@@ -94,11 +94,7 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R> {
Ok(InsertIntoExpr {
table_name: self.table_ctx.name.to_string(),
columns: if omit_column_list {
vec![]
} else {
values_columns
},
columns: values_columns,
values_list,
})
}

View File

@@ -15,14 +15,15 @@
//! The intermediate representation
pub(crate) mod alter_expr;
pub(crate) mod create_expr;
pub(crate) mod insert_expr;
pub(crate) mod select_expr;
pub mod create_expr;
pub mod insert_expr;
pub mod select_expr;
use core::fmt;
pub use alter_expr::AlterTableExpr;
use common_time::{Date, DateTime, Timestamp};
pub use alter_expr::{AlterTableExpr, AlterTableOperation};
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Interval, Timestamp};
pub use create_expr::{CreateDatabaseExpr, CreateTableExpr};
use datatypes::data_type::ConcreteDataType;
use datatypes::types::TimestampType;
@@ -102,29 +103,65 @@ pub fn generate_random_value<R: Rng>(
}
}
pub fn generate_random_value_abs<R: Rng>(
rng: &mut R,
datatype: &ConcreteDataType,
random_str: Option<&dyn Random<Ident, R>>,
) -> Value {
match datatype {
&ConcreteDataType::Boolean(_) => Value::from(rng.gen::<bool>()),
ConcreteDataType::Int16(_) => Value::from(i16::abs(rng.gen::<i16>())),
ConcreteDataType::Int32(_) => Value::from(i32::abs(rng.gen::<i32>())),
ConcreteDataType::Int64(_) => Value::from(i64::abs(rng.gen::<i64>())),
ConcreteDataType::Float32(_) => Value::from(f32::abs(rng.gen::<f32>())),
ConcreteDataType::Float64(_) => Value::from(f64::abs(rng.gen::<f64>())),
ConcreteDataType::String(_) => match random_str {
Some(random) => Value::from(random.gen(rng).value),
None => Value::from(rng.gen::<char>().to_string()),
},
ConcreteDataType::Date(_) => generate_random_date(rng),
ConcreteDataType::DateTime(_) => generate_random_datetime(rng),
&ConcreteDataType::Timestamp(ts_type) => generate_random_timestamp(rng, ts_type),
_ => unimplemented!("unsupported type: {datatype}"),
}
}
fn generate_random_timestamp<R: Rng>(rng: &mut R, ts_type: TimestampType) -> Value {
let v = match ts_type {
TimestampType::Second(_) => {
let min = i64::from(Timestamp::MIN_SECOND);
let max = i64::from(Timestamp::MAX_SECOND);
let now = Timestamp::current_time(TimeUnit::Second);
let min = now.sub_interval(Interval::from_year_month(12)).unwrap();
let max = now.add_interval(Interval::from_year_month(12)).unwrap();
let min = i64::from(min);
let max = i64::from(max);
let value = rng.gen_range(min..=max);
Timestamp::new_second(value)
}
TimestampType::Millisecond(_) => {
let min = i64::from(Timestamp::MIN_MILLISECOND);
let max = i64::from(Timestamp::MAX_MILLISECOND);
let now = Timestamp::current_time(TimeUnit::Millisecond);
let min = now.sub_interval(Interval::from_year_month(12)).unwrap();
let max = now.add_interval(Interval::from_year_month(12)).unwrap();
let min = i64::from(min);
let max = i64::from(max);
let value = rng.gen_range(min..=max);
Timestamp::new_millisecond(value)
}
TimestampType::Microsecond(_) => {
let min = i64::from(Timestamp::MIN_MICROSECOND);
let max = i64::from(Timestamp::MAX_MICROSECOND);
let now = Timestamp::current_time(TimeUnit::Microsecond);
let min = now.sub_interval(Interval::from_year_month(12)).unwrap();
let max = now.add_interval(Interval::from_year_month(12)).unwrap();
let min = i64::from(min);
let max = i64::from(max);
let value = rng.gen_range(min..=max);
Timestamp::new_microsecond(value)
}
TimestampType::Nanosecond(_) => {
let min = i64::from(Timestamp::MIN_NANOSECOND);
let max = i64::from(Timestamp::MAX_NANOSECOND);
let now = Timestamp::current_time(TimeUnit::Nanosecond);
let min = now.sub_interval(Interval::from_year_month(12)).unwrap();
let max = now.add_interval(Interval::from_year_month(12)).unwrap();
let min = i64::from(min);
let max = i64::from(max);
let value = rng.gen_range(min..=max);
Timestamp::new_nanosecond(value)
}
@@ -278,7 +315,7 @@ pub fn column_options_generator<R: Rng>(
match option_idx {
0 => vec![ColumnOption::Null],
1 => vec![ColumnOption::NotNull],
2 => vec![ColumnOption::DefaultValue(generate_random_value(
2 => vec![ColumnOption::DefaultValue(generate_random_value_abs(
rng,
column_type,
None,

View File

@@ -18,6 +18,7 @@ use datatypes::value::Value;
use crate::ir::Column;
#[derive(Debug)]
pub struct InsertIntoExpr {
pub table_name: String,
pub columns: Vec<Column>,
@@ -26,6 +27,7 @@ pub struct InsertIntoExpr {
pub type RowValues = Vec<RowValue>;
#[derive(Debug)]
pub enum RowValue {
Value(Value),
Default,

View File

@@ -14,7 +14,7 @@
use datatypes::data_type::ConcreteDataType;
use crate::context::TableContext;
use crate::context::{Rows, TableContext};
use crate::ir::create_expr::ColumnOption;
use crate::ir::Column;
@@ -53,6 +53,7 @@ pub fn new_test_ctx() -> TableContext {
options: vec![ColumnOption::TimeIndex],
},
],
rows: Rows::new(),
partition: None,
primary_keys: vec![],
}

View File

@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[allow(dead_code)]
pub mod column;

View File

@@ -16,7 +16,9 @@ use common_telemetry::debug;
use datatypes::data_type::DataType;
use snafu::{ensure, ResultExt};
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, Type};
use sqlx::{
ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Row, Type,
};
use crate::error::{self, Result};
use crate::ir::create_expr::ColumnOption;
@@ -24,12 +26,15 @@ use crate::ir::{Column, Ident};
#[derive(Debug, sqlx::FromRow)]
pub struct ColumnEntry {
pub table_schema: String,
pub table_name: String,
#[sqlx(rename = "Field")]
pub column_name: String,
#[sqlx(rename = "Type")]
pub data_type: String,
#[sqlx(rename = "Semantic Type")]
pub semantic_type: String,
#[sqlx(rename = "Default")]
pub column_default: Option<String>,
#[sqlx(rename = "Null")]
pub is_nullable: String,
}
@@ -45,6 +50,7 @@ enum SemanticType {
fn semantic_type(str: &str) -> Option<SemanticType> {
match str {
"TIME INDEX" => Some(SemanticType::Timestamp),
"TIMESTAMP" => Some(SemanticType::Timestamp),
"FIELD" => Some(SemanticType::Field),
"TAG" => Some(SemanticType::Tag),
@@ -127,43 +133,43 @@ impl PartialEq<Column> for ColumnEntry {
return false;
}
}
//TODO: Checks `semantic_type`
match semantic_type(&self.semantic_type) {
Some(SemanticType::Tag) => {
if !other
.options
.iter()
.any(|opt| matches!(opt, ColumnOption::PrimaryKey))
{
debug!("ColumnOption::PrimaryKey is not found");
return false;
}
}
Some(SemanticType::Field) => {
if other
.options
.iter()
.any(|opt| matches!(opt, ColumnOption::PrimaryKey | ColumnOption::TimeIndex))
{
debug!("unexpected ColumnOption::PrimaryKey or ColumnOption::TimeIndex");
return false;
}
}
Some(SemanticType::Timestamp) => {
if !other
.options
.iter()
.any(|opt| matches!(opt, ColumnOption::TimeIndex))
{
debug!("ColumnOption::TimeIndex is not found");
return false;
}
}
None => {
debug!("unknown semantic type: {}", self.semantic_type);
return false;
}
};
// //TODO: Checks `semantic_type`
// match semantic_type(&self.semantic_type) {
// Some(SemanticType::Tag) => {
// if !other
// .options
// .iter()
// .any(|opt| matches!(opt, ColumnOption::PrimaryKey))
// {
// debug!("ColumnOption::PrimaryKey is not found");
// return false;
// }
// }
// Some(SemanticType::Field) => {
// if other
// .options
// .iter()
// .any(|opt| matches!(opt, ColumnOption::PrimaryKey | ColumnOption::TimeIndex))
// {
// debug!("unexpected ColumnOption::PrimaryKey or ColumnOption::TimeIndex");
// return false;
// }
// }
// Some(SemanticType::Timestamp) => {
// if !other
// .options
// .iter()
// .any(|opt| matches!(opt, ColumnOption::TimeIndex))
// {
// debug!("ColumnOption::TimeIndex is not found");
// return false;
// }
// }
// None => {
// debug!("unknown semantic type: {}", self.semantic_type);
// return false;
// }
// };
true
}
@@ -220,6 +226,37 @@ where
.context(error::ExecuteQuerySnafu { sql })
}
pub async fn fetch_columns_via_mysql(
db: &Pool<MySql>,
_schema_name: Ident,
table_name: Ident,
) -> Result<Vec<ColumnEntry>> {
let sql = format!("DESC TABLE {table_name}");
let rows = sqlx::query(&sql)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu { sql })?;
Ok(rows
.into_iter()
.map(|row| {
let default_value: String = row.get(3);
let column_default = if default_value.is_empty() {
None
} else {
Some(default_value)
};
ColumnEntry {
column_name: row.get(0),
data_type: row.get(1),
is_nullable: row.get(2),
column_default,
semantic_type: row.get(4),
}
})
.collect::<Vec<_>>())
}
#[cfg(test)]
mod tests {
use datatypes::data_type::{ConcreteDataType, DataType};
@@ -233,8 +270,6 @@ mod tests {
fn test_column_eq() {
common_telemetry::init_default_ut_logging();
let column_entry = ColumnEntry {
table_schema: String::new(),
table_name: String::new(),
column_name: "test".to_string(),
data_type: ConcreteDataType::int8_datatype().name(),
semantic_type: "FIELD".to_string(),
@@ -257,8 +292,6 @@ mod tests {
assert!(column_entry == column);
// With default value
let column_entry = ColumnEntry {
table_schema: String::new(),
table_name: String::new(),
column_name: "test".to_string(),
data_type: ConcreteDataType::int8_datatype().to_string(),
semantic_type: "FIELD".to_string(),
@@ -273,8 +306,6 @@ mod tests {
assert!(column_entry == column);
// With default function
let column_entry = ColumnEntry {
table_schema: String::new(),
table_name: String::new(),
column_name: "test".to_string(),
data_type: ConcreteDataType::int8_datatype().to_string(),
semantic_type: "FIELD".to_string(),

View File

@@ -21,23 +21,22 @@ use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use snafu::ResultExt;
use sqlx::{MySql, Pool};
use snafu::{ensure, ResultExt};
use sqlx::{Executor, MySql, Pool};
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
MappedGenerator, WordGenerator,
};
use tests_fuzz::fake::WordGenerator;
use tests_fuzz::generator::alter_expr::{
AlterExprAddColumnGeneratorBuilder, AlterExprDropColumnGeneratorBuilder,
AlterExprRenameGeneratorBuilder,
};
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{droppable_columns, AlterTableExpr, CreateTableExpr};
use tests_fuzz::ir::{droppable_columns, AlterTableExpr, CreateTableExpr, InsertIntoExpr};
use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::validator;
@@ -52,19 +51,17 @@ impl FuzzContext {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Copy)]
struct FuzzInput {
seed: u64,
actions: usize,
rows: usize,
}
fn generate_create_table_expr<R: Rng + 'static>(rng: &mut R) -> Result<CreateTableExpr> {
let columns = rng.gen_range(2..30);
let create_table_generator = CreateTableExprGeneratorBuilder::default()
.name_generator(Box::new(MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
)))
.name_generator(Box::new(WordGenerator))
.columns(columns)
.engine("mito")
.build()
@@ -80,10 +77,7 @@ fn generate_alter_table_expr<R: Rng + 'static>(
if rename {
let expr_generator = AlterExprRenameGeneratorBuilder::default()
.table_ctx(table_ctx)
.name_generator(Box::new(MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
)))
.name_generator(Box::new(WordGenerator))
.build()
.unwrap();
expr_generator.generate(rng)
@@ -112,11 +106,29 @@ impl Arbitrary<'_> for FuzzInput {
let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
let mut rng = ChaChaRng::seed_from_u64(seed);
let actions = rng.gen_range(1..256);
let insertions = rng.gen_range(1..1024);
Ok(FuzzInput { seed, actions })
Ok(FuzzInput {
seed,
actions,
rows: insertions,
})
}
}
fn generate_insert_expr<R: Rng + 'static>(
input: FuzzInput,
rng: &mut R,
table_ctx: TableContextRef,
) -> Result<InsertIntoExpr> {
let insert_generator = InsertExprGeneratorBuilder::default()
.table_ctx(table_ctx)
.rows(input.rows)
.build()
.unwrap();
insert_generator.generate(rng)
}
async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {input:?}");
let mut rng = ChaChaRng::seed_from_u64(input.seed);
@@ -143,10 +155,12 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.context(error::ExecuteQuerySnafu { sql: &sql })?;
info!("Alter table: {sql}, result: {result:?}");
// Applies changes
table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap());
let mut t = Arc::unwrap_or_clone(table_ctx);
t.alter(expr).unwrap();
table_ctx = Arc::new(t);
// Validates columns
let mut column_entries = validator::column::fetch_columns(
let mut column_entries = validator::column::fetch_columns_via_mysql(
&ctx.greptime,
"public".into(),
table_ctx.name.clone(),
@@ -156,6 +170,28 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
let mut columns = table_ctx.columns.clone();
columns.sort_by(|a, b| a.name.value.cmp(&b.name.value));
validator::column::assert_eq(&column_entries, &columns)?;
// insertions
let insert_expr = generate_insert_expr(input, &mut rng, table_ctx.clone())?;
let translator = InsertIntoExprTranslator;
let sql = translator.translate(&insert_expr)?;
let result = ctx
.greptime
// unprepared query, see <https://github.com/GreptimeTeam/greptimedb/issues/3500>
.execute(sql.as_str())
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;
ensure!(
result.rows_affected() == input.rows as u64,
error::AssertSnafu {
reason: format!(
"expected rows affected: {}, actual: {}",
input.rows,
result.rows_affected(),
)
}
);
}
// Cleans up