From ee67ce10c9eaa2b942ab80ee2f978c213b60586b Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 14 May 2024 20:37:54 +0800 Subject: [PATCH] insert some rows and query them across DDL to test the integrity of data --- Cargo.lock | 134 ++++++++++++ tests-chaos/Cargo.toml | 1 + tests-chaos/src/error.rs | 19 +- tests-chaos/src/main.rs | 198 +++++++++++++++-- tests-fuzz/Cargo.toml | 1 + tests-fuzz/src/context.rs | 273 +++++++++++++++++++++--- tests-fuzz/src/generator/insert_expr.rs | 6 +- tests-fuzz/src/ir.rs | 6 +- tests-fuzz/src/ir/insert_expr.rs | 2 + tests-fuzz/src/test_utils.rs | 3 +- tests-fuzz/targets/fuzz_alter_table.rs | 4 +- 11 files changed, 590 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6fbf42ae98..f0d55f5c8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1074,6 +1074,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bufstream" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8" + [[package]] name = "build-data" version = "0.1.5" @@ -2957,6 +2963,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_utils" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61bb5a1014ce6dfc2a378578509abe775a5aa06bff584a547555d9efdb81b926" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.43", +] + [[package]] name = "diff" version = "0.1.13" @@ -3425,6 +3442,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -4414,6 +4446,15 @@ version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8573b2b1fb643a372c73b23f4da5f888677feef3305146d68a539250a9bccc7" +[[package]] +name = "io-enum" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b53d712d99a73eec59ee5e4fe6057f8052142d38eeafbbffcb06b36d738a6e" +dependencies = [ + "derive_utils", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -5427,6 +5468,32 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b" +[[package]] +name = "mysql" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4cc09a8118051e4617886c9c6e693c61444c2eeb5f9a792dc5d631501706565" +dependencies = [ + "bufstream", + "bytes", + "crossbeam", + "flate2", + "io-enum", + "libc", + "lru", + "mysql_common 0.32.0", + "named_pipe", + "native-tls", + "once_cell", + "pem", + "percent-encoding", + "serde", + "serde_json", + "socket2 0.5.5", + "twox-hash", + "url", +] + [[package]] name = "mysql-common-derive" version = "0.30.2" @@ -5608,6 +5675,33 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "named_pipe" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad9c443cce91fc3e12f017290db75dde490d685cdaaf508d7159d7cf41f0eb2b" +dependencies = [ + "winapi", +] + +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + 
"security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk-context" version = "0.1.1" @@ -6012,12 +6106,50 @@ dependencies = [ "tokio-rustls 0.25.0", ] +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.4.1", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.43", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.21.0" @@ -10072,6 +10204,7 @@ dependencies = [ "common-telemetry", "common-time", "lazy_static", + "mysql", "nix 0.26.4", "prometheus", "rand", @@ -10103,6 +10236,7 @@ dependencies = [ "dotenv", "lazy_static", "libfuzzer-sys", + "mysql", "partition", "rand", "rand_chacha", diff --git a/tests-chaos/Cargo.toml b/tests-chaos/Cargo.toml index fd966c64f4..b0953abe05 100644 --- a/tests-chaos/Cargo.toml +++ b/tests-chaos/Cargo.toml @@ -12,6 +12,7 @@ common-macro.workspace = true common-telemetry.workspace = true common-time = { workspace = true } lazy_static.workspace = true +mysql = "25.0" nix = { version = "0.26", features = ["process"] } prometheus.workspace = true rand = { workspace = 
true } diff --git a/tests-chaos/src/error.rs b/tests-chaos/src/error.rs index 4974531ec4..ff6a567c27 100644 --- a/tests-chaos/src/error.rs +++ b/tests-chaos/src/error.rs @@ -1,5 +1,5 @@ use common_macro::stack_trace_debug; -use snafu::{Location, Snafu}; +use snafu::{location, Location, Snafu}; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -55,6 +55,23 @@ pub enum Error { error: sqlx::error::Error, location: Location, }, + + #[snafu(display("Failed to request mysql, error: {}", err_msg))] + RequestMysql { + err_msg: String, + #[snafu(source)] + error: mysql::Error, + location: Location, + }, } pub type Result = std::result::Result; + +impl From for Error { + fn from(e: tests_fuzz::error::Error) -> Self { + Self::Unexpected { + err_msg: e.to_string(), + location: location!(), + } + } +} diff --git a/tests-chaos/src/main.rs b/tests-chaos/src/main.rs index 2caff8345d..a585124e45 100644 --- a/tests-chaos/src/main.rs +++ b/tests-chaos/src/main.rs @@ -22,6 +22,7 @@ use axum::extract::{Query, State}; use axum::Router; use bare::process::{Pid, ProcessManager}; use common_telemetry::{info, warn}; +use mysql::prelude::Queryable; use nix::sys::signal::Signal; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; @@ -29,10 +30,15 @@ use serde::Serialize; use snafu::{ensure, ResultExt}; use sqlx::mysql::MySqlPoolOptions; use sqlx::{MySql, Pool}; -use tests_fuzz::context::TableContext; +use tests_fuzz::context::{Rows, TableContext}; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; +use tests_fuzz::generator::Generator; +use tests_fuzz::ir::select_expr::{Direction, SelectExpr}; use tests_fuzz::ir::AlterTableOperation; use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator; use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; +use tests_fuzz::translator::mysql::select_expr::SelectExprTranslator; use tests_fuzz::translator::DslTranslator; use 
tests_fuzz::validator; use tokio::fs::File; @@ -45,7 +51,7 @@ mod utils; use axum::routing::get; use prometheus::{register_int_counter, Encoder, IntCounter, TextEncoder}; -use crate::error::Result; +use crate::error::{Error, RequestMysqlSnafu, Result}; use crate::utils::{generate_create_table_expr, get_conf_path, path_to_stdio, render_config_file}; const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; @@ -81,6 +87,7 @@ lazy_static::lazy_static! { static ref UP_COUNTER: IntCounter = register_int_counter!("up", "up counter").unwrap(); } +// cargo run --package tests-chaos --bin tests-chaos #[tokio::main] async fn main() { common_telemetry::init_default_ut_logging(); @@ -96,6 +103,14 @@ async fn main() { .unwrap(); }); + let test_dir = "/home/lfc/test-cuckoo/"; + // Remove everything in the test directory, to make sure we have a clean start. + match std::fs::remove_dir_all(test_dir) { + Err(e) if e.kind() != std::io::ErrorKind::NotFound => panic!("{e:?}"), + _ => {} + } + std::fs::create_dir_all(test_dir).unwrap(); + let state = Arc::new(TestState { killed: AtomicBool::new(false), }); @@ -106,30 +121,90 @@ async fn main() { loop { warn!("Staring"); UP_COUNTER.inc(); - let pid = start_database().await.expect("Failed to start database"); + let pid = start_database(test_dir).await.unwrap(); let secs = rng.gen_range(100..300); moved_state.killed.store(false, Ordering::Relaxed); tokio::time::sleep(Duration::from_millis(secs)).await; warn!("After {secs}ms, Killing pid: {pid}"); moved_state.killed.store(true, Ordering::Relaxed); + + // Flush the database before restarting it. Because cuckoo does not enable WAL, + // data may not survive the restart unless it is flushed first.
+ flush_db().await; + ProcessManager::kill(pid, Signal::SIGKILL).expect("Failed to kill"); } }); let mut rng = ChaChaRng::seed_from_u64(0); - let client = connect_db("127.0.0.1:4002").await; + let mut sqlx = sqlx_connections().await; + let mut mysql = mysql_connections().await; let mut created_table = HashSet::new(); + + // Runs maximum 10000 times. + for _i in 0..10000 { + if let Err(e) = run_test(&sqlx, &mysql, &mut created_table, &state, &mut rng).await { + if matches!(e, Error::ExecuteQuery { .. } | Error::RequestMysql { .. }) + && state.killed.load(Ordering::Relaxed) + { + // If the query error is caused by restarting the database + // (which is an intended action), reconnect. + sqlx = sqlx_connections().await; + mysql = mysql_connections().await; + } else { + panic!("{e:?}"); + } + } + } + info!("Successfully runs DDL chaos testing for cuckoo!"); +} + +async fn flush_db() { + info!("Start flushing the database ..."); + let _ = reqwest::get("http://127.0.0.1:4000/v1/admin/flush?db=public") + .await + .unwrap() + .bytes() + .await + .unwrap(); +} + +async fn mysql_connections() -> mysql::Pool { + let mut max_retry = 10; loop { - run_test(&client, &mut created_table, &state, &mut rng) - .await - .unwrap(); + match mysql::Pool::new("mysql://127.0.0.1:4002/public") { + Ok(x) => return x, + Err(e) => { + max_retry -= 1; + if max_retry == 0 { + panic!("{e:?}") + } else { + info!("GreptimeDB is not connectable, maybe during restart. 
Wait 1 second to retry"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } } } -async fn connect_db(addr: &str) -> Pool { - MySqlPoolOptions::new() - .connect(&format!("mysql://{addr}/public")) - .await - .unwrap() +async fn sqlx_connections() -> Pool { + let mut max_retry = 10; + loop { + match MySqlPoolOptions::new() + .connect("mysql://127.0.0.1:4002/public") + .await + { + Ok(x) => return x, + Err(e) => { + max_retry -= 1; + if max_retry == 0 { + panic!("{e:?}") + } else { + info!("GreptimeDB is not connectable, maybe during restart. Wait 1 second to retry"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + } } struct TestState { @@ -138,19 +213,21 @@ struct TestState { async fn run_test( client: &Pool, + pool: &mysql::Pool, created_table: &mut HashSet, state: &Arc, rng: &mut R, ) -> Result<()> { let expr = generate_create_table_expr(rng); let table_name = expr.table_name.to_string(); - if created_table.contains(&table_name) { + if !created_table.insert(table_name.clone()) { warn!("ignores same name table: {table_name}"); // ignores. 
return Ok(()); } - let mut table_ctx = Arc::new(TableContext::from(&expr)); + let mut table_ctx = TableContext::from(&expr); + let translator = CreateTableExprTranslator; let sql = translator.translate(&expr).unwrap(); let result = sqlx::query(&sql).execute(client).await; @@ -159,7 +236,6 @@ async fn run_test( Ok(result) => { validate_mysql(client, state, &table_ctx).await; info!("Create table: {sql}, result: {result:?}"); - created_table.insert(table_name); } Err(err) => { ensure!( @@ -176,7 +252,7 @@ async fn run_test( let actions = rng.gen_range(1..20); for _ in 0..actions { - let expr = generate_alter_table_expr(table_ctx.clone(), rng); + let expr = generate_alter_table_expr(Arc::new(table_ctx.clone()), rng); if let AlterTableOperation::RenameTable { new_table_name } = &expr.alter_options { let table_name = new_table_name.to_string(); if created_table.contains(&table_name) { @@ -185,6 +261,8 @@ async fn run_test( } }; + insert_rows(pool, &mut table_ctx, rng)?; + let translator = AlterTableExprTranslator; let sql = translator.translate(&expr).unwrap(); let result = sqlx::query(&sql).execute(client).await; @@ -193,13 +271,13 @@ async fn run_test( info!("alter table: {sql}, result: {result:?}"); let table_name = table_ctx.name.to_string(); created_table.remove(&table_name); - table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap()); + table_ctx.alter(expr)?; validate_mysql(client, state, &table_ctx).await; let table_name = table_ctx.name.to_string(); created_table.insert(table_name); } Err(err) => { - table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap()); + table_ctx.alter(expr)?; let table_name = table_ctx.name.to_string(); created_table.insert(table_name); ensure!( @@ -211,11 +289,90 @@ async fn run_test( break; } } + + let actual_rows = fetch_all_rows(&table_ctx, pool)?; + info!("fetch all rows after alter: {actual_rows}"); + if actual_rows.empty() && state.killed.load(Ordering::Relaxed) { + // Cuckoo does not have WAL 
enabled; therefore the data could be cleared across restart. + // When that happens, clear the saved data that is used for comparison, too. + table_ctx.clear_data(); + } else { + assert_eq!( + &table_ctx.rows, &actual_rows, + r#"rows not equal: +expect: {} +actual: {}"#, + &table_ctx.rows, &actual_rows + ) + } } Ok(()) } +fn insert_rows( + pool: &mysql::Pool, + table_ctx: &mut TableContext, + rng: &mut R, +) -> Result<()> { + let insert_expr = InsertExprGeneratorBuilder::default() + .table_ctx(Arc::new(table_ctx.clone())) + .build() + .unwrap() + .generate(rng) + .unwrap(); + let sql = InsertIntoExprTranslator.translate(&insert_expr).unwrap(); + info!("executing insertion: {sql}"); + + let mut conn = pool.get_conn().context(RequestMysqlSnafu { + err_msg: "get connection", + })?; + conn.query_drop(&sql).with_context(|_| RequestMysqlSnafu { + err_msg: format!("executing sql '{}'", sql), + })?; + + if conn.affected_rows() > 0 { + table_ctx.insert(insert_expr)?; + } + Ok(()) +} + +// Sqlx treats all queries as prepared, so we have to switch to another mysql client library here. +// There's an error when trying to query GreptimeDB with a prepared statement: +// "tried to use [50, 48, ..., 56] as MYSQL_TYPE_TIMESTAMP" +// Besides, sqlx is suited for cases where the table schema is known (and representable in code), +// definitely not here.
+fn fetch_all_rows(table_ctx: &TableContext, pool: &mysql::Pool) -> Result { + let select_expr = SelectExpr { + table_name: table_ctx.name.to_string(), + columns: table_ctx.columns.clone(), + order_by: vec![table_ctx + .columns + .iter() + .find_map(|c| { + if c.is_time_index() { + Some(c.name.to_string()) + } else { + None + } + }) + .unwrap()], + direction: Direction::Asc, + limit: usize::MAX, + }; + let sql = SelectExprTranslator.translate(&select_expr).unwrap(); + info!("executing selection: {sql}"); + + let mut conn = pool.get_conn().context(RequestMysqlSnafu { + err_msg: "get connection", + })?; + let rows: Vec = conn.query(&sql).with_context(|_| RequestMysqlSnafu { + err_msg: format!("executing sql: {}", sql), + })?; + + Ok(Rows::fill(rows)) +} + async fn validate_mysql(client: &Pool, _state: &Arc, table_ctx: &TableContext) { loop { match validator::column::fetch_columns_via_mysql( @@ -240,9 +397,8 @@ async fn validate_mysql(client: &Pool, _state: &Arc, table_ctx } } -async fn start_database() -> Result { - let binary_path = "/home/weny/Projects/greptimedb-cuckoo/target/debug/greptime"; - let test_dir = "/tmp/greptimedb-cuckoo/"; +async fn start_database(test_dir: &str) -> Result { + let binary_path = "/home/lfc/greptimedb-cuckoo/target/debug/greptime"; let template_filename = "standalone-v0.3.2.toml.template"; let health_url = "http://127.0.0.1:4000/health"; diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index fbf86bcb39..8f13cc3aac 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -24,6 +24,7 @@ derive_builder = { workspace = true } dotenv = "0.15" lazy_static = { workspace = true } libfuzzer-sys = "0.4" +mysql = "25.0" partition = { workspace = true } rand = { workspace = true } rand_chacha = "0.3.1" diff --git a/tests-fuzz/src/context.rs b/tests-fuzz/src/context.rs index 29536c853c..718b73627a 100644 --- a/tests-fuzz/src/context.rs +++ b/tests-fuzz/src/context.rs @@ -12,9 +12,14 @@ // See the License for the specific language 
governing permissions and // limitations under the License. +use std::fmt::Display; use std::sync::Arc; +use std::vec; use common_query::AddColumnLocation; +use common_telemetry::info; +use common_time::timezone::parse_timezone; +use datatypes::value::Value; use partition::partition::PartitionDef; use rand::Rng; use snafu::{ensure, OptionExt}; @@ -22,15 +27,171 @@ use snafu::{ensure, OptionExt}; use crate::error::{self, Result}; use crate::generator::Random; use crate::ir::alter_expr::AlterTableOperation; -use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident}; +use crate::ir::create_expr::ColumnOption; +use crate::ir::insert_expr::RowValue; +use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident, InsertIntoExpr}; pub type TableContextRef = Arc; +#[derive(Debug, Clone, PartialEq, PartialOrd)] +struct Cell { + value: mysql::Value, +} + +impl Cell { + fn null() -> Self { + Self { + value: mysql::Value::NULL, + } + } +} + +impl Display for Cell { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value.as_sql(true)) + } +} + +impl From for Cell { + fn from(value: Value) -> Self { + if value.is_null() { + Self::null() + } else { + let s = match value { + Value::Timestamp(t) => { + t.to_timezone_aware_string(Some(&parse_timezone(Some("+08:00")))) + } + Value::Boolean(b) => if b { "1" } else { "0" }.to_string(), + _ => value.to_string(), + }; + Self { + value: mysql::Value::Bytes(s.into_bytes()), + } + } + } +} + +impl From for Cell { + fn from(value: mysql::Value) -> Self { + Self { value } + } +} + +#[derive(Debug, Clone, PartialEq)] +struct Row { + cells: Vec, +} + +impl Row { + fn with(columns_len: usize) -> Self { + Self { + cells: vec![Cell::null(); columns_len], + } + } + + fn set_cell(&mut self, at: usize, cell: Cell) { + self.cells[at] = cell; + } + + fn add_column(&mut self, at: usize, cell: Cell) { + self.cells.insert(at, cell); + } + + fn drop_column(&mut self, at: usize) { + self.cells.remove(at); + 
} +} + +impl Display for Row { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + for (i, v) in self.cells.iter().enumerate() { + write!(f, "{}\t", v)?; + if i + 1 < self.cells.len() { + write!(f, "\t")?; + } + } + write!(f, "]") + } +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct Rows { + rows: Vec, +} + +impl Rows { + pub fn empty(&self) -> bool { + self.rows.is_empty() + } + + pub fn new() -> Self { + Self { rows: vec![] } + } + + pub fn fill(rows: Vec) -> Self { + Self { + rows: rows + .into_iter() + .map(|r| Row { + cells: r.unwrap().into_iter().map(Into::into).collect(), + }) + .collect(), + } + } + + fn with(rows: usize) -> Self { + Self { + rows: Vec::with_capacity(rows), + } + } + + fn add_column(&mut self, at: usize, column: &Column) { + let cell = column + .options + .iter() + .find_map(|x| { + if let ColumnOption::DefaultValue(v) = x { + Some(v.clone().into()) + } else { + None + } + }) + .unwrap_or(Cell::null()); + self.rows + .iter_mut() + .for_each(|x| x.add_column(at, cell.clone())) + } + + fn drop_column(&mut self, at: usize) { + self.rows.iter_mut().for_each(|x| x.drop_column(at)) + } + + fn add_row(&mut self, row: Row) { + self.rows.push(row) + } + + fn extend(&mut self, rows: Rows) { + self.rows.extend(rows.rows) + } +} + +impl Display for Rows { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Rows")?; + for r in &self.rows { + writeln!(f, "{}", r)?; + } + Ok(()) + } +} + /// TableContext stores table info. 
#[derive(Debug, Clone)] pub struct TableContext { pub name: Ident, pub columns: Vec, + pub rows: Rows, // GreptimeDB specific options pub partition: Option, @@ -50,6 +211,7 @@ impl From<&CreateTableExpr> for TableContext { Self { name: name.clone(), columns: columns.clone(), + rows: Rows::new(), partition: partition.clone(), primary_keys: primary_keys.clone(), } @@ -57,8 +219,72 @@ impl From<&CreateTableExpr> for TableContext { } impl TableContext { + pub fn clear_data(&mut self) { + self.rows = Rows::new(); + } + + pub fn insert(&mut self, expr: InsertIntoExpr) -> Result<()> { + fn find_default_value(column: &Column) -> Cell { + column + .options + .iter() + .find_map(|opt| { + if let ColumnOption::DefaultValue(v) = opt { + Some(v.clone().into()) + } else { + None + } + }) + .unwrap_or(Cell::null()) + } + + let mut rows = Rows::with(expr.values_list.len()); + + for insert_values in expr.values_list.into_iter() { + let mut row = Row::with(self.columns.len()); + + for (i, column) in self.columns.iter().enumerate() { + let cell = if let Some(v) = expr + .columns + .iter() + .zip(insert_values.iter()) + .find_map(|(x, y)| if x.name == column.name { Some(y) } else { None }) + { + match v { + RowValue::Value(v) => v.clone().into(), + RowValue::Default => find_default_value(column), + } + } else { + find_default_value(column) + }; + row.set_cell(i, cell); + } + rows.add_row(row); + } + self.rows.extend(rows); + + let time_index = self.columns.iter().position(|x| x.is_time_index()).unwrap(); + self.rows.rows.sort_by(|x, y| { + x.cells[time_index] + .partial_cmp(&y.cells[time_index]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + info!("after insertion, expected rows: {}", self.rows); + Ok(()) + } + + fn find_column_index(&self, column_name: &str) -> Result { + self.columns + .iter() + .position(|col| col.name.to_string() == column_name) + .context(error::UnexpectedSnafu { + violated: format!("Column: {column_name} not found"), + }) + } + /// Applies the 
[AlterTableExpr]. - pub fn alter(mut self, expr: AlterTableExpr) -> Result { + pub fn alter(&mut self, expr: AlterTableExpr) -> Result<()> { match expr.alter_options { AlterTableOperation::AddColumn { column, location } => { ensure!( @@ -69,23 +295,18 @@ impl TableContext { ); match location { Some(AddColumnLocation::First) => { - let mut columns = Vec::with_capacity(self.columns.len() + 1); - columns.push(column); - columns.extend(self.columns); - self.columns = columns; + self.rows.add_column(0, &column); + self.columns.insert(0, column); } Some(AddColumnLocation::After { column_name }) => { - let index = self - .columns - .iter() - // TODO(weny): find a better way? - .position(|col| col.name.to_string() == column_name) - .context(error::UnexpectedSnafu { - violated: format!("Column: {column_name} not found"), - })?; + let index = self.find_column_index(&column_name)?; + self.rows.add_column(index + 1, &column); self.columns.insert(index + 1, column); } - None => self.columns.push(column), + None => { + self.rows.add_column(self.columns.len(), &column); + self.columns.push(column); + } } // Re-generates the primary_keys self.primary_keys = self @@ -100,10 +321,12 @@ impl TableContext { } }) .collect(); - Ok(self) } AlterTableOperation::DropColumn { name } => { - self.columns.retain(|col| col.name != name); + let at = self.find_column_index(&name.to_string())?; + self.columns.remove(at); + self.rows.drop_column(at); + // Re-generates the primary_keys self.primary_keys = self .columns @@ -117,7 +340,6 @@ impl TableContext { } }) .collect(); - Ok(self) } AlterTableOperation::RenameTable { new_table_name } => { ensure!( @@ -127,9 +349,9 @@ impl TableContext { } ); self.name = new_table_name; - Ok(self) } } + Ok(()) } pub fn generate_unique_column_name( @@ -162,16 +384,17 @@ mod tests { use common_query::AddColumnLocation; use datatypes::data_type::ConcreteDataType; - use super::TableContext; + use super::*; use crate::ir::alter_expr::AlterTableOperation; use 
crate::ir::create_expr::ColumnOption; use crate::ir::{AlterTableExpr, Column, Ident}; #[test] fn test_table_context_alter() { - let table_ctx = TableContext { + let mut table_ctx = TableContext { name: "foo".into(), columns: vec![], + rows: Rows::new(), partition: None, primary_keys: vec![], }; @@ -187,7 +410,7 @@ mod tests { location: None, }, }; - let table_ctx = table_ctx.alter(expr).unwrap(); + table_ctx.alter(expr).unwrap(); assert_eq!(table_ctx.columns[0].name, Ident::new("a")); assert_eq!(table_ctx.primary_keys, vec![0]); @@ -203,7 +426,7 @@ mod tests { location: Some(AddColumnLocation::First), }, }; - let table_ctx = table_ctx.alter(expr).unwrap(); + table_ctx.alter(expr).unwrap(); assert_eq!(table_ctx.columns[0].name, Ident::new("b")); assert_eq!(table_ctx.primary_keys, vec![0, 1]); @@ -221,7 +444,7 @@ mod tests { }), }, }; - let table_ctx = table_ctx.alter(expr).unwrap(); + table_ctx.alter(expr).unwrap(); assert_eq!(table_ctx.columns[1].name, Ident::new("c")); assert_eq!(table_ctx.primary_keys, vec![0, 1, 2]); @@ -230,7 +453,7 @@ mod tests { table_name: "foo".into(), alter_options: AlterTableOperation::DropColumn { name: "b".into() }, }; - let table_ctx = table_ctx.alter(expr).unwrap(); + table_ctx.alter(expr).unwrap(); assert_eq!(table_ctx.columns[1].name, Ident::new("a")); assert_eq!(table_ctx.primary_keys, vec![0, 1]); } diff --git a/tests-fuzz/src/generator/insert_expr.rs b/tests-fuzz/src/generator/insert_expr.rs index ec4ba4f615..c6e0bc4f54 100644 --- a/tests-fuzz/src/generator/insert_expr.rs +++ b/tests-fuzz/src/generator/insert_expr.rs @@ -94,11 +94,7 @@ impl Generator for InsertExprGenerator { Ok(InsertIntoExpr { table_name: self.table_ctx.name.to_string(), - columns: if omit_column_list { - vec![] - } else { - values_columns - }, + columns: values_columns, values_list, }) } diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index cd5bb92907..03f097850c 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -15,9 +15,9 @@ //! 
The intermediate representation pub(crate) mod alter_expr; -pub(crate) mod create_expr; -pub(crate) mod insert_expr; -pub(crate) mod select_expr; +pub mod create_expr; +pub mod insert_expr; +pub mod select_expr; use core::fmt; diff --git a/tests-fuzz/src/ir/insert_expr.rs b/tests-fuzz/src/ir/insert_expr.rs index c7476c5170..8b4493c9d0 100644 --- a/tests-fuzz/src/ir/insert_expr.rs +++ b/tests-fuzz/src/ir/insert_expr.rs @@ -18,6 +18,7 @@ use datatypes::value::Value; use crate::ir::Column; +#[derive(Debug)] pub struct InsertIntoExpr { pub table_name: String, pub columns: Vec, @@ -26,6 +27,7 @@ pub struct InsertIntoExpr { pub type RowValues = Vec; +#[derive(Debug)] pub enum RowValue { Value(Value), Default, diff --git a/tests-fuzz/src/test_utils.rs b/tests-fuzz/src/test_utils.rs index e65548969a..382af37347 100644 --- a/tests-fuzz/src/test_utils.rs +++ b/tests-fuzz/src/test_utils.rs @@ -14,7 +14,7 @@ use datatypes::data_type::ConcreteDataType; -use crate::context::TableContext; +use crate::context::{Rows, TableContext}; use crate::ir::create_expr::ColumnOption; use crate::ir::Column; @@ -53,6 +53,7 @@ pub fn new_test_ctx() -> TableContext { options: vec![ColumnOption::TimeIndex], }, ], + rows: Rows::new(), partition: None, primary_keys: vec![], } diff --git a/tests-fuzz/targets/fuzz_alter_table.rs b/tests-fuzz/targets/fuzz_alter_table.rs index 10cc9115a1..f95e491d05 100644 --- a/tests-fuzz/targets/fuzz_alter_table.rs +++ b/tests-fuzz/targets/fuzz_alter_table.rs @@ -155,7 +155,9 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { .context(error::ExecuteQuerySnafu { sql: &sql })?; info!("Alter table: {sql}, result: {result:?}"); // Applies changes - table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).alter(expr).unwrap()); + let mut t = Arc::unwrap_or_clone(table_ctx); + t.alter(expr).unwrap(); + table_ctx = Arc::new(t); // Validates columns let mut column_entries = validator::column::fetch_columns_via_mysql(