From 98659899c0110b13a397549658db27cd3745decf Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 1 Mar 2023 19:46:39 +0800 Subject: [PATCH] refactor: Move mito engine tests to a separate file (#1104) * refactor(mito): Move tests to a separate file * chore(query): Remove empty mod function --- src/mito/src/engine.rs | 745 +--------------------------------- src/mito/src/engine/tests.rs | 754 +++++++++++++++++++++++++++++++++++ src/query/src/function.rs | 15 - src/query/src/lib.rs | 1 - 4 files changed, 756 insertions(+), 759 deletions(-) create mode 100644 src/mito/src/engine/tests.rs delete mode 100644 src/query/src/function.rs diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index a095db1c77..9ef5154f2e 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -13,6 +13,8 @@ // limitations under the License. mod procedure; +#[cfg(test)] +mod tests; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -633,746 +635,3 @@ impl MitoEngineInner { } } } - -#[cfg(test)] -mod tests { - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_query::physical_plan::SessionContext; - use common_recordbatch::util; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema}; - use datatypes::value::Value; - use datatypes::vectors::{ - Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef, - }; - use log_store::NoopLogStore; - use storage::compaction::noop::NoopCompactionScheduler; - use storage::config::EngineConfig as StorageEngineConfig; - use storage::region::RegionImpl; - use storage::EngineImpl; - use store_api::manifest::Manifest; - use store_api::storage::ReadContext; - use table::requests::{AddColumnRequest, AlterKind, DeleteRequest, TableOptions}; - use tempdir::TempDir; - - use super::*; - use crate::table::test_util; - use crate::table::test_util::{ - new_insert_request, schema_for_test, TestEngineComponents, TABLE_NAME, - }; - - async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { - let table_name = "test_default_constraint"; - let column_schemas = vec![ - ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("n", ConcreteDataType::int32_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))) - .unwrap(), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), - true, - ) - .with_time_index(true), - ]; - - let schema = RawSchema::new(column_schemas); - - let (dir, object_store) = - test_util::new_test_object_store("test_insert_with_column_default_constraint").await; - let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); - let table_engine = MitoEngine::new( - EngineConfig::default(), - EngineImpl::new( - StorageEngineConfig::default(), - Arc::new(NoopLogStore::default()), - object_store.clone(), - compaction_scheduler, - ), - object_store, - ); - - let table = table_engine - .create_table( - &EngineContext::default(), - CreateTableRequest { - id: 1, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - desc: Some("a test table".to_string()), - schema, - create_if_not_exists: true, - primary_key_indices: Vec::default(), - table_options: TableOptions::default(), - region_numbers: vec![0], - }, - ) - .await - .unwrap(); - - (dir, table_name.to_string(), table) - } - - #[tokio::test] - async fn test_column_default_constraint() { - let (_dir, table_name, table) = setup_table_with_column_default_constraint().await; - - let mut columns_values: HashMap = HashMap::with_capacity(4); - let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"])); - let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); - - columns_values.insert("name".to_string(), names.clone()); - columns_values.insert("ts".to_string(), tss.clone()); - - let insert_req = new_insert_request(table_name.to_string(), columns_values); - assert_eq!(2, table.insert(insert_req).await.unwrap()); - - let session_ctx = SessionContext::new(); - let stream = table.scan(None, &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect(stream).await.unwrap(); - assert_eq!(1, batches.len()); - - let record = &batches[0]; - assert_eq!(record.num_columns(), 3); - assert_eq!(names, *record.column(0)); - assert_eq!( - Arc::new(Int32Vector::from_vec(vec![42, 42])) as VectorRef, - *record.column(1) - ); - assert_eq!(tss, *record.column(2)); - } - - #[tokio::test] - async fn test_insert_with_column_default_constraint() { - let (_dir, table_name, table) = setup_table_with_column_default_constraint().await; - - let mut columns_values: HashMap = HashMap::with_capacity(4); - let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"])); - let nums: VectorRef = Arc::new(Int32Vector::from(vec![None, Some(66)])); - let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); - - columns_values.insert("name".to_string(), names.clone()); - columns_values.insert("n".to_string(), nums.clone()); - columns_values.insert("ts".to_string(), tss.clone()); - - let insert_req = new_insert_request(table_name.to_string(), columns_values); - assert_eq!(2, table.insert(insert_req).await.unwrap()); - - let session_ctx = SessionContext::new(); - let stream = table.scan(None, &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect(stream).await.unwrap(); - assert_eq!(1, batches.len()); - - let record = &batches[0]; - assert_eq!(record.num_columns(), 3); - assert_eq!(names, *record.column(0)); - assert_eq!(nums, *record.column(1)); - assert_eq!(tss, *record.column(2)); - } - - #[test] - fn test_region_name() { - assert_eq!("1_0000000000", region_name(1, 0)); - assert_eq!("1_0000000001", region_name(1, 1)); - assert_eq!("99_0000000100", region_name(99, 100)); - assert_eq!("1000_0000009999", region_name(1000, 9999)); - } - - #[test] - fn test_table_dir() { - assert_eq!( - "greptime/public/1024/", - table_dir("greptime", "public", 1024) - ); - assert_eq!( - "0x4354a1/prometheus/1024/", - table_dir("0x4354a1", "prometheus", 1024) - ); - } - - #[test] - fn test_validate_create_table_request() { - let table_name = "test_validate_create_table_request"; - let column_schemas = vec![ - ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), - true, - ) - .with_time_index(true), - ]; - - let schema = RawSchema::new(column_schemas); - - let mut request = CreateTableRequest { - id: 1, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - desc: Some("a test table".to_string()), - schema, - create_if_not_exists: true, - // put ts into primary keys - primary_key_indices: vec![0, 1], - table_options: TableOptions::default(), - region_numbers: vec![0], - }; - - let err = validate_create_table_request(&request).unwrap_err(); - assert!(err - .to_string() - .contains("Invalid primary key: time index column can't be included in primary key")); - - request.primary_key_indices = vec![0]; - assert!(validate_create_table_request(&request).is_ok()); - } - - #[tokio::test] - async fn test_create_table_insert_scan() { - let TestEngineComponents { - table_ref: table, - schema_ref, - dir: _dir, - .. - } = test_util::setup_test_engine_and_table().await; - assert_eq!(TableType::Base, table.table_type()); - assert_eq!(schema_ref, table.schema()); - - let insert_req = new_insert_request("demo".to_string(), HashMap::default()); - assert_eq!(0, table.insert(insert_req).await.unwrap()); - - let mut columns_values: HashMap = HashMap::with_capacity(4); - let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2"])); - let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5, 66.6])); - let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64])); - let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); - - columns_values.insert("host".to_string(), hosts.clone()); - columns_values.insert("cpu".to_string(), cpus.clone()); - columns_values.insert("memory".to_string(), memories.clone()); - columns_values.insert("ts".to_string(), tss.clone()); - - let insert_req = new_insert_request("demo".to_string(), columns_values); - assert_eq!(2, table.insert(insert_req).await.unwrap()); - - let session_ctx = SessionContext::new(); - let stream = table.scan(None, &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect(stream).await.unwrap(); - assert_eq!(1, batches.len()); - assert_eq!(batches[0].num_columns(), 4); - - let batch_schema = &batches[0].schema; - assert_eq!(batch_schema.num_columns(), 4); - assert_eq!(batch_schema.column_schemas()[0].name, "host"); - assert_eq!(batch_schema.column_schemas()[1].name, "cpu"); - assert_eq!(batch_schema.column_schemas()[2].name, "memory"); - assert_eq!(batch_schema.column_schemas()[3].name, "ts"); - - let batch = &batches[0]; - assert_eq!(4, batch.num_columns()); - assert_eq!(hosts, *batch.column(0)); - assert_eq!(cpus, *batch.column(1)); - assert_eq!(memories, *batch.column(2)); - assert_eq!(tss, *batch.column(3)); - - // Scan with projections: cpu and memory - let stream = table.scan(Some(&vec![1, 2]), &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect(stream).await.unwrap(); - assert_eq!(1, batches.len()); - assert_eq!(batches[0].num_columns(), 2); - - let batch_schema = &batches[0].schema; - assert_eq!(batch_schema.num_columns(), 2); - - assert_eq!(batch_schema.column_schemas()[0].name, "cpu"); - assert_eq!(batch_schema.column_schemas()[1].name, "memory"); - - let batch = &batches[0]; - assert_eq!(2, batch.num_columns()); - assert_eq!(cpus, *batch.column(0)); - assert_eq!(memories, *batch.column(1)); - - // Scan with projections: only ts - let stream = table.scan(Some(&vec![3]), &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect(stream).await.unwrap(); - assert_eq!(1, batches.len()); - assert_eq!(batches[0].num_columns(), 1); - - let batch_schema = &batches[0].schema; - assert_eq!(batch_schema.num_columns(), 1); - - assert_eq!(batch_schema.column_schemas()[0].name, "ts"); - - let record = &batches[0]; - assert_eq!(1, record.num_columns()); - assert_eq!(tss, *record.column(0)); - } - - #[tokio::test] - async fn test_create_table_scan_batches() { - common_telemetry::init_default_ut_logging(); - - let TestEngineComponents { - table_ref: table, - dir: _dir, - .. - } = test_util::setup_test_engine_and_table().await; - - // TODO(yingwen): Custom batch size once the table support setting batch_size. - let default_batch_size = ReadContext::default().batch_size; - // Insert more than batch size rows to the table. - let test_batch_size = default_batch_size * 4; - let mut columns_values: HashMap = HashMap::with_capacity(4); - let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1"; test_batch_size])); - let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5; test_batch_size])); - let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64; test_batch_size])); - let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_values( - (0..test_batch_size).map(|v| v as i64), - )); - - columns_values.insert("host".to_string(), hosts); - columns_values.insert("cpu".to_string(), cpus); - columns_values.insert("memory".to_string(), memories); - columns_values.insert("ts".to_string(), tss.clone()); - - let insert_req = new_insert_request("demo".to_string(), columns_values); - assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap()); - - let session_ctx = SessionContext::new(); - let stream = table.scan(None, &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect(stream).await.unwrap(); - let mut total = 0; - for batch in batches { - assert_eq!(batch.num_columns(), 4); - let ts = batch.column(3); - let expect = tss.slice(total, ts.len()); - assert_eq!(expect, *ts); - total += ts.len(); - } - assert_eq!(test_batch_size, total); - } - - #[tokio::test] - async fn test_create_if_not_exists() { - common_telemetry::init_default_ut_logging(); - let ctx = EngineContext::default(); - - let (_engine, table_engine, table, _object_store, _dir) = - test_util::setup_mock_engine_and_table().await; - - let table_info = table.table_info(); - - let request = CreateTableRequest { - id: 1, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_info.name.to_string(), - schema: RawSchema::from(&*table_info.meta.schema), - create_if_not_exists: true, - desc: None, - primary_key_indices: Vec::default(), - table_options: TableOptions::default(), - region_numbers: vec![0], - }; - - let created_table = table_engine.create_table(&ctx, request).await.unwrap(); - assert_eq!(table_info, created_table.table_info()); - - // test create_if_not_exists=false - let request = CreateTableRequest { - id: 1, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_info.name.to_string(), - schema: RawSchema::from(&*table_info.meta.schema), - create_if_not_exists: false, - desc: None, - primary_key_indices: Vec::default(), - table_options: TableOptions::default(), - region_numbers: vec![0], - }; - - let result = table_engine.create_table(&ctx, request).await; - - assert!(result.is_err()); - assert!(matches!(result, Err(e) if format!("{e:?}").contains("Table already exists"))); - } - - #[tokio::test] - async fn test_open_table() { - common_telemetry::init_default_ut_logging(); - - let ctx = EngineContext::default(); - let open_req = OpenTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: test_util::TABLE_NAME.to_string(), - // the test table id is 1 - table_id: 1, - }; - - let (_engine, storage_engine, table, object_store, _dir) = { - let TestEngineComponents { - table_engine, - storage_engine, - table_ref: table, - object_store, - dir, - .. - } = test_util::setup_test_engine_and_table().await; - - assert_eq!(MITO_ENGINE, table_engine.name()); - // Now try to open the table again. - let reopened = table_engine - .open_table(&ctx, open_req.clone()) - .await - .unwrap() - .unwrap(); - assert_eq!(table.schema(), reopened.schema()); - - (table_engine, storage_engine, table, object_store, dir) - }; - - // Construct a new table engine, and try to open the table. - let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); - let reopened = table_engine - .open_table(&ctx, open_req.clone()) - .await - .unwrap() - .unwrap(); - assert_eq!(table.schema(), reopened.schema()); - - let reopened = reopened - .as_any() - .downcast_ref::>>() - .unwrap(); - - let left = table.table_info(); - // assert recovered table_info is correct - let right = reopened.table_info(); - assert_eq!(left, right); - assert_eq!(reopened.manifest().last_version(), 1); - } - - #[test] - fn test_region_id() { - assert_eq!(1, region_id(0, 1)); - assert_eq!(4294967296, region_id(1, 0)); - assert_eq!(4294967297, region_id(1, 1)); - assert_eq!(4294967396, region_id(1, 100)); - assert_eq!(8589934602, region_id(2, 10)); - assert_eq!(18446744069414584330, region_id(u32::MAX, 10)); - } - - fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest { - AlterTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: TABLE_NAME.to_string(), - alter_kind: AlterKind::AddColumns { - columns: vec![ - AddColumnRequest { - column_schema: new_tag.clone(), - is_key: true, - }, - AddColumnRequest { - column_schema: new_field.clone(), - is_key: false, - }, - ], - }, - } - } - - #[tokio::test] - async fn test_alter_table_add_column() { - let (_engine, table_engine, table, _object_store, _dir) = - test_util::setup_mock_engine_and_table().await; - - let old_info = table.table_info(); - let old_meta = &old_info.meta; - let old_schema = &old_meta.schema; - - let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); - let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); - let req = new_add_columns_req(&new_tag, &new_field); - let table = table_engine - .alter_table(&EngineContext::default(), req) - .await - .unwrap(); - - let new_info = table.table_info(); - let new_meta = &new_info.meta; - let new_schema = &new_meta.schema; - - assert_eq!(&[0, 4], &new_meta.primary_key_indices[..]); - assert_eq!(&[1, 2, 3, 5], &new_meta.value_indices[..]); - assert_eq!(new_schema.num_columns(), old_schema.num_columns() + 2); - assert_eq!( - &new_schema.column_schemas()[..old_schema.num_columns()], - old_schema.column_schemas() - ); - assert_eq!( - &new_schema.column_schemas()[old_schema.num_columns()], - &new_tag - ); - assert_eq!( - &new_schema.column_schemas()[old_schema.num_columns() + 1], - &new_field - ); - assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column()); - assert_eq!(new_schema.version(), old_schema.version() + 1); - assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2); - } - - #[tokio::test] - async fn test_alter_table_remove_column() { - let (_engine, table_engine, _table, _object_store, _dir) = - test_util::setup_mock_engine_and_table().await; - - // Add two columns to the table first. - let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); - let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); - let req = new_add_columns_req(&new_tag, &new_field); - let table = table_engine - .alter_table(&EngineContext::default(), req) - .await - .unwrap(); - - let old_info = table.table_info(); - let old_meta = &old_info.meta; - let old_schema = &old_meta.schema; - - // Then remove memory and my_field from the table. - let req = AlterTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: TABLE_NAME.to_string(), - alter_kind: AlterKind::DropColumns { - names: vec![String::from("memory"), String::from("my_field")], - }, - }; - let table = table_engine - .alter_table(&EngineContext::default(), req) - .await - .unwrap(); - - let new_info = table.table_info(); - let new_meta = &new_info.meta; - let new_schema = &new_meta.schema; - - assert_eq!(new_schema.num_columns(), old_schema.num_columns() - 2); - let remaining_names: Vec = new_schema - .column_schemas() - .iter() - .map(|column_schema| column_schema.name.clone()) - .collect(); - assert_eq!(&["host", "cpu", "ts", "my_tag"], &remaining_names[..]); - assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]); - assert_eq!(&[1, 2], &new_meta.value_indices[..]); - assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column()); - assert_eq!(new_schema.version(), old_schema.version() + 1); - } - - #[tokio::test] - async fn test_alter_rename_table() { - let TestEngineComponents { - table_engine, - storage_engine, - object_store, - dir: _dir, - .. - } = test_util::setup_test_engine_and_table().await; - let ctx = EngineContext::default(); - - // register another table - let another_name = "another_table"; - let req = CreateTableRequest { - id: 1024, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: another_name.to_string(), - desc: Some("another test table".to_string()), - schema: RawSchema::from(&schema_for_test()), - region_numbers: vec![0], - primary_key_indices: vec![0], - create_if_not_exists: true, - table_options: TableOptions::default(), - }; - table_engine - .create_table(&ctx, req) - .await - .expect("create table must succeed"); - // test renaming a table with an existing name. - let req = AlterTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: TABLE_NAME.to_string(), - alter_kind: AlterKind::RenameTable { - new_table_name: another_name.to_string(), - }, - }; - let err = table_engine.alter_table(&ctx, req).await.err().unwrap(); - assert!( - err.to_string().contains("Table already exists"), - "Unexpected error: {err}" - ); - - let new_table_name = "test_table"; - // test rename table - let req = AlterTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: TABLE_NAME.to_string(), - alter_kind: AlterKind::RenameTable { - new_table_name: new_table_name.to_string(), - }, - }; - let table = table_engine.alter_table(&ctx, req).await.unwrap(); - - assert_eq!(table.table_info().name, new_table_name); - - let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); - let open_req = OpenTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: new_table_name.to_string(), - table_id: 1, - }; - - // test reopen table - let reopened = table_engine - .open_table(&ctx, open_req.clone()) - .await - .unwrap() - .unwrap(); - let reopened = reopened - .as_any() - .downcast_ref::>>() - .unwrap(); - assert_eq!(reopened.table_info(), table.table_info()); - assert_eq!(reopened.table_info().name, new_table_name); - assert_eq!(reopened.manifest().last_version(), 2); - } - - #[tokio::test] - async fn test_drop_table() { - common_telemetry::init_default_ut_logging(); - let ctx = EngineContext::default(); - - let (_engine, table_engine, table, _object_store, _dir) = - test_util::setup_mock_engine_and_table().await; - let engine_ctx = EngineContext {}; - - let table_info = table.table_info(); - let table_reference = TableReference { - catalog: DEFAULT_CATALOG_NAME, - schema: DEFAULT_SCHEMA_NAME, - table: &table_info.name, - }; - - let create_table_request = CreateTableRequest { - id: 1, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_info.name.to_string(), - schema: RawSchema::from(&*table_info.meta.schema), - create_if_not_exists: true, - desc: None, - primary_key_indices: Vec::default(), - table_options: TableOptions::default(), - region_numbers: vec![0], - }; - - let created_table = table_engine - .create_table(&ctx, create_table_request) - .await - .unwrap(); - assert_eq!(table_info, created_table.table_info()); - assert!(table_engine.table_exists(&engine_ctx, &table_reference)); - - let drop_table_request = DropTableRequest { - catalog_name: table_reference.catalog.to_string(), - schema_name: table_reference.schema.to_string(), - table_name: table_reference.table.to_string(), - }; - let table_dropped = table_engine - .drop_table(&engine_ctx, drop_table_request) - .await - .unwrap(); - assert!(table_dropped); - assert!(!table_engine.table_exists(&engine_ctx, &table_reference)); - - // should be able to re-create - let request = CreateTableRequest { - id: 2, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_info.name.to_string(), - schema: RawSchema::from(&*table_info.meta.schema), - create_if_not_exists: false, - desc: None, - primary_key_indices: Vec::default(), - table_options: TableOptions::default(), - region_numbers: vec![0], - }; - table_engine.create_table(&ctx, request).await.unwrap(); - assert!(table_engine.table_exists(&engine_ctx, &table_reference)); - } - - #[tokio::test] - async fn test_table_delete_rows() { - let TestEngineComponents { - table_ref: table, - dir: _dir, - .. - } = test_util::setup_test_engine_and_table().await; - - let mut columns_values: HashMap = HashMap::with_capacity(4); - let hosts: VectorRef = - Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"])); - let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); - let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); - let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1])); - - columns_values.insert("host".to_string(), hosts.clone()); - columns_values.insert("cpu".to_string(), cpus.clone()); - columns_values.insert("memory".to_string(), memories.clone()); - columns_values.insert("ts".to_string(), tss.clone()); - - let insert_req = new_insert_request("demo".to_string(), columns_values); - assert_eq!(4, table.insert(insert_req).await.unwrap()); - - let del_hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host3"])); - let del_tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); - let mut key_column_values = HashMap::with_capacity(2); - key_column_values.insert("host".to_string(), del_hosts); - key_column_values.insert("ts".to_string(), del_tss); - let del_req = DeleteRequest { key_column_values }; - table.delete(del_req).await.unwrap(); - - let session_ctx = SessionContext::new(); - let stream = table.scan(None, &[], None).await.unwrap(); - let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); - let batches = util::collect_batches(stream).await.unwrap(); - - assert_eq!( - batches.pretty_print().unwrap(), - "\ -+-------+-----+--------+-------------------------+ -| host | cpu | memory | ts | -+-------+-----+--------+-------------------------+ -| host2 | 2.0 | 2.0 | 1970-01-01T00:00:00.002 | -| host4 | 4.0 | 4.0 | 1970-01-01T00:00:00.001 | -+-------+-----+--------+-------------------------+" - ); - } -} diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs new file mode 100644 index 0000000000..0758f36b83 --- /dev/null +++ b/src/mito/src/engine/tests.rs @@ -0,0 +1,754 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for mito table engine. + +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::physical_plan::SessionContext; +use common_recordbatch::util; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema}; +use datatypes::value::Value; +use datatypes::vectors::{ + Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef, +}; +use log_store::NoopLogStore; +use storage::compaction::noop::NoopCompactionScheduler; +use storage::config::EngineConfig as StorageEngineConfig; +use storage::region::RegionImpl; +use storage::EngineImpl; +use store_api::manifest::Manifest; +use store_api::storage::ReadContext; +use table::requests::{AddColumnRequest, AlterKind, DeleteRequest, TableOptions}; +use tempdir::TempDir; + +use super::*; +use crate::table::test_util; +use crate::table::test_util::{ + new_insert_request, schema_for_test, TestEngineComponents, TABLE_NAME, +}; + +async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { + let table_name = "test_default_constraint"; + let column_schemas = vec![ + ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("n", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))) + .unwrap(), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), + true, + ) + .with_time_index(true), + ]; + + let schema = RawSchema::new(column_schemas); + + let (dir, object_store) = + test_util::new_test_object_store("test_insert_with_column_default_constraint").await; + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); + let table_engine = MitoEngine::new( + EngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(NoopLogStore::default()), + object_store.clone(), + compaction_scheduler, + ), + object_store, + ); + + let table = table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + desc: Some("a test table".to_string()), + schema, + create_if_not_exists: true, + primary_key_indices: Vec::default(), + table_options: TableOptions::default(), + region_numbers: vec![0], + }, + ) + .await + .unwrap(); + + (dir, table_name.to_string(), table) +} + +#[tokio::test] +async fn test_column_default_constraint() { + let (_dir, table_name, table) = setup_table_with_column_default_constraint().await; + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); + + columns_values.insert("name".to_string(), names.clone()); + columns_values.insert("ts".to_string(), tss.clone()); + + let insert_req = new_insert_request(table_name.to_string(), columns_values); + assert_eq!(2, table.insert(insert_req).await.unwrap()); + + let session_ctx = SessionContext::new(); + let stream = table.scan(None, &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + + let record = &batches[0]; + assert_eq!(record.num_columns(), 3); + assert_eq!(names, *record.column(0)); + assert_eq!( + Arc::new(Int32Vector::from_vec(vec![42, 42])) as VectorRef, + *record.column(1) + ); + assert_eq!(tss, *record.column(2)); +} + +#[tokio::test] +async fn test_insert_with_column_default_constraint() { + let (_dir, table_name, table) = setup_table_with_column_default_constraint().await; + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"])); + let nums: VectorRef = Arc::new(Int32Vector::from(vec![None, Some(66)])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); + + columns_values.insert("name".to_string(), names.clone()); + columns_values.insert("n".to_string(), nums.clone()); + columns_values.insert("ts".to_string(), tss.clone()); + + let insert_req = new_insert_request(table_name.to_string(), columns_values); + assert_eq!(2, table.insert(insert_req).await.unwrap()); + + let session_ctx = SessionContext::new(); + let stream = table.scan(None, &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + + let record = &batches[0]; + assert_eq!(record.num_columns(), 3); + assert_eq!(names, *record.column(0)); + assert_eq!(nums, *record.column(1)); + assert_eq!(tss, *record.column(2)); +} + +#[test] +fn test_region_name() { + assert_eq!("1_0000000000", region_name(1, 0)); + assert_eq!("1_0000000001", region_name(1, 1)); + assert_eq!("99_0000000100", region_name(99, 100)); + assert_eq!("1000_0000009999", region_name(1000, 9999)); +} + +#[test] +fn test_table_dir() { + assert_eq!( + "greptime/public/1024/", + table_dir("greptime", "public", 1024) + ); + assert_eq!( + "0x4354a1/prometheus/1024/", + table_dir("0x4354a1", "prometheus", 1024) + ); +} + +#[test] +fn test_validate_create_table_request() { + let table_name = "test_validate_create_table_request"; + let column_schemas = vec![ + ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), + true, + ) + .with_time_index(true), + ]; + + let schema = RawSchema::new(column_schemas); + + let mut request = CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + desc: Some("a test table".to_string()), + schema, + create_if_not_exists: true, + // put ts into primary keys + primary_key_indices: vec![0, 1], + table_options: TableOptions::default(), + region_numbers: vec![0], + }; + + let err = validate_create_table_request(&request).unwrap_err(); + assert!(err + .to_string() + .contains("Invalid primary key: time index column can't be included in primary key")); + + request.primary_key_indices = vec![0]; + assert!(validate_create_table_request(&request).is_ok()); +} + +#[tokio::test] +async fn test_create_table_insert_scan() { + let TestEngineComponents { + table_ref: table, + schema_ref, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; + assert_eq!(TableType::Base, table.table_type()); + assert_eq!(schema_ref, table.schema()); + + let insert_req = new_insert_request("demo".to_string(), HashMap::default()); + assert_eq!(0, table.insert(insert_req).await.unwrap()); + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2"])); + let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5, 66.6])); + let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); + + columns_values.insert("host".to_string(), hosts.clone()); + columns_values.insert("cpu".to_string(), cpus.clone()); + columns_values.insert("memory".to_string(), memories.clone()); + columns_values.insert("ts".to_string(), tss.clone()); + + let insert_req = new_insert_request("demo".to_string(), columns_values); + assert_eq!(2, table.insert(insert_req).await.unwrap()); + + let session_ctx = SessionContext::new(); + let stream = table.scan(None, &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + assert_eq!(batches[0].num_columns(), 4); + + let batch_schema = &batches[0].schema; + assert_eq!(batch_schema.num_columns(), 4); + assert_eq!(batch_schema.column_schemas()[0].name, "host"); + assert_eq!(batch_schema.column_schemas()[1].name, "cpu"); + assert_eq!(batch_schema.column_schemas()[2].name, "memory"); + assert_eq!(batch_schema.column_schemas()[3].name, "ts"); + + let batch = &batches[0]; + assert_eq!(4, batch.num_columns()); + assert_eq!(hosts, *batch.column(0)); + assert_eq!(cpus, *batch.column(1)); + assert_eq!(memories, *batch.column(2)); + assert_eq!(tss, *batch.column(3)); + + // Scan with projections: cpu and memory + let stream = table.scan(Some(&vec![1, 2]), &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + assert_eq!(batches[0].num_columns(), 2); + + let batch_schema = &batches[0].schema; + assert_eq!(batch_schema.num_columns(), 2); + + assert_eq!(batch_schema.column_schemas()[0].name, "cpu"); + assert_eq!(batch_schema.column_schemas()[1].name, "memory"); + + let batch = &batches[0]; + assert_eq!(2, batch.num_columns()); + assert_eq!(cpus, *batch.column(0)); + assert_eq!(memories, *batch.column(1)); + + // Scan with projections: only ts + let stream = table.scan(Some(&vec![3]), &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + assert_eq!(batches[0].num_columns(), 1); + + let batch_schema = &batches[0].schema; + assert_eq!(batch_schema.num_columns(), 1); + + assert_eq!(batch_schema.column_schemas()[0].name, "ts"); + + let record = &batches[0]; + assert_eq!(1, record.num_columns()); + assert_eq!(tss, *record.column(0)); +} + +#[tokio::test] +async fn test_create_table_scan_batches() { + common_telemetry::init_default_ut_logging(); + + let TestEngineComponents { + table_ref: table, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; + + // TODO(yingwen): Custom batch size once the table support setting batch_size. + let default_batch_size = ReadContext::default().batch_size; + // Insert more than batch size rows to the table. + let test_batch_size = default_batch_size * 4; + let mut columns_values: HashMap = HashMap::with_capacity(4); + let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1"; test_batch_size])); + let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5; test_batch_size])); + let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64; test_batch_size])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_values( + (0..test_batch_size).map(|v| v as i64), + )); + + columns_values.insert("host".to_string(), hosts); + columns_values.insert("cpu".to_string(), cpus); + columns_values.insert("memory".to_string(), memories); + columns_values.insert("ts".to_string(), tss.clone()); + + let insert_req = new_insert_request("demo".to_string(), columns_values); + assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap()); + + let session_ctx = SessionContext::new(); + let stream = table.scan(None, &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect(stream).await.unwrap(); + let mut total = 0; + for batch in batches { + assert_eq!(batch.num_columns(), 4); + let ts = batch.column(3); + let expect = tss.slice(total, ts.len()); + assert_eq!(expect, *ts); + total += ts.len(); + } + assert_eq!(test_batch_size, total); +} + +#[tokio::test] +async fn test_create_if_not_exists() { + common_telemetry::init_default_ut_logging(); + let ctx = EngineContext::default(); + + let (_engine, table_engine, table, _object_store, _dir) = + test_util::setup_mock_engine_and_table().await; + + let table_info = table.table_info(); + + let request = CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_info.name.to_string(), + schema: RawSchema::from(&*table_info.meta.schema), + create_if_not_exists: true, + desc: None, + primary_key_indices: Vec::default(), + table_options: TableOptions::default(), + region_numbers: vec![0], + }; + + let created_table = table_engine.create_table(&ctx, request).await.unwrap(); + assert_eq!(table_info, created_table.table_info()); + + // test create_if_not_exists=false + let request = CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_info.name.to_string(), + schema: RawSchema::from(&*table_info.meta.schema), + create_if_not_exists: false, + desc: None, + primary_key_indices: Vec::default(), + table_options: TableOptions::default(), + region_numbers: vec![0], + }; + + let result = table_engine.create_table(&ctx, request).await; + + assert!(result.is_err()); + assert!(matches!(result, Err(e) if format!("{e:?}").contains("Table already exists"))); +} + +#[tokio::test] +async fn test_open_table() { + common_telemetry::init_default_ut_logging(); + + let ctx = EngineContext::default(); + let open_req = OpenTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: test_util::TABLE_NAME.to_string(), + // the test table id is 1 + table_id: 1, + }; + + let (_engine, storage_engine, table, object_store, _dir) = { + let TestEngineComponents { + table_engine, + storage_engine, + table_ref: table, + object_store, + dir, + .. + } = test_util::setup_test_engine_and_table().await; + + assert_eq!(MITO_ENGINE, table_engine.name()); + // Now try to open the table again. + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(table.schema(), reopened.schema()); + + (table_engine, storage_engine, table, object_store, dir) + }; + + // Construct a new table engine, and try to open the table. + let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(table.schema(), reopened.schema()); + + let reopened = reopened + .as_any() + .downcast_ref::>>() + .unwrap(); + + let left = table.table_info(); + // assert recovered table_info is correct + let right = reopened.table_info(); + assert_eq!(left, right); + assert_eq!(reopened.manifest().last_version(), 1); +} + +#[test] +fn test_region_id() { + assert_eq!(1, region_id(0, 1)); + assert_eq!(4294967296, region_id(1, 0)); + assert_eq!(4294967297, region_id(1, 1)); + assert_eq!(4294967396, region_id(1, 100)); + assert_eq!(8589934602, region_id(2, 10)); + assert_eq!(18446744069414584330, region_id(u32::MAX, 10)); +} + +fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest { + AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::AddColumns { + columns: vec![ + AddColumnRequest { + column_schema: new_tag.clone(), + is_key: true, + }, + AddColumnRequest { + column_schema: new_field.clone(), + is_key: false, + }, + ], + }, + } +} + +#[tokio::test] +async fn test_alter_table_add_column() { + let (_engine, table_engine, table, _object_store, _dir) = + test_util::setup_mock_engine_and_table().await; + + let old_info = table.table_info(); + let old_meta = &old_info.meta; + let old_schema = &old_meta.schema; + + let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); + let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); + let req = new_add_columns_req(&new_tag, &new_field); + let table = table_engine + .alter_table(&EngineContext::default(), req) + .await + .unwrap(); + + let new_info = table.table_info(); + let new_meta = &new_info.meta; + let new_schema = &new_meta.schema; + + assert_eq!(&[0, 4], &new_meta.primary_key_indices[..]); + assert_eq!(&[1, 2, 3, 5], &new_meta.value_indices[..]); + assert_eq!(new_schema.num_columns(), old_schema.num_columns() + 2); + assert_eq!( + &new_schema.column_schemas()[..old_schema.num_columns()], + old_schema.column_schemas() + ); + assert_eq!( + &new_schema.column_schemas()[old_schema.num_columns()], + &new_tag + ); + assert_eq!( + &new_schema.column_schemas()[old_schema.num_columns() + 1], + &new_field + ); + assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column()); + assert_eq!(new_schema.version(), old_schema.version() + 1); + assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2); +} + +#[tokio::test] +async fn test_alter_table_remove_column() { + let (_engine, table_engine, _table, _object_store, _dir) = + test_util::setup_mock_engine_and_table().await; + + // Add two columns to the table first. + let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); + let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); + let req = new_add_columns_req(&new_tag, &new_field); + let table = table_engine + .alter_table(&EngineContext::default(), req) + .await + .unwrap(); + + let old_info = table.table_info(); + let old_meta = &old_info.meta; + let old_schema = &old_meta.schema; + + // Then remove memory and my_field from the table. + let req = AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::DropColumns { + names: vec![String::from("memory"), String::from("my_field")], + }, + }; + let table = table_engine + .alter_table(&EngineContext::default(), req) + .await + .unwrap(); + + let new_info = table.table_info(); + let new_meta = &new_info.meta; + let new_schema = &new_meta.schema; + + assert_eq!(new_schema.num_columns(), old_schema.num_columns() - 2); + let remaining_names: Vec = new_schema + .column_schemas() + .iter() + .map(|column_schema| column_schema.name.clone()) + .collect(); + assert_eq!(&["host", "cpu", "ts", "my_tag"], &remaining_names[..]); + assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]); + assert_eq!(&[1, 2], &new_meta.value_indices[..]); + assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column()); + assert_eq!(new_schema.version(), old_schema.version() + 1); +} + +#[tokio::test] +async fn test_alter_rename_table() { + let TestEngineComponents { + table_engine, + storage_engine, + object_store, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; + let ctx = EngineContext::default(); + + // register another table + let another_name = "another_table"; + let req = CreateTableRequest { + id: 1024, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: another_name.to_string(), + desc: Some("another test table".to_string()), + schema: RawSchema::from(&schema_for_test()), + region_numbers: vec![0], + primary_key_indices: vec![0], + create_if_not_exists: true, + table_options: TableOptions::default(), + }; + table_engine + .create_table(&ctx, req) + .await + .expect("create table must succeed"); + // test renaming a table with an existing name. + let req = AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::RenameTable { + new_table_name: another_name.to_string(), + }, + }; + let err = table_engine.alter_table(&ctx, req).await.err().unwrap(); + assert!( + err.to_string().contains("Table already exists"), + "Unexpected error: {err}" + ); + + let new_table_name = "test_table"; + // test rename table + let req = AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::RenameTable { + new_table_name: new_table_name.to_string(), + }, + }; + let table = table_engine.alter_table(&ctx, req).await.unwrap(); + + assert_eq!(table.table_info().name, new_table_name); + + let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); + let open_req = OpenTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: new_table_name.to_string(), + table_id: 1, + }; + + // test reopen table + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap() + .unwrap(); + let reopened = reopened + .as_any() + .downcast_ref::>>() + .unwrap(); + assert_eq!(reopened.table_info(), table.table_info()); + assert_eq!(reopened.table_info().name, new_table_name); + assert_eq!(reopened.manifest().last_version(), 2); +} + +#[tokio::test] +async fn test_drop_table() { + common_telemetry::init_default_ut_logging(); + let ctx = EngineContext::default(); + + let (_engine, table_engine, table, _object_store, _dir) = + test_util::setup_mock_engine_and_table().await; + let engine_ctx = EngineContext {}; + + let table_info = table.table_info(); + let table_reference = TableReference { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + table: &table_info.name, + }; + + let create_table_request = CreateTableRequest { + id: 1, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_info.name.to_string(), + schema: RawSchema::from(&*table_info.meta.schema), + create_if_not_exists: true, + desc: None, + primary_key_indices: Vec::default(), + table_options: TableOptions::default(), + region_numbers: vec![0], + }; + + let created_table = table_engine + .create_table(&ctx, create_table_request) + .await + .unwrap(); + assert_eq!(table_info, created_table.table_info()); + assert!(table_engine.table_exists(&engine_ctx, &table_reference)); + + let drop_table_request = DropTableRequest { + catalog_name: table_reference.catalog.to_string(), + schema_name: table_reference.schema.to_string(), + table_name: table_reference.table.to_string(), + }; + let table_dropped = table_engine + .drop_table(&engine_ctx, drop_table_request) + .await + .unwrap(); + assert!(table_dropped); + assert!(!table_engine.table_exists(&engine_ctx, &table_reference)); + + // should be able to re-create + let request = CreateTableRequest { + id: 2, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_info.name.to_string(), + schema: RawSchema::from(&*table_info.meta.schema), + create_if_not_exists: false, + desc: None, + primary_key_indices: Vec::default(), + table_options: TableOptions::default(), + region_numbers: vec![0], + }; + table_engine.create_table(&ctx, request).await.unwrap(); + assert!(table_engine.table_exists(&engine_ctx, &table_reference)); +} + +#[tokio::test] +async fn test_table_delete_rows() { + let TestEngineComponents { + table_ref: table, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"])); + let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); + let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1])); + + columns_values.insert("host".to_string(), hosts.clone()); + columns_values.insert("cpu".to_string(), cpus.clone()); + columns_values.insert("memory".to_string(), memories.clone()); + columns_values.insert("ts".to_string(), tss.clone()); + + let insert_req = new_insert_request("demo".to_string(), columns_values); + assert_eq!(4, table.insert(insert_req).await.unwrap()); + + let del_hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host3"])); + let del_tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); + let mut key_column_values = HashMap::with_capacity(2); + key_column_values.insert("host".to_string(), del_hosts); + key_column_values.insert("ts".to_string(), del_tss); + let del_req = DeleteRequest { key_column_values }; + table.delete(del_req).await.unwrap(); + + let session_ctx = SessionContext::new(); + let stream = table.scan(None, &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect_batches(stream).await.unwrap(); + + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+-----+--------+-------------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+-------------------------+ +| host2 | 2.0 | 2.0 | 1970-01-01T00:00:00.002 | +| host4 | 4.0 | 4.0 | 1970-01-01T00:00:00.001 | ++-------+-----+--------+-------------------------+" + ); +} diff --git a/src/query/src/function.rs b/src/query/src/function.rs deleted file mode 100644 index 1f142ee547..0000000000 --- a/src/query/src/function.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! GreptimeDB builtin functions diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 689e36b1a8..6d7775614f 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -15,7 +15,6 @@ pub mod datafusion; pub mod error; pub mod executor; -mod function; pub mod logical_optimizer; mod metric; mod optimizer;