mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
insert some rows and query them across DDL to test the integrity of data
This commit is contained in:
134
Cargo.lock
generated
134
Cargo.lock
generated
@@ -1074,6 +1074,12 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bufstream"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8"
|
||||
|
||||
[[package]]
|
||||
name = "build-data"
|
||||
version = "0.1.5"
|
||||
@@ -2957,6 +2963,17 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_utils"
|
||||
version = "0.14.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61bb5a1014ce6dfc2a378578509abe775a5aa06bff584a547555d9efdb81b926"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "diff"
|
||||
version = "0.1.13"
|
||||
@@ -3425,6 +3442,21 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||
dependencies = [
|
||||
"foreign-types-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types-shared"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||
|
||||
[[package]]
|
||||
name = "form_urlencoded"
|
||||
version = "1.2.1"
|
||||
@@ -4414,6 +4446,15 @@ version = "0.3.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8573b2b1fb643a372c73b23f4da5f888677feef3305146d68a539250a9bccc7"
|
||||
|
||||
[[package]]
|
||||
name = "io-enum"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "53b53d712d99a73eec59ee5e4fe6057f8052142d38eeafbbffcb06b36d738a6e"
|
||||
dependencies = [
|
||||
"derive_utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-lifetimes"
|
||||
version = "1.0.11"
|
||||
@@ -5427,6 +5468,32 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b"
|
||||
|
||||
[[package]]
|
||||
name = "mysql"
|
||||
version = "25.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4cc09a8118051e4617886c9c6e693c61444c2eeb5f9a792dc5d631501706565"
|
||||
dependencies = [
|
||||
"bufstream",
|
||||
"bytes",
|
||||
"crossbeam",
|
||||
"flate2",
|
||||
"io-enum",
|
||||
"libc",
|
||||
"lru",
|
||||
"mysql_common 0.32.0",
|
||||
"named_pipe",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"pem",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"socket2 0.5.5",
|
||||
"twox-hash",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql-common-derive"
|
||||
version = "0.30.2"
|
||||
@@ -5608,6 +5675,33 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "named_pipe"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ad9c443cce91fc3e12f017290db75dde490d685cdaaf508d7159d7cf41f0eb2b"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "native-tls"
|
||||
version = "0.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"openssl",
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ndk-context"
|
||||
version = "0.1.1"
|
||||
@@ -6012,12 +6106,50 @@ dependencies = [
|
||||
"tokio-rustls 0.25.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.64"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"cfg-if 1.0.0",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"openssl-macros",
|
||||
"openssl-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.102"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.21.0"
|
||||
@@ -10072,6 +10204,7 @@ dependencies = [
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"lazy_static",
|
||||
"mysql",
|
||||
"nix 0.26.4",
|
||||
"prometheus",
|
||||
"rand",
|
||||
@@ -10103,6 +10236,7 @@ dependencies = [
|
||||
"dotenv",
|
||||
"lazy_static",
|
||||
"libfuzzer-sys",
|
||||
"mysql",
|
||||
"partition",
|
||||
"rand",
|
||||
"rand_chacha",
|
||||
|
||||
@@ -12,6 +12,7 @@ common-macro.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time = { workspace = true }
|
||||
lazy_static.workspace = true
|
||||
mysql = "25.0"
|
||||
nix = { version = "0.26", features = ["process"] }
|
||||
prometheus.workspace = true
|
||||
rand = { workspace = true }
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, Snafu};
|
||||
use snafu::{location, Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
@@ -55,6 +55,23 @@ pub enum Error {
|
||||
error: sqlx::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to request mysql, error: {}", err_msg))]
|
||||
RequestMysql {
|
||||
err_msg: String,
|
||||
#[snafu(source)]
|
||||
error: mysql::Error,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl From<tests_fuzz::error::Error> for Error {
|
||||
fn from(e: tests_fuzz::error::Error) -> Self {
|
||||
Self::Unexpected {
|
||||
err_msg: e.to_string(),
|
||||
location: location!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use axum::extract::{Query, State};
|
||||
use axum::Router;
|
||||
use bare::process::{Pid, ProcessManager};
|
||||
use common_telemetry::{info, warn};
|
||||
use mysql::prelude::Queryable;
|
||||
use nix::sys::signal::Signal;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use rand_chacha::ChaChaRng;
|
||||
@@ -29,10 +30,15 @@ use serde::Serialize;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sqlx::mysql::MySqlPoolOptions;
|
||||
use sqlx::{MySql, Pool};
|
||||
use tests_fuzz::context::TableContext;
|
||||
use tests_fuzz::context::{Rows, TableContext};
|
||||
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
|
||||
use tests_fuzz::generator::Generator;
|
||||
use tests_fuzz::ir::select_expr::{Direction, SelectExpr};
|
||||
use tests_fuzz::ir::AlterTableOperation;
|
||||
use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator;
|
||||
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
|
||||
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
|
||||
use tests_fuzz::translator::mysql::select_expr::SelectExprTranslator;
|
||||
use tests_fuzz::translator::DslTranslator;
|
||||
use tests_fuzz::validator;
|
||||
use tokio::fs::File;
|
||||
@@ -45,7 +51,7 @@ mod utils;
|
||||
use axum::routing::get;
|
||||
use prometheus::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error::{Error, RequestMysqlSnafu, Result};
|
||||
use crate::utils::{generate_create_table_expr, get_conf_path, path_to_stdio, render_config_file};
|
||||
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
|
||||
|
||||
@@ -81,6 +87,7 @@ lazy_static::lazy_static! {
|
||||
static ref UP_COUNTER: IntCounter = register_int_counter!("up", "up counter").unwrap();
|
||||
}
|
||||
|
||||
// cargo run --package tests-chaos --bin tests-chaos
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -96,6 +103,14 @@ async fn main() {
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let test_dir = "/home/lfc/test-cuckoo/";
|
||||
// Remove everything in the test directory, to make sure we have a clean start.
|
||||
match std::fs::remove_dir_all(test_dir) {
|
||||
Err(e) if e.kind() != std::io::ErrorKind::NotFound => panic!("{e:?}"),
|
||||
_ => {}
|
||||
}
|
||||
std::fs::create_dir_all(test_dir).unwrap();
|
||||
|
||||
let state = Arc::new(TestState {
|
||||
killed: AtomicBool::new(false),
|
||||
});
|
||||
@@ -106,30 +121,90 @@ async fn main() {
|
||||
loop {
|
||||
warn!("Staring");
|
||||
UP_COUNTER.inc();
|
||||
let pid = start_database().await.expect("Failed to start database");
|
||||
let pid = start_database(test_dir).await.unwrap();
|
||||
let secs = rng.gen_range(100..300);
|
||||
moved_state.killed.store(false, Ordering::Relaxed);
|
||||
tokio::time::sleep(Duration::from_millis(secs)).await;
|
||||
warn!("After {secs}ms, Killing pid: {pid}");
|
||||
moved_state.killed.store(true, Ordering::Relaxed);
|
||||
|
||||
// Flush the database before restarting it. Because cuckoo does not enable WAL,
|
||||
// data may not survive the restart if not flush them.
|
||||
flush_db().await;
|
||||
|
||||
ProcessManager::kill(pid, Signal::SIGKILL).expect("Failed to kill");
|
||||
}
|
||||
});
|
||||
let mut rng = ChaChaRng::seed_from_u64(0);
|
||||
let client = connect_db("127.0.0.1:4002").await;
|
||||
let mut sqlx = sqlx_connections().await;
|
||||
let mut mysql = mysql_connections().await;
|
||||
let mut created_table = HashSet::new();
|
||||
|
||||
// Runs maximum 10000 times.
|
||||
for _i in 0..10000 {
|
||||
if let Err(e) = run_test(&sqlx, &mysql, &mut created_table, &state, &mut rng).await {
|
||||
if matches!(e, Error::ExecuteQuery { .. } | Error::RequestMysql { .. })
|
||||
&& state.killed.load(Ordering::Relaxed)
|
||||
{
|
||||
// If the query error is caused by restarting the database
|
||||
// (which is an intended action), reconnect.
|
||||
sqlx = sqlx_connections().await;
|
||||
mysql = mysql_connections().await;
|
||||
} else {
|
||||
panic!("{e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Successfully runs DDL chaos testing for cuckoo!");
|
||||
}
|
||||
|
||||
async fn flush_db() {
|
||||
info!("Start flushing the database ...");
|
||||
let _ = reqwest::get("http://127.0.0.1:4000/v1/admin/flush?db=public")
|
||||
.await
|
||||
.unwrap()
|
||||
.bytes()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn mysql_connections() -> mysql::Pool {
|
||||
let mut max_retry = 10;
|
||||
loop {
|
||||
run_test(&client, &mut created_table, &state, &mut rng)
|
||||
.await
|
||||
.unwrap();
|
||||
match mysql::Pool::new("mysql://127.0.0.1:4002/public") {
|
||||
Ok(x) => return x,
|
||||
Err(e) => {
|
||||
max_retry -= 1;
|
||||
if max_retry == 0 {
|
||||
panic!("{e:?}")
|
||||
} else {
|
||||
info!("GreptimeDB is not connectable, maybe during restart. Wait 1 second to retry");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_db(addr: &str) -> Pool<MySql> {
|
||||
MySqlPoolOptions::new()
|
||||
.connect(&format!("mysql://{addr}/public"))
|
||||
.await
|
||||
.unwrap()
|
||||
async fn sqlx_connections() -> Pool<MySql> {
|
||||
let mut max_retry = 10;
|
||||
loop {
|
||||
match MySqlPoolOptions::new()
|
||||
.connect("mysql://127.0.0.1:4002/public")
|
||||
.await
|
||||
{
|
||||
Ok(x) => return x,
|
||||
Err(e) => {
|
||||
max_retry -= 1;
|
||||
if max_retry == 0 {
|
||||
panic!("{e:?}")
|
||||
} else {
|
||||
info!("GreptimeDB is not connectable, maybe during restart. Wait 1 second to retry");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct TestState {
|
||||
@@ -138,19 +213,21 @@ struct TestState {
|
||||
|
||||
async fn run_test<R: Rng + 'static>(
|
||||
client: &Pool<MySql>,
|
||||
pool: &mysql::Pool,
|
||||
created_table: &mut HashSet<String>,
|
||||
state: &Arc<TestState>,
|
||||
rng: &mut R,
|
||||
) -> Result<()> {
|
||||
let expr = generate_create_table_expr(rng);
|
||||
let table_name = expr.table_name.to_string();
|
||||
if created_table.contains(&table_name) {
|
||||
if !created_table.insert(table_name.clone()) {
|
||||
warn!("ignores same name table: {table_name}");
|
||||
// ignores.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut table_ctx = Arc::new(TableContext::from(&expr));
|
||||
let mut table_ctx = TableContext::from(&expr);
|
||||
|
||||
let translator = CreateTableExprTranslator;
|
||||
let sql = translator.translate(&expr).unwrap();
|
||||
let result = sqlx::query(&sql).execute(client).await;
|
||||
@@ -159,7 +236,6 @@ async fn run_test<R: Rng + 'static>(
|
||||
Ok(result) => {
|
||||
validate_mysql(client, state, &table_ctx).await;
|
||||
info!("Create table: {sql}, result: {result:?}");
|
||||
created_table.insert(table_name);
|
||||
}
|
||||
Err(err) => {
|
||||
ensure!(
|
||||
@@ -176,7 +252,7 @@ async fn run_test<R: Rng + 'static>(
|
||||
let actions = rng.gen_range(1..20);
|
||||
|
||||
for _ in 0..actions {
|
||||
let expr = generate_alter_table_expr(table_ctx.clone(), rng);
|
||||
let expr = generate_alter_table_expr(Arc::new(table_ctx.clone()), rng);
|
||||
if let AlterTableOperation::RenameTable { new_table_name } = &expr.alter_options {
|
||||
let table_name = new_table_name.to_string();
|
||||
if created_table.contains(&table_name) {
|
||||
@@ -185,6 +261,8 @@ async fn run_test<R: Rng + 'static>(
|
||||
}
|
||||
};
|
||||
|
||||
insert_rows(pool, &mut table_ctx, rng)?;
|
||||
|
||||
let translator = AlterTableExprTranslator;
|
||||
let sql = translator.translate(&expr).unwrap();
|
||||
let result = sqlx::query(&sql).execute(client).await;
|
||||
@@ -193,13 +271,13 @@ async fn run_test<R: Rng + 'static>(
|
||||
info!("alter table: {sql}, result: {result:?}");
|
||||
let table_name = table_ctx.name.to_string();
|
||||
created_table.remove(&table_name);
|
||||
table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap());
|
||||
table_ctx.alter(expr)?;
|
||||
validate_mysql(client, state, &table_ctx).await;
|
||||
let table_name = table_ctx.name.to_string();
|
||||
created_table.insert(table_name);
|
||||
}
|
||||
Err(err) => {
|
||||
table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap());
|
||||
table_ctx.alter(expr)?;
|
||||
let table_name = table_ctx.name.to_string();
|
||||
created_table.insert(table_name);
|
||||
ensure!(
|
||||
@@ -211,11 +289,90 @@ async fn run_test<R: Rng + 'static>(
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let actual_rows = fetch_all_rows(&table_ctx, pool)?;
|
||||
info!("fetch all rows after alter: {actual_rows}");
|
||||
if actual_rows.empty() && state.killed.load(Ordering::Relaxed) {
|
||||
// Cuckoo does not have WAL enabled; therefore the data could be cleared across restart.
|
||||
// When that happened, clear the saved data that are used for comparison, too.
|
||||
table_ctx.clear_data();
|
||||
} else {
|
||||
assert_eq!(
|
||||
&table_ctx.rows, &actual_rows,
|
||||
r#"rows not equal:
|
||||
expect: {}
|
||||
actual: {}"#,
|
||||
&table_ctx.rows, &actual_rows
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_rows<R: Rng + 'static>(
|
||||
pool: &mysql::Pool,
|
||||
table_ctx: &mut TableContext,
|
||||
rng: &mut R,
|
||||
) -> Result<()> {
|
||||
let insert_expr = InsertExprGeneratorBuilder::default()
|
||||
.table_ctx(Arc::new(table_ctx.clone()))
|
||||
.build()
|
||||
.unwrap()
|
||||
.generate(rng)
|
||||
.unwrap();
|
||||
let sql = InsertIntoExprTranslator.translate(&insert_expr).unwrap();
|
||||
info!("executing insertion: {sql}");
|
||||
|
||||
let mut conn = pool.get_conn().context(RequestMysqlSnafu {
|
||||
err_msg: "get connection",
|
||||
})?;
|
||||
conn.query_drop(&sql).with_context(|_| RequestMysqlSnafu {
|
||||
err_msg: format!("executing sql '{}'", sql),
|
||||
})?;
|
||||
|
||||
if conn.affected_rows() > 0 {
|
||||
table_ctx.insert(insert_expr)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Sqlx treats all queries as prepared, we have to switch to another mysql client library here.
|
||||
// There's a error when trying to query GreptimeDB with prepared statement:
|
||||
// "tried to use [50, 48, ..., 56] as MYSQL_TYPE_TIMESTAMP"
|
||||
// Besides, sqlx is suited for cases where table schema is known (and representable in codes),
|
||||
// definitely not here.
|
||||
fn fetch_all_rows(table_ctx: &TableContext, pool: &mysql::Pool) -> Result<Rows> {
|
||||
let select_expr = SelectExpr {
|
||||
table_name: table_ctx.name.to_string(),
|
||||
columns: table_ctx.columns.clone(),
|
||||
order_by: vec![table_ctx
|
||||
.columns
|
||||
.iter()
|
||||
.find_map(|c| {
|
||||
if c.is_time_index() {
|
||||
Some(c.name.to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap()],
|
||||
direction: Direction::Asc,
|
||||
limit: usize::MAX,
|
||||
};
|
||||
let sql = SelectExprTranslator.translate(&select_expr).unwrap();
|
||||
info!("executing selection: {sql}");
|
||||
|
||||
let mut conn = pool.get_conn().context(RequestMysqlSnafu {
|
||||
err_msg: "get connection",
|
||||
})?;
|
||||
let rows: Vec<mysql::Row> = conn.query(&sql).with_context(|_| RequestMysqlSnafu {
|
||||
err_msg: format!("executing sql: {}", sql),
|
||||
})?;
|
||||
|
||||
Ok(Rows::fill(rows))
|
||||
}
|
||||
|
||||
async fn validate_mysql(client: &Pool<MySql>, _state: &Arc<TestState>, table_ctx: &TableContext) {
|
||||
loop {
|
||||
match validator::column::fetch_columns_via_mysql(
|
||||
@@ -240,9 +397,8 @@ async fn validate_mysql(client: &Pool<MySql>, _state: &Arc<TestState>, table_ctx
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_database() -> Result<Pid> {
|
||||
let binary_path = "/home/weny/Projects/greptimedb-cuckoo/target/debug/greptime";
|
||||
let test_dir = "/tmp/greptimedb-cuckoo/";
|
||||
async fn start_database(test_dir: &str) -> Result<Pid> {
|
||||
let binary_path = "/home/lfc/greptimedb-cuckoo/target/debug/greptime";
|
||||
let template_filename = "standalone-v0.3.2.toml.template";
|
||||
let health_url = "http://127.0.0.1:4000/health";
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ derive_builder = { workspace = true }
|
||||
dotenv = "0.15"
|
||||
lazy_static = { workspace = true }
|
||||
libfuzzer-sys = "0.4"
|
||||
mysql = "25.0"
|
||||
partition = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_chacha = "0.3.1"
|
||||
|
||||
@@ -12,9 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use std::vec;
|
||||
|
||||
use common_query::AddColumnLocation;
|
||||
use common_telemetry::info;
|
||||
use common_time::timezone::parse_timezone;
|
||||
use datatypes::value::Value;
|
||||
use partition::partition::PartitionDef;
|
||||
use rand::Rng;
|
||||
use snafu::{ensure, OptionExt};
|
||||
@@ -22,15 +27,171 @@ use snafu::{ensure, OptionExt};
|
||||
use crate::error::{self, Result};
|
||||
use crate::generator::Random;
|
||||
use crate::ir::alter_expr::AlterTableOperation;
|
||||
use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident};
|
||||
use crate::ir::create_expr::ColumnOption;
|
||||
use crate::ir::insert_expr::RowValue;
|
||||
use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident, InsertIntoExpr};
|
||||
|
||||
pub type TableContextRef = Arc<TableContext>;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, PartialOrd)]
|
||||
struct Cell {
|
||||
value: mysql::Value,
|
||||
}
|
||||
|
||||
impl Cell {
|
||||
fn null() -> Self {
|
||||
Self {
|
||||
value: mysql::Value::NULL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Cell {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.value.as_sql(true))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Value> for Cell {
|
||||
fn from(value: Value) -> Self {
|
||||
if value.is_null() {
|
||||
Self::null()
|
||||
} else {
|
||||
let s = match value {
|
||||
Value::Timestamp(t) => {
|
||||
t.to_timezone_aware_string(Some(&parse_timezone(Some("+08:00"))))
|
||||
}
|
||||
Value::Boolean(b) => if b { "1" } else { "0" }.to_string(),
|
||||
_ => value.to_string(),
|
||||
};
|
||||
Self {
|
||||
value: mysql::Value::Bytes(s.into_bytes()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<mysql::Value> for Cell {
|
||||
fn from(value: mysql::Value) -> Self {
|
||||
Self { value }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
struct Row {
|
||||
cells: Vec<Cell>,
|
||||
}
|
||||
|
||||
impl Row {
|
||||
fn with(columns_len: usize) -> Self {
|
||||
Self {
|
||||
cells: vec![Cell::null(); columns_len],
|
||||
}
|
||||
}
|
||||
|
||||
fn set_cell(&mut self, at: usize, cell: Cell) {
|
||||
self.cells[at] = cell;
|
||||
}
|
||||
|
||||
fn add_column(&mut self, at: usize, cell: Cell) {
|
||||
self.cells.insert(at, cell);
|
||||
}
|
||||
|
||||
fn drop_column(&mut self, at: usize) {
|
||||
self.cells.remove(at);
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Row {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "[")?;
|
||||
for (i, v) in self.cells.iter().enumerate() {
|
||||
write!(f, "{}\t", v)?;
|
||||
if i + 1 < self.cells.len() {
|
||||
write!(f, "\t")?;
|
||||
}
|
||||
}
|
||||
write!(f, "]")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Default)]
|
||||
pub struct Rows {
|
||||
rows: Vec<Row>,
|
||||
}
|
||||
|
||||
impl Rows {
|
||||
pub fn empty(&self) -> bool {
|
||||
self.rows.is_empty()
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self { rows: vec![] }
|
||||
}
|
||||
|
||||
pub fn fill(rows: Vec<mysql::Row>) -> Self {
|
||||
Self {
|
||||
rows: rows
|
||||
.into_iter()
|
||||
.map(|r| Row {
|
||||
cells: r.unwrap().into_iter().map(Into::into).collect(),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn with(rows: usize) -> Self {
|
||||
Self {
|
||||
rows: Vec::with_capacity(rows),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_column(&mut self, at: usize, column: &Column) {
|
||||
let cell = column
|
||||
.options
|
||||
.iter()
|
||||
.find_map(|x| {
|
||||
if let ColumnOption::DefaultValue(v) = x {
|
||||
Some(v.clone().into())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or(Cell::null());
|
||||
self.rows
|
||||
.iter_mut()
|
||||
.for_each(|x| x.add_column(at, cell.clone()))
|
||||
}
|
||||
|
||||
fn drop_column(&mut self, at: usize) {
|
||||
self.rows.iter_mut().for_each(|x| x.drop_column(at))
|
||||
}
|
||||
|
||||
fn add_row(&mut self, row: Row) {
|
||||
self.rows.push(row)
|
||||
}
|
||||
|
||||
fn extend(&mut self, rows: Rows) {
|
||||
self.rows.extend(rows.rows)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Rows {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
writeln!(f, "Rows")?;
|
||||
for r in &self.rows {
|
||||
writeln!(f, "{}", r)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// TableContext stores table info.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TableContext {
|
||||
pub name: Ident,
|
||||
pub columns: Vec<Column>,
|
||||
pub rows: Rows,
|
||||
|
||||
// GreptimeDB specific options
|
||||
pub partition: Option<PartitionDef>,
|
||||
@@ -50,6 +211,7 @@ impl From<&CreateTableExpr> for TableContext {
|
||||
Self {
|
||||
name: name.clone(),
|
||||
columns: columns.clone(),
|
||||
rows: Rows::new(),
|
||||
partition: partition.clone(),
|
||||
primary_keys: primary_keys.clone(),
|
||||
}
|
||||
@@ -57,8 +219,72 @@ impl From<&CreateTableExpr> for TableContext {
|
||||
}
|
||||
|
||||
impl TableContext {
|
||||
pub fn clear_data(&mut self) {
|
||||
self.rows = Rows::new();
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, expr: InsertIntoExpr) -> Result<()> {
|
||||
fn find_default_value(column: &Column) -> Cell {
|
||||
column
|
||||
.options
|
||||
.iter()
|
||||
.find_map(|opt| {
|
||||
if let ColumnOption::DefaultValue(v) = opt {
|
||||
Some(v.clone().into())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or(Cell::null())
|
||||
}
|
||||
|
||||
let mut rows = Rows::with(expr.values_list.len());
|
||||
|
||||
for insert_values in expr.values_list.into_iter() {
|
||||
let mut row = Row::with(self.columns.len());
|
||||
|
||||
for (i, column) in self.columns.iter().enumerate() {
|
||||
let cell = if let Some(v) = expr
|
||||
.columns
|
||||
.iter()
|
||||
.zip(insert_values.iter())
|
||||
.find_map(|(x, y)| if x.name == column.name { Some(y) } else { None })
|
||||
{
|
||||
match v {
|
||||
RowValue::Value(v) => v.clone().into(),
|
||||
RowValue::Default => find_default_value(column),
|
||||
}
|
||||
} else {
|
||||
find_default_value(column)
|
||||
};
|
||||
row.set_cell(i, cell);
|
||||
}
|
||||
rows.add_row(row);
|
||||
}
|
||||
self.rows.extend(rows);
|
||||
|
||||
let time_index = self.columns.iter().position(|x| x.is_time_index()).unwrap();
|
||||
self.rows.rows.sort_by(|x, y| {
|
||||
x.cells[time_index]
|
||||
.partial_cmp(&y.cells[time_index])
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
|
||||
info!("after insertion, expected rows: {}", self.rows);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_column_index(&self, column_name: &str) -> Result<usize> {
|
||||
self.columns
|
||||
.iter()
|
||||
.position(|col| col.name.to_string() == column_name)
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: format!("Column: {column_name} not found"),
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies the [AlterTableExpr].
|
||||
pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
|
||||
pub fn alter(&mut self, expr: AlterTableExpr) -> Result<()> {
|
||||
match expr.alter_options {
|
||||
AlterTableOperation::AddColumn { column, location } => {
|
||||
ensure!(
|
||||
@@ -69,23 +295,18 @@ impl TableContext {
|
||||
);
|
||||
match location {
|
||||
Some(AddColumnLocation::First) => {
|
||||
let mut columns = Vec::with_capacity(self.columns.len() + 1);
|
||||
columns.push(column);
|
||||
columns.extend(self.columns);
|
||||
self.columns = columns;
|
||||
self.rows.add_column(0, &column);
|
||||
self.columns.insert(0, column);
|
||||
}
|
||||
Some(AddColumnLocation::After { column_name }) => {
|
||||
let index = self
|
||||
.columns
|
||||
.iter()
|
||||
// TODO(weny): find a better way?
|
||||
.position(|col| col.name.to_string() == column_name)
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: format!("Column: {column_name} not found"),
|
||||
})?;
|
||||
let index = self.find_column_index(&column_name)?;
|
||||
self.rows.add_column(index + 1, &column);
|
||||
self.columns.insert(index + 1, column);
|
||||
}
|
||||
None => self.columns.push(column),
|
||||
None => {
|
||||
self.rows.add_column(self.columns.len(), &column);
|
||||
self.columns.push(column);
|
||||
}
|
||||
}
|
||||
// Re-generates the primary_keys
|
||||
self.primary_keys = self
|
||||
@@ -100,10 +321,12 @@ impl TableContext {
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(self)
|
||||
}
|
||||
AlterTableOperation::DropColumn { name } => {
|
||||
self.columns.retain(|col| col.name != name);
|
||||
let at = self.find_column_index(&name.to_string())?;
|
||||
self.columns.remove(at);
|
||||
self.rows.drop_column(at);
|
||||
|
||||
// Re-generates the primary_keys
|
||||
self.primary_keys = self
|
||||
.columns
|
||||
@@ -117,7 +340,6 @@ impl TableContext {
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(self)
|
||||
}
|
||||
AlterTableOperation::RenameTable { new_table_name } => {
|
||||
ensure!(
|
||||
@@ -127,9 +349,9 @@ impl TableContext {
|
||||
}
|
||||
);
|
||||
self.name = new_table_name;
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn generate_unique_column_name<R: Rng>(
|
||||
@@ -162,16 +384,17 @@ mod tests {
|
||||
use common_query::AddColumnLocation;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
|
||||
use super::TableContext;
|
||||
use super::*;
|
||||
use crate::ir::alter_expr::AlterTableOperation;
|
||||
use crate::ir::create_expr::ColumnOption;
|
||||
use crate::ir::{AlterTableExpr, Column, Ident};
|
||||
|
||||
#[test]
|
||||
fn test_table_context_alter() {
|
||||
let table_ctx = TableContext {
|
||||
let mut table_ctx = TableContext {
|
||||
name: "foo".into(),
|
||||
columns: vec![],
|
||||
rows: Rows::new(),
|
||||
partition: None,
|
||||
primary_keys: vec![],
|
||||
};
|
||||
@@ -187,7 +410,7 @@ mod tests {
|
||||
location: None,
|
||||
},
|
||||
};
|
||||
let table_ctx = table_ctx.alter(expr).unwrap();
|
||||
table_ctx.alter(expr).unwrap();
|
||||
assert_eq!(table_ctx.columns[0].name, Ident::new("a"));
|
||||
assert_eq!(table_ctx.primary_keys, vec![0]);
|
||||
|
||||
@@ -203,7 +426,7 @@ mod tests {
|
||||
location: Some(AddColumnLocation::First),
|
||||
},
|
||||
};
|
||||
let table_ctx = table_ctx.alter(expr).unwrap();
|
||||
table_ctx.alter(expr).unwrap();
|
||||
assert_eq!(table_ctx.columns[0].name, Ident::new("b"));
|
||||
assert_eq!(table_ctx.primary_keys, vec![0, 1]);
|
||||
|
||||
@@ -221,7 +444,7 @@ mod tests {
|
||||
}),
|
||||
},
|
||||
};
|
||||
let table_ctx = table_ctx.alter(expr).unwrap();
|
||||
table_ctx.alter(expr).unwrap();
|
||||
assert_eq!(table_ctx.columns[1].name, Ident::new("c"));
|
||||
assert_eq!(table_ctx.primary_keys, vec![0, 1, 2]);
|
||||
|
||||
@@ -230,7 +453,7 @@ mod tests {
|
||||
table_name: "foo".into(),
|
||||
alter_options: AlterTableOperation::DropColumn { name: "b".into() },
|
||||
};
|
||||
let table_ctx = table_ctx.alter(expr).unwrap();
|
||||
table_ctx.alter(expr).unwrap();
|
||||
assert_eq!(table_ctx.columns[1].name, Ident::new("a"));
|
||||
assert_eq!(table_ctx.primary_keys, vec![0, 1]);
|
||||
}
|
||||
|
||||
@@ -94,11 +94,7 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R> {
|
||||
|
||||
Ok(InsertIntoExpr {
|
||||
table_name: self.table_ctx.name.to_string(),
|
||||
columns: if omit_column_list {
|
||||
vec![]
|
||||
} else {
|
||||
values_columns
|
||||
},
|
||||
columns: values_columns,
|
||||
values_list,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,9 +15,9 @@
|
||||
//! The intermediate representation
|
||||
|
||||
pub(crate) mod alter_expr;
|
||||
pub(crate) mod create_expr;
|
||||
pub(crate) mod insert_expr;
|
||||
pub(crate) mod select_expr;
|
||||
pub mod create_expr;
|
||||
pub mod insert_expr;
|
||||
pub mod select_expr;
|
||||
|
||||
use core::fmt;
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use datatypes::value::Value;
|
||||
|
||||
use crate::ir::Column;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InsertIntoExpr {
|
||||
pub table_name: String,
|
||||
pub columns: Vec<Column>,
|
||||
@@ -26,6 +27,7 @@ pub struct InsertIntoExpr {
|
||||
|
||||
pub type RowValues = Vec<RowValue>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RowValue {
|
||||
Value(Value),
|
||||
Default,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
|
||||
use crate::context::TableContext;
|
||||
use crate::context::{Rows, TableContext};
|
||||
use crate::ir::create_expr::ColumnOption;
|
||||
use crate::ir::Column;
|
||||
|
||||
@@ -53,6 +53,7 @@ pub fn new_test_ctx() -> TableContext {
|
||||
options: vec![ColumnOption::TimeIndex],
|
||||
},
|
||||
],
|
||||
rows: Rows::new(),
|
||||
partition: None,
|
||||
primary_keys: vec![],
|
||||
}
|
||||
|
||||
@@ -155,7 +155,9 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
|
||||
.context(error::ExecuteQuerySnafu { sql: &sql })?;
|
||||
info!("Alter table: {sql}, result: {result:?}");
|
||||
// Applies changes
|
||||
table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap());
|
||||
let mut t = Arc::unwrap_or_clone(table_ctx);
|
||||
t.alter(expr).unwrap();
|
||||
table_ctx = Arc::new(t);
|
||||
|
||||
// Validates columns
|
||||
let mut column_entries = validator::column::fetch_columns_via_mysql(
|
||||
|
||||
Reference in New Issue
Block a user