diff --git a/Cargo.lock b/Cargo.lock index 9013fb86ad..c883770ef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3081,8 +3081,6 @@ dependencies = [ "prost", "query", "regex", - "rstest", - "rstest_reuse", "script", "serde", "serde_json", @@ -8764,31 +8762,49 @@ dependencies = [ "axum-test-helper", "catalog", "client", + "common-base", "common-catalog", "common-error", "common-grpc", "common-query", + "common-recordbatch", "common-runtime", "common-telemetry", "common-test-util", + "datafusion", + "datafusion-expr", "datanode", "datatypes", "dotenv", "frontend", + "futures", + "itertools", + "meta-client", + "meta-srv", "mito", "object-store", "once_cell", + "partition", "paste", + "prost", + "query", "rand", + "rstest", + "rstest_reuse", + "script", "secrecy", "serde", "serde_json", "servers", + "session", "snafu", "sql", + "store-api", "table", "tempfile", "tokio", + "tonic 0.9.2", + "tower", "uuid", ] diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 869f514e22..03f402289d 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [features] default = ["python"] python = ["dep:script"] +testing = [] [dependencies] api = { path = "../api" } @@ -66,8 +67,6 @@ common-test-util = { path = "../common/test-util" } datanode = { path = "../datanode" } futures = "0.3" meta-srv = { path = "../meta-srv", features = ["mock"] } -rstest = "0.17" -rstest_reuse = "0.5" strfmt = "0.2" toml = "0.5" tower = "0.4" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index ab31162a5c..d87a0dc919 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -75,19 +75,19 @@ impl FrontendCatalogManager { } } - pub(crate) fn set_dist_instance(&mut self, dist_instance: Arc) { + pub fn set_dist_instance(&mut self, dist_instance: Arc) { self.dist_instance = Some(dist_instance) } - pub(crate) fn backend(&self) -> KvBackendRef { + pub fn backend(&self) -> KvBackendRef { self.backend.clone() } - pub(crate) fn partition_manager(&self) -> PartitionRuleManagerRef { + pub fn partition_manager(&self) -> PartitionRuleManagerRef { self.partition_manager.clone() } - pub(crate) fn datanode_clients(&self) -> Arc { + pub fn datanode_clients(&self) -> Arc { self.datanode_clients.clone() } } @@ -406,71 +406,3 @@ impl SchemaProvider for FrontendSchemaProvider { Ok(self.table_names().await?.contains(&name.to_string())) } } - -#[cfg(test)] -mod tests { - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; - use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME}; - use table::requests::{CreateTableRequest, TableOptions}; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_register_system_table() { - let instance = - crate::tests::create_distributed_instance("test_register_system_table").await; - - let catalog_name = DEFAULT_CATALOG_NAME; - let schema_name = DEFAULT_SCHEMA_NAME; - let table_name = SCRIPTS_TABLE_NAME; - let request = CreateTableRequest { - id: 1, - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - desc: Some("Scripts table".to_string()), - schema: build_scripts_schema(), - region_numbers: vec![0], - primary_key_indices: vec![0, 1], - create_if_not_exists: true, - table_options: TableOptions::default(), - engine: MITO_ENGINE.to_string(), - }; - - let result = instance - .catalog_manager - .register_system_table(RegisterSystemTableRequest { - create_table_request: request, - open_hook: None, - }) - .await; - assert!(result.is_ok()); - - assert!( - instance - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .unwrap() - .is_some(), - "the registered system table cannot be found in catalog" - ); - - let mut actually_created_table_in_datanode = 0; - for datanode in instance.datanodes.values() { - if datanode - .catalog_manager() - .table(catalog_name, schema_name, table_name) - .await - .unwrap() - .is_some() - { - actually_created_table_in_datanode += 1; - } - } - assert_eq!( - actually_created_table_in_datanode, 1, - "system table should be actually created at one and only one datanode" - ) - } -} diff --git a/src/frontend/src/datanode.rs b/src/frontend/src/datanode.rs index 7be786f4a4..969026017f 100644 --- a/src/frontend/src/datanode.rs +++ b/src/frontend/src/datanode.rs @@ -62,8 +62,8 @@ impl DatanodeClients { .await } - #[cfg(test)] - pub(crate) async fn insert_client(&self, datanode: Peer, client: Client) { + #[cfg(feature = "testing")] + pub async fn insert_client(&self, datanode: Peer, client: Client) { self.clients.insert(datanode, client).await } } diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index e32cdcfbe2..d59d58906b 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -117,10 +117,7 @@ pub(crate) async fn create_external_expr( } /// Convert `CreateTable` statement to `CreateExpr` gRPC request. -pub(crate) fn create_to_expr( - create: &CreateTable, - query_ctx: QueryContextRef, -) -> Result { +pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Result { let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&create.name, query_ctx) .map_err(BoxedError::new) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index bec85dcb2d..fbbed42af7 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod distributed; +pub mod distributed; mod grpc; mod influxdb; mod opentsdb; @@ -236,8 +236,7 @@ impl Instance { Ok(()) } - #[cfg(test)] - pub(crate) async fn new_distributed( + pub async fn new_distributed( catalog_manager: CatalogManagerRef, dist_instance: Arc, ) -> Self { @@ -423,8 +422,7 @@ impl Instance { .map(|_| ()) } - #[cfg(test)] - pub(crate) fn statement_executor(&self) -> Arc { + pub fn statement_executor(&self) -> Arc { self.statement_executor.clone() } } @@ -651,13 +649,9 @@ fn validate_insert_request(schema: &Schema, request: &InsertRequest) -> Result<( #[cfg(test)] mod tests { - use std::borrow::Cow; use std::collections::HashMap; - use std::sync::atomic::AtomicU32; use api::v1::column::Values; - use catalog::helper::{TableGlobalKey, TableGlobalValue}; - use common_recordbatch::RecordBatches; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use query::query_engine::options::QueryOptions; @@ -665,9 +659,6 @@ mod tests { use strfmt::Format; use super::*; - use crate::table::DistTable; - use crate::tests; - use crate::tests::MockDistributedInstance; #[test] fn test_validate_insert_request() { @@ -842,372 +833,4 @@ mod tests { let sql = "DESC TABLE {catalog}{schema}demo;"; replace_test(sql, plugins, &query_ctx); } - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_exec_sql() { - let standalone = tests::create_standalone_instance("test_standalone_exec_sql").await; - let instance = standalone.instance.as_ref(); - - let sql = r#" - CREATE TABLE demo( - host STRING, - ts TIMESTAMP, - cpu DOUBLE NULL, - memory DOUBLE NULL, - disk_util DOUBLE DEFAULT 9.9, - TIME INDEX (ts), - PRIMARY KEY(host) - ) engine=mito"#; - create_table(instance, sql).await; - - insert_and_query(instance).await; - - drop_table(instance).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_exec_sql() { - let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await; - let instance = distributed.frontend.as_ref(); - - let sql = r#" - CREATE TABLE demo( - host STRING, - ts TIMESTAMP, - cpu DOUBLE NULL, - memory DOUBLE NULL, - disk_util DOUBLE DEFAULT 9.9, - TIME INDEX (ts), - PRIMARY KEY(host) - ) - PARTITION BY RANGE COLUMNS (host) ( - PARTITION r0 VALUES LESS THAN ('550-A'), - PARTITION r1 VALUES LESS THAN ('550-W'), - PARTITION r2 VALUES LESS THAN ('MOSS'), - PARTITION r3 VALUES LESS THAN (MAXVALUE), - ) - engine=mito"#; - create_table(instance, sql).await; - - insert_and_query(instance).await; - - verify_data_distribution( - &distributed, - HashMap::from([ - ( - 0u32, - "\ -+---------------------+------+ -| ts | host | -+---------------------+------+ -| 2013-12-31T16:00:00 | 490 | -+---------------------+------+", - ), - ( - 1u32, - "\ -+---------------------+-------+ -| ts | host | -+---------------------+-------+ -| 2022-12-31T16:00:00 | 550-A | -+---------------------+-------+", - ), - ( - 2u32, - "\ -+---------------------+-------+ -| ts | host | -+---------------------+-------+ -| 2023-12-31T16:00:00 | 550-W | -+---------------------+-------+", - ), - ( - 3u32, - "\ -+---------------------+------+ -| ts | host | -+---------------------+------+ -| 2043-12-31T16:00:00 | MOSS | -+---------------------+------+", - ), - ]), - ) - .await; - - drop_table(instance).await; - - verify_table_is_dropped(&distributed).await; - } - - async fn query(instance: &Instance, sql: &str) -> Output { - SqlQueryHandler::do_query(instance, sql, QueryContext::arc()) - .await - .remove(0) - .unwrap() - } - - async fn create_table(instance: &Instance, sql: &str) { - let output = query(instance, sql).await; - let Output::AffectedRows(x) = output else { unreachable!() }; - assert_eq!(x, 0); - } - - async fn insert_and_query(instance: &Instance) { - let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES - ('490', 0.1, 1, 1388505600000), - ('550-A', 1, 100, 1672502400000), - ('550-W', 10000, 1000000, 1704038400000), - ('MOSS', 100000000, 10000000000, 2335190400000) - "#; - let output = query(instance, sql).await; - let Output::AffectedRows(x) = output else { unreachable!() }; - assert_eq!(x, 4); - - let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition - let output = query(instance, sql).await; - let Output::Stream(s) = output else { unreachable!() }; - let batches = common_recordbatch::util::collect_batches(s).await.unwrap(); - let pretty_print = batches.pretty_print().unwrap(); - let expected = "\ -+-------+---------------------+-------------+-----------+-----------+ -| host | ts | cpu | memory | disk_util | -+-------+---------------------+-------------+-----------+-----------+ -| 490 | 2013-12-31T16:00:00 | 0.1 | 1.0 | 9.9 | -| 550-A | 2022-12-31T16:00:00 | 1.0 | 100.0 | 9.9 | -| 550-W | 2023-12-31T16:00:00 | 10000.0 | 1000000.0 | 9.9 | -| MOSS | 2043-12-31T16:00:00 | 100000000.0 | 1.0e10 | 9.9 | -+-------+---------------------+-------------+-----------+-----------+"; - assert_eq!(pretty_print, expected); - } - - async fn verify_data_distribution( - instance: &MockDistributedInstance, - expected_distribution: HashMap, - ) { - let table = instance - .frontend - .catalog_manager() - .table("greptime", "public", "demo") - .await - .unwrap() - .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); - - let TableGlobalValue { regions_id_map, .. } = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "demo".to_string(), - }) - .await - .unwrap() - .unwrap(); - let region_to_dn_map = regions_id_map - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); - assert_eq!(region_to_dn_map.len(), expected_distribution.len()); - - let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap(); - for (region, dn) in region_to_dn_map.iter() { - let dn = instance.datanodes.get(dn).unwrap(); - let engine = dn.query_engine(); - let plan = engine - .planner() - .plan(stmt.clone(), QueryContext::arc()) - .await - .unwrap(); - let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let actual = recordbatches.pretty_print().unwrap(); - - let expected = expected_distribution.get(region).unwrap(); - assert_eq!(&actual, expected); - } - } - - async fn drop_table(instance: &Instance) { - let sql = "DROP TABLE demo"; - let output = query(instance, sql).await; - let Output::AffectedRows(x) = output else { unreachable!() }; - assert_eq!(x, 1); - } - - async fn verify_table_is_dropped(instance: &MockDistributedInstance) { - for (_, dn) in instance.datanodes.iter() { - assert!(dn - .catalog_manager() - .table("greptime", "public", "demo") - .await - .unwrap() - .is_none()) - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_sql_interceptor_plugin() { - #[derive(Default)] - struct AssertionHook { - pub(crate) c: AtomicU32, - } - - impl SqlQueryInterceptor for AssertionHook { - type Error = Error; - - fn pre_parsing<'a>( - &self, - query: &'a str, - _query_ctx: QueryContextRef, - ) -> Result> { - self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - assert!(query.starts_with("CREATE TABLE demo")); - Ok(Cow::Borrowed(query)) - } - - fn post_parsing( - &self, - statements: Vec, - _query_ctx: QueryContextRef, - ) -> Result> { - self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - assert!(matches!(statements[0], Statement::CreateTable(_))); - Ok(statements) - } - - fn pre_execute( - &self, - _statement: &Statement, - _plan: Option<&query::plan::LogicalPlan>, - _query_ctx: QueryContextRef, - ) -> Result<()> { - self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - Ok(()) - } - - fn post_execute( - &self, - mut output: Output, - _query_ctx: QueryContextRef, - ) -> Result { - self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - match &mut output { - Output::AffectedRows(rows) => { - assert_eq!(*rows, 0); - // update output result - *rows = 10; - } - _ => unreachable!(), - } - Ok(output) - } - } - - let standalone = tests::create_standalone_instance("test_hook").await; - let mut instance = standalone.instance; - - let plugins = Plugins::new(); - let counter_hook = Arc::new(AssertionHook::default()); - plugins.insert::>(counter_hook.clone()); - Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); - - let sql = r#"CREATE TABLE demo( - host STRING, - ts TIMESTAMP, - cpu DOUBLE NULL, - memory DOUBLE NULL, - disk_util DOUBLE DEFAULT 9.9, - TIME INDEX (ts), - PRIMARY KEY(host) - ) engine=mito with(regions=1);"#; - let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc()) - .await - .remove(0) - .unwrap(); - - // assert that the hook is called 3 times - assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed)); - match output { - Output::AffectedRows(rows) => assert_eq!(rows, 10), - _ => unreachable!(), - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_disable_db_operation_plugin() { - #[derive(Default)] - struct DisableDBOpHook; - - impl SqlQueryInterceptor for DisableDBOpHook { - type Error = Error; - - fn post_parsing( - &self, - statements: Vec, - _query_ctx: QueryContextRef, - ) -> Result> { - for s in &statements { - match s { - Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => { - return Err(Error::NotSupported { - feat: "Database operations".to_owned(), - }) - } - _ => {} - } - } - - Ok(statements) - } - } - - let query_ctx = Arc::new(QueryContext::new()); - - let standalone = tests::create_standalone_instance("test_db_hook").await; - let mut instance = standalone.instance; - - let plugins = Plugins::new(); - let hook = Arc::new(DisableDBOpHook::default()); - plugins.insert::>(hook.clone()); - Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); - - let sql = r#"CREATE TABLE demo( - host STRING, - ts TIMESTAMP, - cpu DOUBLE NULL, - memory DOUBLE NULL, - disk_util DOUBLE DEFAULT 9.9, - TIME INDEX (ts), - PRIMARY KEY(host) - ) engine=mito with(regions=1);"#; - let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone()) - .await - .remove(0) - .unwrap(); - - match output { - Output::AffectedRows(rows) => assert_eq!(rows, 0), - _ => unreachable!(), - } - - let sql = r#"CREATE DATABASE tomcat"#; - if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone()) - .await - .remove(0) - { - assert!(matches!(e, error::Error::NotSupported { .. })); - } else { - unreachable!(); - } - - let sql = r#"SELECT 1; SHOW DATABASES"#; - if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone()) - .await - .remove(0) - { - assert!(matches!(e, error::Error::NotSupported { .. })); - } else { - unreachable!(); - } - } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index a314d35747..5f925517c9 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -73,14 +73,14 @@ use crate::table::DistTable; const MAX_VALUE: &str = "MAXVALUE"; #[derive(Clone)] -pub(crate) struct DistInstance { +pub struct DistInstance { meta_client: Arc, catalog_manager: Arc, datanode_clients: Arc, } impl DistInstance { - pub(crate) fn new( + pub fn new( meta_client: Arc, catalog_manager: Arc, datanode_clients: Arc, @@ -92,7 +92,7 @@ impl DistInstance { } } - pub(crate) async fn create_table( + pub async fn create_table( &self, create_table: &mut CreateTableExpr, partitions: Option, @@ -579,8 +579,7 @@ impl DistInstance { Ok(Output::AffectedRows(affected_rows)) } - #[cfg(test)] - pub(crate) fn catalog_manager(&self) -> Arc { + pub fn catalog_manager(&self) -> Arc { self.catalog_manager.clone() } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 06960b8d43..5b16fccd8a 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -81,845 +81,3 @@ impl GrpcQueryHandler for Instance { Ok(output) } } - -#[cfg(test)] -mod test { - use std::collections::HashMap; - - use api::v1::column::{SemanticType, Values}; - use api::v1::ddl_request::Expr as DdlExpr; - use api::v1::{ - alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, - FlushTableExpr, InsertRequest, QueryRequest, - }; - use catalog::helper::{TableGlobalKey, TableGlobalValue}; - use common_catalog::consts::MITO_ENGINE; - use common_query::Output; - use common_recordbatch::RecordBatches; - use query::parser::QueryLanguageParser; - use session::context::QueryContext; - use tests::{has_parquet_file, test_region_dir}; - - use super::*; - use crate::table::DistTable; - use crate::tests; - use crate::tests::MockDistributedInstance; - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_handle_ddl_request() { - let instance = - tests::create_distributed_instance("test_distributed_handle_ddl_request").await; - let frontend = &instance.frontend; - - test_handle_ddl_request(frontend.as_ref()).await; - - verify_table_is_dropped(&instance).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_handle_ddl_request() { - let standalone = - tests::create_standalone_instance("test_standalone_handle_ddl_request").await; - let instance = &standalone.instance; - - test_handle_ddl_request(instance.as_ref()).await; - } - - async fn query(instance: &Instance, request: Request) -> Output { - GrpcQueryHandler::do_query(instance, request, QueryContext::arc()) - .await - .unwrap() - } - - async fn test_handle_ddl_request(instance: &Instance) { - let request = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { - database_name: "database_created_through_grpc".to_string(), - create_if_not_exists: true, - })), - }); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(1))); - - let request = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(CreateTableExpr { - catalog_name: "greptime".to_string(), - schema_name: "database_created_through_grpc".to_string(), - table_name: "table_created_through_grpc".to_string(), - column_defs: vec![ - ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as _, - is_nullable: true, - default_constraint: vec![], - }, - ColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as _, - is_nullable: false, - default_constraint: vec![], - }, - ], - time_index: "ts".to_string(), - engine: MITO_ENGINE.to_string(), - ..Default::default() - })), - }); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(0))); - - let request = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(AlterExpr { - catalog_name: "greptime".to_string(), - schema_name: "database_created_through_grpc".to_string(), - table_name: "table_created_through_grpc".to_string(), - kind: Some(alter_expr::Kind::AddColumns(AddColumns { - add_columns: vec![AddColumn { - column_def: Some(ColumnDef { - name: "b".to_string(), - datatype: ColumnDataType::Int32 as _, - is_nullable: true, - default_constraint: vec![], - }), - is_key: false, - }], - })), - })), - }); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(0))); - - let request = Request::Query(QueryRequest { - query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string())) - }); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(1))); - - let request = Request::Query(QueryRequest { - query: Some(Query::Sql( - "SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc" - .to_string(), - )), - }); - let output = query(instance, request).await; - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-04T07:14:26 | s | 1 | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - - let request = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::DropTable(DropTableExpr { - catalog_name: "greptime".to_string(), - schema_name: "database_created_through_grpc".to_string(), - table_name: "table_created_through_grpc".to_string(), - })), - }); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(1))); - } - - async fn verify_table_is_dropped(instance: &MockDistributedInstance) { - for (_, dn) in instance.datanodes.iter() { - assert!(dn - .catalog_manager() - .table( - "greptime", - "database_created_through_grpc", - "table_created_through_grpc" - ) - .await - .unwrap() - .is_none()); - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_insert_delete_and_query() { - common_telemetry::init_default_ut_logging(); - - let instance = - tests::create_distributed_instance("test_distributed_insert_delete_and_query").await; - let frontend = instance.frontend.as_ref(); - - let table_name = "my_dist_table"; - let sql = format!( - r" -CREATE TABLE {table_name} ( - a INT, - b STRING PRIMARY KEY, - ts TIMESTAMP, - TIME INDEX (ts) -) PARTITION BY RANGE COLUMNS(a) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), -)" - ); - create_table(frontend, sql).await; - - test_insert_delete_and_query_on_existing_table(frontend, table_name).await; - - verify_data_distribution( - &instance, - table_name, - HashMap::from([ - ( - 0u32, - "\ -+---------------------+---+-------------------+ -| ts | a | b | -+---------------------+---+-------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | -+---------------------+---+-------------------+", - ), - ( - 1u32, - "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | -+---------------------+----+-------------------+", - ), - ( - 2u32, - "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | -| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | -+---------------------+----+-------------------+", - ), - ( - 3u32, - "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | -+---------------------+----+-------------------+", - ), - ]), - ) - .await; - - test_insert_delete_and_query_on_auto_created_table(frontend).await; - - // Auto created table has only one region. - verify_data_distribution( - &instance, - "auto_created_table", - HashMap::from([( - 0u32, - "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-01T07:26:16 | | | -| 2023-01-01T07:26:17 | 6 | | -| 2023-01-01T07:26:18 | | x | -| 2023-01-01T07:26:20 | | z | -+---------------------+---+---+", - )]), - ) - .await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_insert_and_query() { - common_telemetry::init_default_ut_logging(); - - let standalone = - tests::create_standalone_instance("test_standalone_insert_and_query").await; - let instance = &standalone.instance; - - let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); - create_table(instance, sql).await; - - test_insert_delete_and_query_on_existing_table(instance, table_name).await; - - test_insert_delete_and_query_on_auto_created_table(instance).await - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_flush_table() { - common_telemetry::init_default_ut_logging(); - - let instance = tests::create_distributed_instance("test_distributed_flush_table").await; - let data_tmp_dirs = instance.data_tmp_dirs(); - let frontend = instance.frontend.as_ref(); - - let table_name = "my_dist_table"; - let sql = format!( - r" -CREATE TABLE {table_name} ( - a INT, - ts TIMESTAMP, - TIME INDEX (ts) -) PARTITION BY RANGE COLUMNS(a) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), -)" - ); - create_table(frontend, sql).await; - - test_insert_delete_and_query_on_existing_table(frontend, table_name).await; - - flush_table(frontend, "greptime", "public", table_name, None).await; - // Wait for previous task finished - flush_table(frontend, "greptime", "public", table_name, None).await; - - let table = instance - .frontend - .catalog_manager() - .table("greptime", "public", table_name) - .await - .unwrap() - .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); - - let tgv = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - }) - .await - .unwrap() - .unwrap(); - let table_id = tgv.table_id(); - - let region_to_dn_map = tgv - .regions_id_map - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); - - for (region, dn) in region_to_dn_map.iter() { - // data_tmp_dirs -> dn: 1..4 - let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap(); - let region_dir = test_region_dir( - data_tmp_dir.path().to_str().unwrap(), - "greptime", - "public", - table_id, - *region, - ); - has_parquet_file(®ion_dir); - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_flush_table() { - common_telemetry::init_default_ut_logging(); - - let standalone = tests::create_standalone_instance("test_standalone_flush_table").await; - let instance = &standalone.instance; - let data_tmp_dir = standalone.data_tmp_dir(); - - let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); - - create_table(instance, sql).await; - - test_insert_delete_and_query_on_existing_table(instance, table_name).await; - - let table_id = 1024; - let region_id = 0; - let region_dir = test_region_dir( - data_tmp_dir.path().to_str().unwrap(), - "greptime", - "public", - table_id, - region_id, - ); - assert!(!has_parquet_file(®ion_dir)); - - flush_table(instance, "greptime", "public", "my_table", None).await; - // Wait for previous task finished - flush_table(instance, "greptime", "public", "my_table", None).await; - - assert!(has_parquet_file(®ion_dir)); - } - - async fn create_table(frontend: &Instance, sql: String) { - let request = Request::Query(QueryRequest { - query: Some(Query::Sql(sql)), - }); - let output = query(frontend, request).await; - assert!(matches!(output, Output::AffectedRows(0))); - } - - async fn flush_table( - frontend: &Instance, - catalog_name: &str, - schema_name: &str, - table_name: &str, - region_id: Option, - ) { - let request = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::FlushTable(FlushTableExpr { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - region_id, - })), - }); - - let output = query(frontend, request).await; - assert!(matches!(output, Output::AffectedRows(0))); - } - - async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) { - let ts_millisecond_values = vec![ - 1672557972000, - 1672557973000, - 1672557974000, - 1672557975000, - 1672557976000, - 1672557977000, - 1672557978000, - 1672557979000, - 1672557980000, - 1672557981000, - 1672557982000, - 1672557983000, - 1672557984000, - 1672557985000, - 1672557986000, - 1672557987000, - ]; - let insert = InsertRequest { - table_name: table_name.to_string(), - columns: vec![ - Column { - column_name: "a".to_string(), - values: Some(Values { - i32_values: vec![1, 2, 3, 4, 5, 11, 12, 20, 21, 22, 23, 50, 51, 52, 53], - ..Default::default() - }), - null_mask: vec![32, 0], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Int32 as i32, - }, - Column { - column_name: "b".to_string(), - values: Some(Values { - string_values: ts_millisecond_values - .iter() - .map(|x| format!("ts: {x}")) - .collect(), - ..Default::default() - }), - semantic_type: SemanticType::Tag as i32, - datatype: ColumnDataType::String as i32, - ..Default::default() - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values, - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 16, - ..Default::default() - }; - let output = query(instance, Request::Insert(insert)).await; - assert!(matches!(output, Output::AffectedRows(16))); - - let request = Request::Query(QueryRequest { - query: Some(Query::Sql(format!( - "SELECT ts, a, b FROM {table_name} ORDER BY ts" - ))), - }); - let output = query(instance, request.clone()).await; - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | -| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | -| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | -| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | -+---------------------+----+-------------------+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - - let delete = DeleteRequest { - table_name: table_name.to_string(), - region_number: 0, - key_columns: vec![ - Column { - column_name: "a".to_string(), - semantic_type: SemanticType::Field as i32, - values: Some(Values { - i32_values: vec![2, 12, 22, 52], - ..Default::default() - }), - datatype: ColumnDataType::Int32 as i32, - ..Default::default() - }, - Column { - column_name: "b".to_string(), - semantic_type: SemanticType::Tag as i32, - values: Some(Values { - string_values: vec![ - "ts: 1672557973000".to_string(), - "ts: 1672557979000".to_string(), - "ts: 1672557982000".to_string(), - "ts: 1672557986000".to_string(), - ], - ..Default::default() - }), - datatype: ColumnDataType::String as i32, - ..Default::default() - }, - Column { - column_name: "ts".to_string(), - semantic_type: SemanticType::Timestamp as i32, - values: Some(Values { - ts_millisecond_values: vec![ - 1672557973000, - 1672557979000, - 1672557982000, - 1672557986000, - ], - ..Default::default() - }), - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 4, - }; - let output = query(instance, Request::Delete(delete)).await; - assert!(matches!(output, Output::AffectedRows(4))); - - let output = query(instance, request).await; - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | -| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | -+---------------------+----+-------------------+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - async fn verify_data_distribution( - instance: &MockDistributedInstance, - table_name: &str, - expected_distribution: HashMap, - ) { - let table = instance - .frontend - .catalog_manager() - .table("greptime", "public", table_name) - .await - .unwrap() - .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); - - let TableGlobalValue { regions_id_map, .. } = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - }) - .await - .unwrap() - .unwrap(); - let region_to_dn_map = regions_id_map - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); - assert_eq!(region_to_dn_map.len(), expected_distribution.len()); - - for (region, dn) in region_to_dn_map.iter() { - let stmt = QueryLanguageParser::parse_sql(&format!( - "SELECT ts, a, b FROM {table_name} ORDER BY ts" - )) - .unwrap(); - let dn = instance.datanodes.get(dn).unwrap(); - let engine = dn.query_engine(); - let plan = engine - .planner() - .plan(stmt, QueryContext::arc()) - .await - .unwrap(); - let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let actual = recordbatches.pretty_print().unwrap(); - - let expected = expected_distribution.get(region).unwrap(); - assert_eq!(&actual, expected); - } - } - - async fn test_insert_delete_and_query_on_auto_created_table(instance: &Instance) { - let insert = InsertRequest { - table_name: "auto_created_table".to_string(), - columns: vec![ - Column { - column_name: "a".to_string(), - values: Some(Values { - i32_values: vec![4, 6], - ..Default::default() - }), - null_mask: vec![2], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Int32 as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 3, - ..Default::default() - }; - - // Test auto create not existed table upon insertion. - let request = Request::Insert(insert); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(3))); - - let insert = InsertRequest { - table_name: "auto_created_table".to_string(), - columns: vec![ - Column { - column_name: "b".to_string(), - values: Some(Values { - string_values: vec!["x".to_string(), "z".to_string()], - ..Default::default() - }), - null_mask: vec![2], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::String as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 3, - ..Default::default() - }; - - // Test auto add not existed column upon insertion. - let request = Request::Insert(insert); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(3))); - - let request = Request::Query(QueryRequest { - query: Some(Query::Sql( - "SELECT ts, a, b FROM auto_created_table".to_string(), - )), - }); - let output = query(instance, request.clone()).await; - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-01T07:26:15 | 4 | | -| 2023-01-01T07:26:16 | | | -| 2023-01-01T07:26:17 | 6 | | -| 2023-01-01T07:26:18 | | x | -| 2023-01-01T07:26:19 | | | -| 2023-01-01T07:26:20 | | z | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - - let delete = DeleteRequest { - table_name: "auto_created_table".to_string(), - region_number: 0, - key_columns: vec![Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557979000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }], - row_count: 2, - }; - - let output = query(instance, Request::Delete(delete)).await; - assert!(matches!(output, Output::AffectedRows(2))); - - let output = query(instance, request).await; - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-01T07:26:16 | | | -| 2023-01-01T07:26:17 | 6 | | -| 2023-01-01T07:26:18 | | x | -| 2023-01-01T07:26:20 | | z | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_promql_query() { - common_telemetry::init_default_ut_logging(); - - let standalone = tests::create_standalone_instance("test_standalone_promql_query").await; - let instance = &standalone.instance; - - let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))"); - create_table(instance, sql).await; - - let insert = InsertRequest { - table_name: table_name.to_string(), - columns: vec![ - Column { - column_name: "h".to_string(), - values: Some(Values { - string_values: vec![ - "t".to_string(), - "t".to_string(), - "t".to_string(), - "t".to_string(), - "t".to_string(), - "t".to_string(), - "t".to_string(), - "t".to_string(), - ], - ..Default::default() - }), - semantic_type: SemanticType::Tag as i32, - datatype: ColumnDataType::String as i32, - ..Default::default() - }, - Column { - column_name: "a".to_string(), - values: Some(Values { - f64_values: vec![1f64, 11f64, 20f64, 22f64, 50f64, 55f64, 99f64], - ..Default::default() - }), - null_mask: vec![4], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Float64 as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![ - 1672557972000, - 1672557973000, - 1672557974000, - 1672557975000, - 1672557976000, - 1672557977000, - 1672557978000, - 1672557979000, - ], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 8, - ..Default::default() - }; - - let request = Request::Insert(insert); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(8))); - - let request = Request::Query(QueryRequest { - query: Some(Query::PromRangeQuery(api::v1::PromRangeQuery { - query: "my_table".to_owned(), - start: "1672557973".to_owned(), - end: "1672557978".to_owned(), - step: "1s".to_owned(), - })), - }); - let output = query(instance, request).await; - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---+------+---------------------+ -| h | a | ts | -+---+------+---------------------+ -| t | 11.0 | 2023-01-01T07:26:13 | -| t | | 2023-01-01T07:26:14 | -| t | 20.0 | 2023-01-01T07:26:15 | -| t | 22.0 | 2023-01-01T07:26:16 | -| t | 50.0 | 2023-01-01T07:26:17 | -| t | 55.0 | 2023-01-01T07:26:18 | -+---+------+---------------------+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } -} diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 8073ac740d..7256f587f2 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -36,65 +36,3 @@ impl InfluxdbLineProtocolHandler for Instance { Ok(()) } } - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use common_query::Output; - use common_recordbatch::RecordBatches; - use servers::query_handler::sql::SqlQueryHandler; - use session::context::QueryContext; - - use super::*; - use crate::tests; - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_put_influxdb_lines() { - let standalone = - tests::create_standalone_instance("test_standalone_put_influxdb_lines").await; - let instance = &standalone.instance; - - test_put_influxdb_lines(instance).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_put_influxdb_lines() { - let instance = - tests::create_distributed_instance("test_distributed_put_influxdb_lines").await; - let instance = &instance.frontend; - - test_put_influxdb_lines(instance).await; - } - - async fn test_put_influxdb_lines(instance: &Arc) { - let lines = r" -monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 -monitor1,host=host2 memory=1027 1663840496400340001"; - let request = InfluxdbRequest { - precision: None, - lines: lines.to_string(), - }; - instance.exec(&request, QueryContext::arc()).await.unwrap(); - - let mut output = instance - .do_query( - "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts", - QueryContext::arc(), - ) - .await; - let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - assert_eq!( - recordbatches.pretty_print().unwrap(), - "\ -+-------------------------+-------+------+--------+ -| ts | host | cpu | memory | -+-------------------------+-------+------+--------+ -| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024.0 | -| 2022-09-22T09:54:56.400 | host2 | | 1027.0 | -+-------------------------+-------+------+--------+" - ); - } -} diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 3a5f6b9a5a..42c0ea7000 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -35,95 +35,3 @@ impl OpentsdbProtocolHandler for Instance { Ok(()) } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_query::Output; - use common_recordbatch::RecordBatches; - use itertools::Itertools; - use servers::query_handler::sql::SqlQueryHandler; - use session::context::QueryContext; - - use super::*; - use crate::tests; - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_exec() { - let standalone = tests::create_standalone_instance("test_standalone_exec").await; - let instance = &standalone.instance; - - test_exec(instance).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_exec() { - let distributed = tests::create_distributed_instance("test_distributed_exec").await; - let instance = &distributed.frontend; - - test_exec(instance).await; - } - - async fn test_exec(instance: &Arc) { - let ctx = QueryContext::arc(); - let data_point1 = DataPoint::new( - "my_metric_1".to_string(), - 1000, - 1.0, - vec![ - ("tagk1".to_string(), "tagv1".to_string()), - ("tagk2".to_string(), "tagv2".to_string()), - ], - ); - // should create new table "my_metric_1" directly - let result = instance.exec(&data_point1, ctx.clone()).await; - assert!(result.is_ok()); - - let data_point2 = DataPoint::new( - "my_metric_1".to_string(), - 2000, - 2.0, - vec![ - ("tagk2".to_string(), "tagv2".to_string()), - ("tagk3".to_string(), "tagv3".to_string()), - ], - ); - // should create new column "tagk3" directly - let result = instance.exec(&data_point2, ctx.clone()).await; - assert!(result.is_ok()); - - let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); - // should handle null tags properly - let result = instance.exec(&data_point3, ctx.clone()).await; - assert!(result.is_ok()); - - let output = instance - .do_query( - "select * from my_metric_1 order by greptime_timestamp", - Arc::new(QueryContext::new()), - ) - .await - .remove(0) - .unwrap(); - match output { - Output::Stream(stream) => { - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let pretty_print = recordbatches.pretty_print().unwrap(); - let expected = vec![ - "+---------------------+----------------+-------+-------+-------+", - "| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |", - "+---------------------+----------------+-------+-------+-------+", - "| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |", - "| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |", - "| 1970-01-01T00:00:03 | 3.0 | | | |", - "+---------------------+----------------+-------+-------+-------+", - ] - .into_iter() - .join("\n"); - assert_eq!(pretty_print, expected); - } - _ => unreachable!(), - }; - } -} diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index de7d38bd2c..6dd1d0b04c 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -153,168 +153,3 @@ impl PrometheusProtocolHandler for Instance { todo!(); } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::prometheus::remote::label_matcher::Type as MatcherType; - use api::prometheus::remote::{Label, LabelMatcher, Sample}; - use common_catalog::consts::DEFAULT_CATALOG_NAME; - use servers::query_handler::sql::SqlQueryHandler; - use session::context::QueryContext; - - use super::*; - use crate::tests; - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_prometheus_remote_rw() { - let standalone = - tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await; - let instance = &standalone.instance; - - test_prometheus_remote_rw(instance).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_prometheus_remote_rw() { - let distributed = - tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await; - let instance = &distributed.frontend; - - test_prometheus_remote_rw(instance).await; - } - - async fn test_prometheus_remote_rw(instance: &Arc) { - let write_request = WriteRequest { - timeseries: prometheus::mock_timeseries(), - ..Default::default() - }; - - let db = "prometheus"; - let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db)); - - assert!(SqlQueryHandler::do_query( - instance.as_ref(), - "CREATE DATABASE IF NOT EXISTS prometheus", - ctx.clone(), - ) - .await - .get(0) - .unwrap() - .is_ok()); - - instance.write(write_request, ctx.clone()).await.unwrap(); - - let read_request = ReadRequest { - queries: vec![ - Query { - start_timestamp_ms: 1000, - end_timestamp_ms: 2000, - matchers: vec![LabelMatcher { - name: prometheus::METRIC_NAME_LABEL.to_string(), - value: "metric1".to_string(), - r#type: 0, - }], - ..Default::default() - }, - Query { - start_timestamp_ms: 1000, - end_timestamp_ms: 3000, - matchers: vec![ - LabelMatcher { - name: prometheus::METRIC_NAME_LABEL.to_string(), - value: "metric3".to_string(), - r#type: 0, - }, - LabelMatcher { - name: "app".to_string(), - value: "biz".to_string(), - r#type: MatcherType::Eq as i32, - }, - ], - ..Default::default() - }, - ], - ..Default::default() - }; - - let resp = instance.read(read_request, ctx).await.unwrap(); - assert_eq!(resp.content_type, "application/x-protobuf"); - assert_eq!(resp.content_encoding, "snappy"); - let body = prometheus::snappy_decompress(&resp.body).unwrap(); - let read_response = ReadResponse::decode(&body[..]).unwrap(); - let query_results = read_response.results; - assert_eq!(2, query_results.len()); - - assert_eq!(1, query_results[0].timeseries.len()); - let timeseries = &query_results[0].timeseries[0]; - - assert_eq!( - vec![ - Label { - name: prometheus::METRIC_NAME_LABEL.to_string(), - value: "metric1".to_string(), - }, - Label { - name: "job".to_string(), - value: "spark".to_string(), - }, - ], - timeseries.labels - ); - - assert_eq!( - timeseries.samples, - vec![ - Sample { - value: 1.0, - timestamp: 1000, - }, - Sample { - value: 2.0, - timestamp: 2000, - } - ] - ); - - assert_eq!(1, query_results[1].timeseries.len()); - let timeseries = &query_results[1].timeseries[0]; - - assert_eq!( - vec![ - Label { - name: prometheus::METRIC_NAME_LABEL.to_string(), - value: "metric3".to_string(), - }, - Label { - name: "idc".to_string(), - value: "z002".to_string(), - }, - Label { - name: "app".to_string(), - value: "biz".to_string(), - }, - ], - timeseries.labels - ); - - assert_eq!( - timeseries.samples, - vec![ - Sample { - value: 5.0, - timestamp: 1000, - }, - Sample { - value: 6.0, - timestamp: 2000, - }, - Sample { - value: 7.0, - timestamp: 3000, - } - ] - ); - } -} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index cbcdd5bbe4..8ba07cf21c 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -18,7 +18,7 @@ pub mod catalog; pub mod datanode; pub mod error; -mod expr_factory; +pub mod expr_factory; pub mod frontend; pub mod grpc; pub mod influxdb; @@ -32,11 +32,4 @@ pub mod prometheus; mod script; mod server; pub mod statement; -mod table; -#[cfg(test)] -mod tests; - -#[cfg(test)] -// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate -#[allow(clippy::single_component_path_imports)] -use rstest_reuse; +pub mod table; diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index f0612756ab..d3cd1d82aa 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -59,7 +59,7 @@ impl StatementExecutor { } } - pub(crate) async fn execute_stmt( + pub async fn execute_stmt( &self, stmt: QueryStatement, query_ctx: QueryContextRef, diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 89565a07bd..9e2013419e 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -217,7 +217,7 @@ impl Table for DistTable { } impl DistTable { - pub(crate) fn new( + pub fn new( table_name: TableName, table_info: TableInfoRef, partition_manager: PartitionRuleManagerRef, @@ -233,7 +233,7 @@ impl DistTable { } } - pub(crate) async fn table_global_value( + pub async fn table_global_value( &self, key: &TableGlobalKey, ) -> Result> { @@ -496,25 +496,10 @@ mod test { use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; - use api::v1::column::SemanticType; - use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; use catalog::error::Result; use catalog::remote::{KvBackend, ValueIter}; - use common_query::physical_plan::DfPhysicalPlanAdapter; - use common_query::DfPhysicalPlan; - use common_recordbatch::adapter::RecordBatchStreamAdapter; - use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr}; - use datafusion::physical_plan::sorts::sort::SortExec; - use datafusion::prelude::SessionContext; - use datafusion::sql::sqlparser; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::{lit, Operator}; - use datanode::instance::Instance; - use datatypes::arrow::compute::SortOptions; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema}; - use itertools::Itertools; use meta_client::client::MetaClient; use meta_client::rpc::router::RegionRoute; use meta_client::rpc::{Region, Table, TableRoute}; @@ -528,15 +513,10 @@ mod test { use partition::range::RangePartitionRule; use partition::route::TableRoutes; use partition::PartitionRuleRef; - use session::context::QueryContext; - use sql::parser::ParserContext; - use sql::statements::statement::Statement; use store_api::storage::RegionNumber; - use table::metadata::{TableInfoBuilder, TableMetaBuilder}; - use table::{meter_insert_request, TableRef}; + use table::meter_insert_request; use super::*; - use crate::expr_factory; struct DummyKvBackend; @@ -745,321 +725,6 @@ mod test { assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]); } - #[tokio::test(flavor = "multi_thread")] - async fn test_dist_table_scan() { - common_telemetry::init_default_ut_logging(); - let table = Arc::new(new_dist_table("test_dist_table_scan").await); - // should scan all regions - // select a, row_id from numbers - let projection = Some(vec![1, 2]); - let filters = vec![]; - let expected_output = vec![ - "+-----+--------+", - "| a | row_id |", - "+-----+--------+", - "| 0 | 1 |", - "| 1 | 2 |", - "| 2 | 3 |", - "| 3 | 4 |", - "| 4 | 5 |", - "| 10 | 1 |", - "| 11 | 2 |", - "| 12 | 3 |", - "| 13 | 4 |", - "| 14 | 5 |", - "| 30 | 1 |", - "| 31 | 2 |", - "| 32 | 3 |", - "| 33 | 4 |", - "| 34 | 5 |", - "| 100 | 1 |", - "| 101 | 2 |", - "| 102 | 3 |", - "| 103 | 4 |", - "| 104 | 5 |", - "+-----+--------+", - ]; - exec_table_scan(table.clone(), projection, filters, 4, expected_output).await; - - // should scan only region 1 - // select a, row_id from numbers where a < 10 - let projection = Some(vec![1, 2]); - let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()]; - let expected_output = vec![ - "+---+--------+", - "| a | row_id |", - "+---+--------+", - "| 0 | 1 |", - "| 1 | 2 |", - "| 2 | 3 |", - "| 3 | 4 |", - "| 4 | 5 |", - "+---+--------+", - ]; - exec_table_scan(table.clone(), projection, filters, 1, expected_output).await; - - // should scan region 1 and 2 - // select a, row_id from numbers where a < 15 - let projection = Some(vec![1, 2]); - let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()]; - let expected_output = vec![ - "+----+--------+", - "| a | row_id |", - "+----+--------+", - "| 0 | 1 |", - "| 1 | 2 |", - "| 2 | 3 |", - "| 3 | 4 |", - "| 4 | 5 |", - "| 10 | 1 |", - "| 11 | 2 |", - "| 12 | 3 |", - "| 13 | 4 |", - "| 14 | 5 |", - "+----+--------+", - ]; - exec_table_scan(table.clone(), projection, filters, 2, expected_output).await; - - // should scan region 2 and 3 - // select a, row_id from numbers where a < 40 and a >= 10 - let projection = Some(vec![1, 2]); - let filters = vec![and( - binary_expr(col("a"), Operator::Lt, lit(40)), - binary_expr(col("a"), Operator::GtEq, lit(10)), - ) - .into()]; - let expected_output = vec![ - "+----+--------+", - "| a | row_id |", - "+----+--------+", - "| 10 | 1 |", - "| 11 | 2 |", - "| 12 | 3 |", - "| 13 | 4 |", - "| 14 | 5 |", - "| 30 | 1 |", - "| 31 | 2 |", - "| 32 | 3 |", - "| 33 | 4 |", - "| 34 | 5 |", - "+----+--------+", - ]; - exec_table_scan(table.clone(), projection, filters, 2, expected_output).await; - - // should scan all regions - // select a, row_id from numbers where a < 1000 and row_id == 1 - let projection = Some(vec![1, 2]); - let filters = vec![and( - binary_expr(col("a"), Operator::Lt, lit(1000)), - binary_expr(col("row_id"), Operator::Eq, lit(1)), - ) - .into()]; - let expected_output = vec![ - "+-----+--------+", - "| a | row_id |", - "+-----+--------+", - "| 0 | 1 |", - "| 10 | 1 |", - "| 30 | 1 |", - "| 100 | 1 |", - "+-----+--------+", - ]; - exec_table_scan(table.clone(), projection, filters, 4, expected_output).await; - } - - async fn exec_table_scan( - table: TableRef, - projection: Option>, - filters: Vec, - expected_partitions: usize, - expected_output: Vec<&str>, - ) { - let expected_output = expected_output.into_iter().join("\n"); - let table_scan = table - .scan(projection.as_ref(), filters.as_slice(), None) - .await - .unwrap(); - assert_eq!( - table_scan.output_partitioning().partition_count(), - expected_partitions - ); - - let merge = - CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone()))); - - let sort = SortExec::new( - vec![PhysicalSortExpr { - expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(), - options: SortOptions::default(), - }], - Arc::new(merge), - ) - .with_fetch(None); - assert_eq!(sort.output_partitioning().partition_count(), 1); - - let session_ctx = SessionContext::new(); - let stream = sort.execute(0, session_ctx.task_ctx()).unwrap(); - let stream = Box::pin(RecordBatchStreamAdapter::try_new(stream).unwrap()); - - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - assert_eq!(recordbatches.pretty_print().unwrap(), expected_output); - } - - async fn new_dist_table(test_name: &str) -> DistTable { - let column_schemas = vec![ - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), true), - ]; - let schema = Arc::new(Schema::new(column_schemas.clone())); - - let instance = crate::tests::create_distributed_instance(test_name).await; - let dist_instance = &instance.dist_instance; - let datanode_instances = instance.datanodes; - - let catalog_manager = dist_instance.catalog_manager(); - let partition_manager = catalog_manager.partition_manager(); - let datanode_clients = catalog_manager.datanode_clients(); - - let table_name = TableName::new("greptime", "public", "dist_numbers"); - - let sql = " - CREATE TABLE greptime.public.dist_numbers ( - ts BIGINT, - a INT, - row_id INT, - TIME INDEX (ts), - ) - PARTITION BY RANGE COLUMNS (a) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), - ) - ENGINE=mito"; - - let create_table = - match ParserContext::create_with_dialect(sql, &sqlparser::dialect::GenericDialect {}) - .unwrap() - .pop() - .unwrap() - { - Statement::CreateTable(c) => c, - _ => unreachable!(), - }; - - let mut expr = expr_factory::create_to_expr(&create_table, QueryContext::arc()).unwrap(); - let _result = dist_instance - .create_table(&mut expr, create_table.partitions) - .await - .unwrap(); - - let table_route = partition_manager - .find_table_route(&table_name) - .await - .unwrap(); - - let mut region_to_datanode_mapping = HashMap::new(); - for region_route in table_route.region_routes.iter() { - let region_id = region_route.region.id as u32; - let datanode_id = region_route.leader_peer.as_ref().unwrap().id; - region_to_datanode_mapping.insert(region_id, datanode_id); - } - - let mut global_start_ts = 1; - let regional_numbers = vec![ - (0, (0..5).collect::>()), - (1, (10..15).collect::>()), - (2, (30..35).collect::>()), - (3, (100..105).collect::>()), - ]; - for (region_number, numbers) in regional_numbers { - let datanode_id = *region_to_datanode_mapping.get(®ion_number).unwrap(); - let instance = datanode_instances.get(&datanode_id).unwrap().clone(); - - let start_ts = global_start_ts; - global_start_ts += numbers.len() as i64; - - insert_testing_data( - &table_name, - instance.clone(), - numbers, - start_ts, - region_number, - ) - .await; - } - - let meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(vec![]) - .next_column_id(1) - .build() - .unwrap(); - let table_info = TableInfoBuilder::default() - .name(&table_name.table_name) - .meta(meta) - .build() - .unwrap(); - DistTable { - table_name, - table_info: Arc::new(table_info), - partition_manager, - datanode_clients, - backend: catalog_manager.backend(), - } - } - - async fn insert_testing_data( - table_name: &TableName, - dn_instance: Arc, - data: Vec, - start_ts: i64, - region_number: RegionNumber, - ) { - let row_count = data.len() as u32; - let columns = vec![ - Column { - column_name: "ts".to_string(), - values: Some(column::Values { - i64_values: (start_ts..start_ts + row_count as i64).collect::>(), - ..Default::default() - }), - datatype: ColumnDataType::Int64 as i32, - semantic_type: SemanticType::Timestamp as i32, - ..Default::default() - }, - Column { - column_name: "a".to_string(), - values: Some(column::Values { - i32_values: data, - ..Default::default() - }), - datatype: ColumnDataType::Int32 as i32, - ..Default::default() - }, - Column { - column_name: "row_id".to_string(), - values: Some(column::Values { - i32_values: (1..=row_count as i32).collect::>(), - ..Default::default() - }), - datatype: ColumnDataType::Int32 as i32, - ..Default::default() - }, - ]; - let request = GrpcInsertRequest { - table_name: table_name.table_name.clone(), - columns, - row_count, - region_number, - }; - dn_instance - .handle_insert(request, QueryContext::arc()) - .await - .unwrap(); - } - #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new( diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index a2abf33b61..6ba5644f4c 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -23,7 +23,7 @@ common-test-util = { path = "../src/common/test-util" } datanode = { path = "../src/datanode" } datatypes = { path = "../src/datatypes" } dotenv = "0.15" -frontend = { path = "../src/frontend" } +frontend = { path = "../src/frontend", features = ["testing"] } mito = { path = "../src/mito", features = ["test"] } object-store = { path = "../src/object-store" } once_cell = "1.16" @@ -40,4 +40,22 @@ tokio.workspace = true uuid.workspace = true [dev-dependencies] +common-base = { path = "../src/common/base" } +common-recordbatch = { path = "../src/common/recordbatch" } +datafusion.workspace = true +datafusion-expr.workspace = true +futures.workspace = true +itertools = "0.10" +meta-client = { path = "../src/meta-client" } +meta-srv = { path = "../src/meta-srv" } +partition = { path = "../src/partition" } paste.workspace = true +prost.workspace = true +query = { path = "../src/query" } +script = { path = "../src/script" } +session = { path = "../src/session" } +store-api = { path = "../src/store-api" } +tonic.workspace = true +tower = "0.4" +rstest = "0.17" +rstest_reuse = "0.5" diff --git a/tests-integration/src/catalog.rs b/tests-integration/src/catalog.rs new file mode 100644 index 0000000000..5e3187ac2c --- /dev/null +++ b/tests-integration/src/catalog.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod tests { + use catalog::{CatalogManager, RegisterSystemTableRequest}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; + use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME}; + use table::requests::{CreateTableRequest, TableOptions}; + + #[tokio::test(flavor = "multi_thread")] + async fn test_register_system_table() { + let instance = + crate::tests::create_distributed_instance("test_register_system_table").await; + + let catalog_name = DEFAULT_CATALOG_NAME; + let schema_name = DEFAULT_SCHEMA_NAME; + let table_name = SCRIPTS_TABLE_NAME; + let request = CreateTableRequest { + id: 1, + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + desc: Some("Scripts table".to_string()), + schema: build_scripts_schema(), + region_numbers: vec![0], + primary_key_indices: vec![0, 1], + create_if_not_exists: true, + table_options: TableOptions::default(), + engine: MITO_ENGINE.to_string(), + }; + + let result = instance + .catalog_manager + .register_system_table(RegisterSystemTableRequest { + create_table_request: request, + open_hook: None, + }) + .await; + assert!(result.is_ok()); + + assert!( + instance + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .unwrap() + .is_some(), + "the registered system table cannot be found in catalog" + ); + + let mut actually_created_table_in_datanode = 0; + for datanode in instance.datanodes.values() { + if datanode + .catalog_manager() + .table(catalog_name, schema_name, table_name) + .await + .unwrap() + .is_some() + { + actually_created_table_in_datanode += 1; + } + } + assert_eq!( + actually_created_table_in_datanode, 1, + "system table should be actually created at one and only one datanode" + ) + } +} diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs new file mode 100644 index 0000000000..01252ec733 --- /dev/null +++ b/tests-integration/src/grpc.rs @@ -0,0 +1,858 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use api::v1::column::{SemanticType, Values}; + use api::v1::ddl_request::Expr as DdlExpr; + use api::v1::greptime_request::Request; + use api::v1::query_request::Query; + use api::v1::{ + alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, + CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, + FlushTableExpr, InsertRequest, QueryRequest, + }; + use catalog::helper::{TableGlobalKey, TableGlobalValue}; + use common_catalog::consts::MITO_ENGINE; + use common_query::Output; + use common_recordbatch::RecordBatches; + use frontend::instance::Instance; + use frontend::table::DistTable; + use query::parser::QueryLanguageParser; + use servers::query_handler::grpc::GrpcQueryHandler; + use session::context::QueryContext; + use tests::{has_parquet_file, test_region_dir}; + + use crate::tests; + use crate::tests::MockDistributedInstance; + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_handle_ddl_request() { + let instance = + tests::create_distributed_instance("test_distributed_handle_ddl_request").await; + let frontend = &instance.frontend; + + test_handle_ddl_request(frontend.as_ref()).await; + + verify_table_is_dropped(&instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_handle_ddl_request() { + let standalone = + tests::create_standalone_instance("test_standalone_handle_ddl_request").await; + let instance = &standalone.instance; + + test_handle_ddl_request(instance.as_ref()).await; + } + + async fn query(instance: &Instance, request: Request) -> Output { + GrpcQueryHandler::do_query(instance, request, QueryContext::arc()) + .await + .unwrap() + } + + async fn test_handle_ddl_request(instance: &Instance) { + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "database_created_through_grpc".to_string(), + create_if_not_exists: true, + })), + }); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(1))); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as _, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + engine: MITO_ENGINE.to_string(), + ..Default::default() + })), + }); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(0))); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + kind: Some(alter_expr::Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as _, + is_nullable: true, + default_constraint: vec![], + }), + is_key: false, + }], + })), + })), + }); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(0))); + + let request = Request::Query(QueryRequest { + query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string())) + }); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(1))); + + let request = Request::Query(QueryRequest { + query: Some(Query::Sql( + "SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc" + .to_string(), + )), + }); + let output = query(instance, request).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-04T07:14:26 | s | 1 | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(DropTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + })), + }); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(1))); + } + + async fn verify_table_is_dropped(instance: &MockDistributedInstance) { + for (_, dn) in instance.datanodes.iter() { + assert!(dn + .catalog_manager() + .table( + "greptime", + "database_created_through_grpc", + "table_created_through_grpc" + ) + .await + .unwrap() + .is_none()); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_insert_delete_and_query() { + common_telemetry::init_default_ut_logging(); + + let instance = + tests::create_distributed_instance("test_distributed_insert_delete_and_query").await; + let frontend = instance.frontend.as_ref(); + + let table_name = "my_dist_table"; + let sql = format!( + r" +CREATE TABLE {table_name} ( + a INT, + b STRING PRIMARY KEY, + ts TIMESTAMP, + TIME INDEX (ts) +) PARTITION BY RANGE COLUMNS(a) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (20), + PARTITION r2 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), +)" + ); + create_table(frontend, sql).await; + + test_insert_delete_and_query_on_existing_table(frontend, table_name).await; + + verify_data_distribution( + &instance, + table_name, + HashMap::from([ + ( + 0u32, + "\ ++---------------------+---+-------------------+ +| ts | a | b | ++---------------------+---+-------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | ++---------------------+---+-------------------+", + ), + ( + 1u32, + "\ ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | ++---------------------+----+-------------------+", + ), + ( + 2u32, + "\ ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | ++---------------------+----+-------------------+", + ), + ( + 3u32, + "\ ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | ++---------------------+----+-------------------+", + ), + ]), + ) + .await; + + test_insert_delete_and_query_on_auto_created_table(frontend).await; + + // Auto created table has only one region. + verify_data_distribution( + &instance, + "auto_created_table", + HashMap::from([( + 0u32, + "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-01T07:26:16 | | | +| 2023-01-01T07:26:17 | 6 | | +| 2023-01-01T07:26:18 | | x | +| 2023-01-01T07:26:20 | | z | ++---------------------+---+---+", + )]), + ) + .await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_insert_and_query() { + common_telemetry::init_default_ut_logging(); + + let standalone = + tests::create_standalone_instance("test_standalone_insert_and_query").await; + let instance = &standalone.instance; + + let table_name = "my_table"; + let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); + create_table(instance, sql).await; + + test_insert_delete_and_query_on_existing_table(instance, table_name).await; + + test_insert_delete_and_query_on_auto_created_table(instance).await + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_flush_table() { + common_telemetry::init_default_ut_logging(); + + let instance = tests::create_distributed_instance("test_distributed_flush_table").await; + let data_tmp_dirs = instance.data_tmp_dirs(); + let frontend = instance.frontend.as_ref(); + + let table_name = "my_dist_table"; + let sql = format!( + r" +CREATE TABLE {table_name} ( + a INT, + ts TIMESTAMP, + TIME INDEX (ts) +) PARTITION BY RANGE COLUMNS(a) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (20), + PARTITION r2 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), +)" + ); + create_table(frontend, sql).await; + + test_insert_delete_and_query_on_existing_table(frontend, table_name).await; + + flush_table(frontend, "greptime", "public", table_name, None).await; + // Wait for previous task finished + flush_table(frontend, "greptime", "public", table_name, None).await; + + let table = instance + .frontend + .catalog_manager() + .table("greptime", "public", table_name) + .await + .unwrap() + .unwrap(); + let table = table.as_any().downcast_ref::().unwrap(); + + let tgv = table + .table_global_value(&TableGlobalKey { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + }) + .await + .unwrap() + .unwrap(); + let table_id = tgv.table_id(); + + let region_to_dn_map = tgv + .regions_id_map + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); + + for (region, dn) in region_to_dn_map.iter() { + // data_tmp_dirs -> dn: 1..4 + let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap(); + let region_dir = test_region_dir( + data_tmp_dir.path().to_str().unwrap(), + "greptime", + "public", + table_id, + *region, + ); + has_parquet_file(®ion_dir); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_flush_table() { + common_telemetry::init_default_ut_logging(); + + let standalone = tests::create_standalone_instance("test_standalone_flush_table").await; + let instance = &standalone.instance; + let data_tmp_dir = standalone.data_tmp_dir(); + + let table_name = "my_table"; + let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); + + create_table(instance, sql).await; + + test_insert_delete_and_query_on_existing_table(instance, table_name).await; + + let table_id = 1024; + let region_id = 0; + let region_dir = test_region_dir( + data_tmp_dir.path().to_str().unwrap(), + "greptime", + "public", + table_id, + region_id, + ); + assert!(!has_parquet_file(®ion_dir)); + + flush_table(instance, "greptime", "public", "my_table", None).await; + // Wait for previous task finished + flush_table(instance, "greptime", "public", "my_table", None).await; + + assert!(has_parquet_file(®ion_dir)); + } + + async fn create_table(frontend: &Instance, sql: String) { + let request = Request::Query(QueryRequest { + query: Some(Query::Sql(sql)), + }); + let output = query(frontend, request).await; + assert!(matches!(output, Output::AffectedRows(0))); + } + + async fn flush_table( + frontend: &Instance, + catalog_name: &str, + schema_name: &str, + table_name: &str, + region_id: Option, + ) { + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::FlushTable(FlushTableExpr { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + region_id, + })), + }); + + let output = query(frontend, request).await; + assert!(matches!(output, Output::AffectedRows(0))); + } + + async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) { + let ts_millisecond_values = vec![ + 1672557972000, + 1672557973000, + 1672557974000, + 1672557975000, + 1672557976000, + 1672557977000, + 1672557978000, + 1672557979000, + 1672557980000, + 1672557981000, + 1672557982000, + 1672557983000, + 1672557984000, + 1672557985000, + 1672557986000, + 1672557987000, + ]; + let insert = InsertRequest { + table_name: table_name.to_string(), + columns: vec![ + Column { + column_name: "a".to_string(), + values: Some(Values { + i32_values: vec![1, 2, 3, 4, 5, 11, 12, 20, 21, 22, 23, 50, 51, 52, 53], + ..Default::default() + }), + null_mask: vec![32, 0], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Int32 as i32, + }, + Column { + column_name: "b".to_string(), + values: Some(Values { + string_values: ts_millisecond_values + .iter() + .map(|x| format!("ts: {x}")) + .collect(), + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values, + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 16, + ..Default::default() + }; + let output = query(instance, Request::Insert(insert)).await; + assert!(matches!(output, Output::AffectedRows(16))); + + let request = Request::Query(QueryRequest { + query: Some(Query::Sql(format!( + "SELECT ts, a, b FROM {table_name} ORDER BY ts" + ))), + }); + let output = query(instance, request.clone()).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | +| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | +| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | +| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | +| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | ++---------------------+----+-------------------+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + + let delete = DeleteRequest { + table_name: table_name.to_string(), + region_number: 0, + key_columns: vec![ + Column { + column_name: "a".to_string(), + semantic_type: SemanticType::Field as i32, + values: Some(Values { + i32_values: vec![2, 12, 22, 52], + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "b".to_string(), + semantic_type: SemanticType::Tag as i32, + values: Some(Values { + string_values: vec![ + "ts: 1672557973000".to_string(), + "ts: 1672557979000".to_string(), + "ts: 1672557982000".to_string(), + "ts: 1672557986000".to_string(), + ], + ..Default::default() + }), + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "ts".to_string(), + semantic_type: SemanticType::Timestamp as i32, + values: Some(Values { + ts_millisecond_values: vec![ + 1672557973000, + 1672557979000, + 1672557982000, + 1672557986000, + ], + ..Default::default() + }), + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 4, + }; + let output = query(instance, Request::Delete(delete)).await; + assert!(matches!(output, Output::AffectedRows(4))); + + let output = query(instance, request).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | ++---------------------+----+-------------------+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + async fn verify_data_distribution( + instance: &MockDistributedInstance, + table_name: &str, + expected_distribution: HashMap, + ) { + let table = instance + .frontend + .catalog_manager() + .table("greptime", "public", table_name) + .await + .unwrap() + .unwrap(); + let table = table.as_any().downcast_ref::().unwrap(); + + let TableGlobalValue { regions_id_map, .. } = table + .table_global_value(&TableGlobalKey { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + }) + .await + .unwrap() + .unwrap(); + let region_to_dn_map = regions_id_map + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); + assert_eq!(region_to_dn_map.len(), expected_distribution.len()); + + for (region, dn) in region_to_dn_map.iter() { + let stmt = QueryLanguageParser::parse_sql(&format!( + "SELECT ts, a, b FROM {table_name} ORDER BY ts" + )) + .unwrap(); + let dn = instance.datanodes.get(dn).unwrap(); + let engine = dn.query_engine(); + let plan = engine + .planner() + .plan(stmt, QueryContext::arc()) + .await + .unwrap(); + let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let actual = recordbatches.pretty_print().unwrap(); + + let expected = expected_distribution.get(region).unwrap(); + assert_eq!(&actual, expected); + } + } + + async fn test_insert_delete_and_query_on_auto_created_table(instance: &Instance) { + let insert = InsertRequest { + table_name: "auto_created_table".to_string(), + columns: vec![ + Column { + column_name: "a".to_string(), + values: Some(Values { + i32_values: vec![4, 6], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Int32 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + ..Default::default() + }; + + // Test auto create not existed table upon insertion. + let request = Request::Insert(insert); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(3))); + + let insert = InsertRequest { + table_name: "auto_created_table".to_string(), + columns: vec![ + Column { + column_name: "b".to_string(), + values: Some(Values { + string_values: vec!["x".to_string(), "z".to_string()], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::String as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + ..Default::default() + }; + + // Test auto add not existed column upon insertion. + let request = Request::Insert(insert); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(3))); + + let request = Request::Query(QueryRequest { + query: Some(Query::Sql( + "SELECT ts, a, b FROM auto_created_table".to_string(), + )), + }); + let output = query(instance, request.clone()).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-01T07:26:15 | 4 | | +| 2023-01-01T07:26:16 | | | +| 2023-01-01T07:26:17 | 6 | | +| 2023-01-01T07:26:18 | | x | +| 2023-01-01T07:26:19 | | | +| 2023-01-01T07:26:20 | | z | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + + let delete = DeleteRequest { + table_name: "auto_created_table".to_string(), + region_number: 0, + key_columns: vec![Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672557975000, 1672557979000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }], + row_count: 2, + }; + + let output = query(instance, Request::Delete(delete)).await; + assert!(matches!(output, Output::AffectedRows(2))); + + let output = query(instance, request).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-01T07:26:16 | | | +| 2023-01-01T07:26:17 | 6 | | +| 2023-01-01T07:26:18 | | x | +| 2023-01-01T07:26:20 | | z | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_promql_query() { + common_telemetry::init_default_ut_logging(); + + let standalone = tests::create_standalone_instance("test_standalone_promql_query").await; + let instance = &standalone.instance; + + let table_name = "my_table"; + let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))"); + create_table(instance, sql).await; + + let insert = InsertRequest { + table_name: table_name.to_string(), + columns: vec![ + Column { + column_name: "h".to_string(), + values: Some(Values { + string_values: vec![ + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + "t".to_string(), + ], + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "a".to_string(), + values: Some(Values { + f64_values: vec![1f64, 11f64, 20f64, 22f64, 50f64, 55f64, 99f64], + ..Default::default() + }), + null_mask: vec![4], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![ + 1672557972000, + 1672557973000, + 1672557974000, + 1672557975000, + 1672557976000, + 1672557977000, + 1672557978000, + 1672557979000, + ], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 8, + ..Default::default() + }; + + let request = Request::Insert(insert); + let output = query(instance, request).await; + assert!(matches!(output, Output::AffectedRows(8))); + + let request = Request::Query(QueryRequest { + query: Some(Query::PromRangeQuery(api::v1::PromRangeQuery { + query: "my_table".to_owned(), + start: "1672557973".to_owned(), + end: "1672557978".to_owned(), + step: "1s".to_owned(), + })), + }); + let output = query(instance, request).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---+------+---------------------+ +| h | a | ts | ++---+------+---------------------+ +| t | 11.0 | 2023-01-01T07:26:13 | +| t | | 2023-01-01T07:26:14 | +| t | 20.0 | 2023-01-01T07:26:15 | +| t | 22.0 | 2023-01-01T07:26:16 | +| t | 50.0 | 2023-01-01T07:26:17 | +| t | 55.0 | 2023-01-01T07:26:18 | ++---+------+---------------------+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } +} diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs new file mode 100644 index 0000000000..e77880897f --- /dev/null +++ b/tests-integration/src/influxdb.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use common_query::Output; + use common_recordbatch::RecordBatches; + use frontend::instance::Instance; + use servers::influxdb::InfluxdbRequest; + use servers::query_handler::sql::SqlQueryHandler; + use servers::query_handler::InfluxdbLineProtocolHandler; + use session::context::QueryContext; + + use crate::tests; + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_put_influxdb_lines() { + let standalone = + tests::create_standalone_instance("test_standalone_put_influxdb_lines").await; + let instance = &standalone.instance; + + test_put_influxdb_lines(instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_put_influxdb_lines() { + let instance = + tests::create_distributed_instance("test_distributed_put_influxdb_lines").await; + let instance = &instance.frontend; + + test_put_influxdb_lines(instance).await; + } + + async fn test_put_influxdb_lines(instance: &Arc) { + let lines = r" +monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 +monitor1,host=host2 memory=1027 1663840496400340001"; + let request = InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + instance.exec(&request, QueryContext::arc()).await.unwrap(); + + let mut output = instance + .do_query( + "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts", + QueryContext::arc(), + ) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++-------------------------+-------+------+--------+ +| ts | host | cpu | memory | ++-------------------------+-------+------+--------+ +| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024.0 | +| 2022-09-22T09:54:56.400 | host2 | | 1027.0 | ++-------------------------+-------+------+--------+" + ); + } +} diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs new file mode 100644 index 0000000000..d4876db6d7 --- /dev/null +++ b/tests-integration/src/instance.rs @@ -0,0 +1,405 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod tests { + use std::borrow::Cow; + use std::collections::HashMap; + use std::sync::atomic::AtomicU32; + use std::sync::Arc; + + use catalog::helper::{TableGlobalKey, TableGlobalValue}; + use common_base::Plugins; + use common_query::Output; + use common_recordbatch::RecordBatches; + use frontend::error::{self, Error, Result}; + use frontend::instance::Instance; + use frontend::table::DistTable; + use query::parser::QueryLanguageParser; + use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; + use servers::query_handler::sql::SqlQueryHandler; + use session::context::{QueryContext, QueryContextRef}; + use sql::statements::statement::Statement; + + use crate::tests; + use crate::tests::MockDistributedInstance; + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_exec_sql() { + let standalone = tests::create_standalone_instance("test_standalone_exec_sql").await; + let instance = standalone.instance.as_ref(); + + let sql = r#" + CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito"#; + create_table(instance, sql).await; + + insert_and_query(instance).await; + + drop_table(instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_exec_sql() { + let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await; + let instance = distributed.frontend.as_ref(); + + let sql = r#" + CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, + TIME INDEX (ts), + PRIMARY KEY(host) + ) + PARTITION BY RANGE COLUMNS (host) ( + PARTITION r0 VALUES LESS THAN ('550-A'), + PARTITION r1 VALUES LESS THAN ('550-W'), + PARTITION r2 VALUES LESS THAN ('MOSS'), + PARTITION r3 VALUES LESS THAN (MAXVALUE), + ) + engine=mito"#; + create_table(instance, sql).await; + + insert_and_query(instance).await; + + verify_data_distribution( + &distributed, + HashMap::from([ + ( + 0u32, + "\ ++---------------------+------+ +| ts | host | ++---------------------+------+ +| 2013-12-31T16:00:00 | 490 | ++---------------------+------+", + ), + ( + 1u32, + "\ ++---------------------+-------+ +| ts | host | ++---------------------+-------+ +| 2022-12-31T16:00:00 | 550-A | ++---------------------+-------+", + ), + ( + 2u32, + "\ ++---------------------+-------+ +| ts | host | ++---------------------+-------+ +| 2023-12-31T16:00:00 | 550-W | ++---------------------+-------+", + ), + ( + 3u32, + "\ ++---------------------+------+ +| ts | host | ++---------------------+------+ +| 2043-12-31T16:00:00 | MOSS | ++---------------------+------+", + ), + ]), + ) + .await; + + drop_table(instance).await; + + verify_table_is_dropped(&distributed).await; + } + + async fn query(instance: &Instance, sql: &str) -> Output { + SqlQueryHandler::do_query(instance, sql, QueryContext::arc()) + .await + .remove(0) + .unwrap() + } + + async fn create_table(instance: &Instance, sql: &str) { + let output = query(instance, sql).await; + let Output::AffectedRows(x) = output else { unreachable!() }; + assert_eq!(x, 0); + } + + async fn insert_and_query(instance: &Instance) { + let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES + ('490', 0.1, 1, 1388505600000), + ('550-A', 1, 100, 1672502400000), + ('550-W', 10000, 1000000, 1704038400000), + ('MOSS', 100000000, 10000000000, 2335190400000) + "#; + let output = query(instance, sql).await; + let Output::AffectedRows(x) = output else { unreachable!() }; + assert_eq!(x, 4); + + let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition + let output = query(instance, sql).await; + let Output::Stream(s) = output else { unreachable!() }; + let batches = common_recordbatch::util::collect_batches(s).await.unwrap(); + let pretty_print = batches.pretty_print().unwrap(); + let expected = "\ ++-------+---------------------+-------------+-----------+-----------+ +| host | ts | cpu | memory | disk_util | ++-------+---------------------+-------------+-----------+-----------+ +| 490 | 2013-12-31T16:00:00 | 0.1 | 1.0 | 9.9 | +| 550-A | 2022-12-31T16:00:00 | 1.0 | 100.0 | 9.9 | +| 550-W | 2023-12-31T16:00:00 | 10000.0 | 1000000.0 | 9.9 | +| MOSS | 2043-12-31T16:00:00 | 100000000.0 | 1.0e10 | 9.9 | ++-------+---------------------+-------------+-----------+-----------+"; + assert_eq!(pretty_print, expected); + } + + async fn verify_data_distribution( + instance: &MockDistributedInstance, + expected_distribution: HashMap, + ) { + let table = instance + .frontend + .catalog_manager() + .table("greptime", "public", "demo") + .await + .unwrap() + .unwrap(); + let table = table.as_any().downcast_ref::().unwrap(); + + let TableGlobalValue { regions_id_map, .. } = table + .table_global_value(&TableGlobalKey { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: "demo".to_string(), + }) + .await + .unwrap() + .unwrap(); + let region_to_dn_map = regions_id_map + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); + assert_eq!(region_to_dn_map.len(), expected_distribution.len()); + + let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap(); + for (region, dn) in region_to_dn_map.iter() { + let dn = instance.datanodes.get(dn).unwrap(); + let engine = dn.query_engine(); + let plan = engine + .planner() + .plan(stmt.clone(), QueryContext::arc()) + .await + .unwrap(); + let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let actual = recordbatches.pretty_print().unwrap(); + + let expected = expected_distribution.get(region).unwrap(); + assert_eq!(&actual, expected); + } + } + + async fn drop_table(instance: &Instance) { + let sql = "DROP TABLE demo"; + let output = query(instance, sql).await; + let Output::AffectedRows(x) = output else { unreachable!() }; + assert_eq!(x, 1); + } + + async fn verify_table_is_dropped(instance: &MockDistributedInstance) { + for (_, dn) in instance.datanodes.iter() { + assert!(dn + .catalog_manager() + .table("greptime", "public", "demo") + .await + .unwrap() + .is_none()) + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_sql_interceptor_plugin() { + #[derive(Default)] + struct AssertionHook { + pub(crate) c: AtomicU32, + } + + impl SqlQueryInterceptor for AssertionHook { + type Error = Error; + + fn pre_parsing<'a>( + &self, + query: &'a str, + _query_ctx: QueryContextRef, + ) -> Result> { + self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + assert!(query.starts_with("CREATE TABLE demo")); + Ok(Cow::Borrowed(query)) + } + + fn post_parsing( + &self, + statements: Vec, + _query_ctx: QueryContextRef, + ) -> Result> { + self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + assert!(matches!(statements[0], Statement::CreateTable(_))); + Ok(statements) + } + + fn pre_execute( + &self, + _statement: &Statement, + _plan: Option<&query::plan::LogicalPlan>, + _query_ctx: QueryContextRef, + ) -> Result<()> { + self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + + fn post_execute( + &self, + mut output: Output, + _query_ctx: QueryContextRef, + ) -> Result { + self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + match &mut output { + Output::AffectedRows(rows) => { + assert_eq!(*rows, 0); + // update output result + *rows = 10; + } + _ => unreachable!(), + } + Ok(output) + } + } + + let standalone = tests::create_standalone_instance("test_hook").await; + let mut instance = standalone.instance; + + let plugins = Plugins::new(); + let counter_hook = Arc::new(AssertionHook::default()); + plugins.insert::>(counter_hook.clone()); + Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); + + let sql = r#"CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito with(regions=1);"#; + let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc()) + .await + .remove(0) + .unwrap(); + + // assert that the hook is called 3 times + assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed)); + match output { + Output::AffectedRows(rows) => assert_eq!(rows, 10), + _ => unreachable!(), + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_disable_db_operation_plugin() { + #[derive(Default)] + struct DisableDBOpHook; + + impl SqlQueryInterceptor for DisableDBOpHook { + type Error = Error; + + fn post_parsing( + &self, + statements: Vec, + _query_ctx: QueryContextRef, + ) -> Result> { + for s in &statements { + match s { + Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => { + return Err(Error::NotSupported { + feat: "Database operations".to_owned(), + }) + } + _ => {} + } + } + + Ok(statements) + } + } + + let query_ctx = Arc::new(QueryContext::new()); + + let standalone = tests::create_standalone_instance("test_db_hook").await; + let mut instance = standalone.instance; + + let plugins = Plugins::new(); + let hook = Arc::new(DisableDBOpHook::default()); + plugins.insert::>(hook.clone()); + Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); + + let sql = r#"CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito with(regions=1);"#; + let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone()) + .await + .remove(0) + .unwrap(); + + match output { + Output::AffectedRows(rows) => assert_eq!(rows, 0), + _ => unreachable!(), + } + + let sql = r#"CREATE DATABASE tomcat"#; + if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone()) + .await + .remove(0) + { + assert!(matches!(e, error::Error::NotSupported { .. })); + } else { + unreachable!(); + } + + let sql = r#"SELECT 1; SHOW DATABASES"#; + if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone()) + .await + .remove(0) + { + assert!(matches!(e, error::Error::NotSupported { .. })); + } else { + unreachable!(); + } + } +} diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index 423bc7f300..3a3642b49a 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -12,4 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod catalog; +mod grpc; +mod influxdb; +mod instance; +mod opentsdb; +mod prometheus; +mod table; pub mod test_util; + +#[cfg(test)] +mod tests; + +#[cfg(test)] +// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate +#[allow(clippy::single_component_path_imports)] +use rstest_reuse; diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs new file mode 100644 index 0000000000..b61fd88ce1 --- /dev/null +++ b/tests-integration/src/opentsdb.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::Output; + use common_recordbatch::RecordBatches; + use frontend::instance::Instance; + use itertools::Itertools; + use servers::opentsdb::codec::DataPoint; + use servers::query_handler::sql::SqlQueryHandler; + use servers::query_handler::OpentsdbProtocolHandler; + use session::context::QueryContext; + + use crate::tests; + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_exec() { + let standalone = tests::create_standalone_instance("test_standalone_exec").await; + let instance = &standalone.instance; + + test_exec(instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_exec() { + let distributed = tests::create_distributed_instance("test_distributed_exec").await; + let instance = &distributed.frontend; + + test_exec(instance).await; + } + + async fn test_exec(instance: &Arc) { + let ctx = QueryContext::arc(); + let data_point1 = DataPoint::new( + "my_metric_1".to_string(), + 1000, + 1.0, + vec![ + ("tagk1".to_string(), "tagv1".to_string()), + ("tagk2".to_string(), "tagv2".to_string()), + ], + ); + // should create new table "my_metric_1" directly + let result = instance.exec(&data_point1, ctx.clone()).await; + assert!(result.is_ok()); + + let data_point2 = DataPoint::new( + "my_metric_1".to_string(), + 2000, + 2.0, + vec![ + ("tagk2".to_string(), "tagv2".to_string()), + ("tagk3".to_string(), "tagv3".to_string()), + ], + ); + // should create new column "tagk3" directly + let result = instance.exec(&data_point2, ctx.clone()).await; + assert!(result.is_ok()); + + let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); + // should handle null tags properly + let result = instance.exec(&data_point3, ctx.clone()).await; + assert!(result.is_ok()); + + let output = instance + .do_query( + "select * from my_metric_1 order by greptime_timestamp", + Arc::new(QueryContext::new()), + ) + .await + .remove(0) + .unwrap(); + match output { + Output::Stream(stream) => { + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let pretty_print = recordbatches.pretty_print().unwrap(); + let expected = vec![ + "+---------------------+----------------+-------+-------+-------+", + "| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |", + "+---------------------+----------------+-------+-------+-------+", + "| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |", + "| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |", + "| 1970-01-01T00:00:03 | 3.0 | | | |", + "+---------------------+----------------+-------+-------+-------+", + ] + .into_iter() + .join("\n"); + assert_eq!(pretty_print, expected); + } + _ => unreachable!(), + }; + } +} diff --git a/tests-integration/src/prometheus.rs b/tests-integration/src/prometheus.rs new file mode 100644 index 0000000000..6bd20c70fc --- /dev/null +++ b/tests-integration/src/prometheus.rs @@ -0,0 +1,183 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::prometheus::remote::label_matcher::Type as MatcherType; + use api::prometheus::remote::{ + Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest, + }; + use common_catalog::consts::DEFAULT_CATALOG_NAME; + use frontend::instance::Instance; + use prost::Message; + use servers::prometheus; + use servers::query_handler::sql::SqlQueryHandler; + use servers::query_handler::PrometheusProtocolHandler; + use session::context::QueryContext; + + use crate::tests; + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_prometheus_remote_rw() { + let standalone = + tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await; + let instance = &standalone.instance; + + test_prometheus_remote_rw(instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_prometheus_remote_rw() { + let distributed = + tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await; + let instance = &distributed.frontend; + + test_prometheus_remote_rw(instance).await; + } + + async fn test_prometheus_remote_rw(instance: &Arc) { + let write_request = WriteRequest { + timeseries: prometheus::mock_timeseries(), + ..Default::default() + }; + + let db = "prometheus"; + let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db)); + + assert!(SqlQueryHandler::do_query( + instance.as_ref(), + "CREATE DATABASE IF NOT EXISTS prometheus", + ctx.clone(), + ) + .await + .get(0) + .unwrap() + .is_ok()); + + instance.write(write_request, ctx.clone()).await.unwrap(); + + let read_request = ReadRequest { + queries: vec![ + Query { + start_timestamp_ms: 1000, + end_timestamp_ms: 2000, + matchers: vec![LabelMatcher { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric1".to_string(), + r#type: 0, + }], + ..Default::default() + }, + Query { + start_timestamp_ms: 1000, + end_timestamp_ms: 3000, + matchers: vec![ + LabelMatcher { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric3".to_string(), + r#type: 0, + }, + LabelMatcher { + name: "app".to_string(), + value: "biz".to_string(), + r#type: MatcherType::Eq as i32, + }, + ], + ..Default::default() + }, + ], + ..Default::default() + }; + + let resp = instance.read(read_request, ctx).await.unwrap(); + assert_eq!(resp.content_type, "application/x-protobuf"); + assert_eq!(resp.content_encoding, "snappy"); + let body = prometheus::snappy_decompress(&resp.body).unwrap(); + let read_response = ReadResponse::decode(&body[..]).unwrap(); + let query_results = read_response.results; + assert_eq!(2, query_results.len()); + + assert_eq!(1, query_results[0].timeseries.len()); + let timeseries = &query_results[0].timeseries[0]; + + assert_eq!( + vec![ + Label { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric1".to_string(), + }, + Label { + name: "job".to_string(), + value: "spark".to_string(), + }, + ], + timeseries.labels + ); + + assert_eq!( + timeseries.samples, + vec![ + Sample { + value: 1.0, + timestamp: 1000, + }, + Sample { + value: 2.0, + timestamp: 2000, + } + ] + ); + + assert_eq!(1, query_results[1].timeseries.len()); + let timeseries = &query_results[1].timeseries[0]; + + assert_eq!( + vec![ + Label { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric3".to_string(), + }, + Label { + name: "idc".to_string(), + value: "z002".to_string(), + }, + Label { + name: "app".to_string(), + value: "biz".to_string(), + }, + ], + timeseries.labels + ); + + assert_eq!( + timeseries.samples, + vec![ + Sample { + value: 5.0, + timestamp: 1000, + }, + Sample { + value: 6.0, + timestamp: 2000, + }, + Sample { + value: 7.0, + timestamp: 3000, + } + ] + ); + } +} diff --git a/tests-integration/src/table.rs b/tests-integration/src/table.rs new file mode 100644 index 0000000000..42e25ec9a4 --- /dev/null +++ b/tests-integration/src/table.rs @@ -0,0 +1,363 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod test { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::column::SemanticType; + use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; + use common_query::logical_plan::Expr; + use common_query::physical_plan::DfPhysicalPlanAdapter; + use common_query::DfPhysicalPlan; + use common_recordbatch::adapter::RecordBatchStreamAdapter; + use common_recordbatch::RecordBatches; + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr}; + use datafusion::physical_plan::sorts::sort::SortExec; + use datafusion::prelude::SessionContext; + use datafusion::sql::sqlparser; + use datafusion_expr::expr_fn::{and, binary_expr, col}; + use datafusion_expr::{lit, Operator}; + use datanode::instance::Instance; + use datatypes::arrow::compute::SortOptions; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use frontend::expr_factory; + use frontend::table::DistTable; + use itertools::Itertools; + use meta_client::rpc::TableName; + use session::context::QueryContext; + use sql::parser::ParserContext; + use sql::statements::statement::Statement; + use store_api::storage::RegionNumber; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use table::TableRef; + + #[tokio::test(flavor = "multi_thread")] + async fn test_dist_table_scan() { + common_telemetry::init_default_ut_logging(); + let table = Arc::new(new_dist_table("test_dist_table_scan").await); + // should scan all regions + // select a, row_id from numbers + let projection = Some(vec![1, 2]); + let filters = vec![]; + let expected_output = vec![ + "+-----+--------+", + "| a | row_id |", + "+-----+--------+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "| 3 | 4 |", + "| 4 | 5 |", + "| 10 | 1 |", + "| 11 | 2 |", + "| 12 | 3 |", + "| 13 | 4 |", + "| 14 | 5 |", + "| 30 | 1 |", + "| 31 | 2 |", + "| 32 | 3 |", + "| 33 | 4 |", + "| 34 | 5 |", + "| 100 | 1 |", + "| 101 | 2 |", + "| 102 | 3 |", + "| 103 | 4 |", + "| 104 | 5 |", + "+-----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 4, expected_output).await; + + // should scan only region 1 + // select a, row_id from numbers where a < 10 + let projection = Some(vec![1, 2]); + let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()]; + let expected_output = vec![ + "+---+--------+", + "| a | row_id |", + "+---+--------+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "| 3 | 4 |", + "| 4 | 5 |", + "+---+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 1, expected_output).await; + + // should scan region 1 and 2 + // select a, row_id from numbers where a < 15 + let projection = Some(vec![1, 2]); + let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()]; + let expected_output = vec![ + "+----+--------+", + "| a | row_id |", + "+----+--------+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "| 3 | 4 |", + "| 4 | 5 |", + "| 10 | 1 |", + "| 11 | 2 |", + "| 12 | 3 |", + "| 13 | 4 |", + "| 14 | 5 |", + "+----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 2, expected_output).await; + + // should scan region 2 and 3 + // select a, row_id from numbers where a < 40 and a >= 10 + let projection = Some(vec![1, 2]); + let filters = vec![and( + binary_expr(col("a"), Operator::Lt, lit(40)), + binary_expr(col("a"), Operator::GtEq, lit(10)), + ) + .into()]; + let expected_output = vec![ + "+----+--------+", + "| a | row_id |", + "+----+--------+", + "| 10 | 1 |", + "| 11 | 2 |", + "| 12 | 3 |", + "| 13 | 4 |", + "| 14 | 5 |", + "| 30 | 1 |", + "| 31 | 2 |", + "| 32 | 3 |", + "| 33 | 4 |", + "| 34 | 5 |", + "+----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 2, expected_output).await; + + // should scan all regions + // select a, row_id from numbers where a < 1000 and row_id == 1 + let projection = Some(vec![1, 2]); + let filters = vec![and( + binary_expr(col("a"), Operator::Lt, lit(1000)), + binary_expr(col("row_id"), Operator::Eq, lit(1)), + ) + .into()]; + let expected_output = vec![ + "+-----+--------+", + "| a | row_id |", + "+-----+--------+", + "| 0 | 1 |", + "| 10 | 1 |", + "| 30 | 1 |", + "| 100 | 1 |", + "+-----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 4, expected_output).await; + } + + async fn exec_table_scan( + table: TableRef, + projection: Option>, + filters: Vec, + expected_partitions: usize, + expected_output: Vec<&str>, + ) { + let expected_output = expected_output.into_iter().join("\n"); + let table_scan = table + .scan(projection.as_ref(), filters.as_slice(), None) + .await + .unwrap(); + assert_eq!( + table_scan.output_partitioning().partition_count(), + expected_partitions + ); + + let merge = + CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone()))); + + let sort = SortExec::new( + vec![PhysicalSortExpr { + expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(), + options: SortOptions::default(), + }], + Arc::new(merge), + ) + .with_fetch(None); + assert_eq!(sort.output_partitioning().partition_count(), 1); + + let session_ctx = SessionContext::new(); + let stream = sort.execute(0, session_ctx.task_ctx()).unwrap(); + let stream = Box::pin(RecordBatchStreamAdapter::try_new(stream).unwrap()); + + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(recordbatches.pretty_print().unwrap(), expected_output); + } + + async fn new_dist_table(test_name: &str) -> DistTable { + let column_schemas = vec![ + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), true), + ]; + let schema = Arc::new(Schema::new(column_schemas.clone())); + + let instance = crate::tests::create_distributed_instance(test_name).await; + let dist_instance = &instance.dist_instance; + let datanode_instances = instance.datanodes; + + let catalog_manager = dist_instance.catalog_manager(); + let partition_manager = catalog_manager.partition_manager(); + let datanode_clients = catalog_manager.datanode_clients(); + + let table_name = TableName::new("greptime", "public", "dist_numbers"); + + let sql = " + CREATE TABLE greptime.public.dist_numbers ( + ts BIGINT, + a INT, + row_id INT, + TIME INDEX (ts), + ) + PARTITION BY RANGE COLUMNS (a) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (20), + PARTITION r2 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), + ) + ENGINE=mito"; + + let create_table = + match ParserContext::create_with_dialect(sql, &sqlparser::dialect::GenericDialect {}) + .unwrap() + .pop() + .unwrap() + { + Statement::CreateTable(c) => c, + _ => unreachable!(), + }; + + let mut expr = expr_factory::create_to_expr(&create_table, QueryContext::arc()).unwrap(); + let _result = dist_instance + .create_table(&mut expr, create_table.partitions) + .await + .unwrap(); + + let table_route = partition_manager + .find_table_route(&table_name) + .await + .unwrap(); + + let mut region_to_datanode_mapping = HashMap::new(); + for region_route in table_route.region_routes.iter() { + let region_id = region_route.region.id as u32; + let datanode_id = region_route.leader_peer.as_ref().unwrap().id; + region_to_datanode_mapping.insert(region_id, datanode_id); + } + + let mut global_start_ts = 1; + let regional_numbers = vec![ + (0, (0..5).collect::>()), + (1, (10..15).collect::>()), + (2, (30..35).collect::>()), + (3, (100..105).collect::>()), + ]; + for (region_number, numbers) in regional_numbers { + let datanode_id = *region_to_datanode_mapping.get(®ion_number).unwrap(); + let instance = datanode_instances.get(&datanode_id).unwrap().clone(); + + let start_ts = global_start_ts; + global_start_ts += numbers.len() as i64; + + insert_testing_data( + &table_name, + instance.clone(), + numbers, + start_ts, + region_number, + ) + .await; + } + + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![]) + .next_column_id(1) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(&table_name.table_name) + .meta(meta) + .build() + .unwrap(); + DistTable::new( + table_name, + Arc::new(table_info), + partition_manager, + datanode_clients, + catalog_manager.backend(), + ) + } + + async fn insert_testing_data( + table_name: &TableName, + dn_instance: Arc, + data: Vec, + start_ts: i64, + region_number: RegionNumber, + ) { + let row_count = data.len() as u32; + let columns = vec![ + Column { + column_name: "ts".to_string(), + values: Some(column::Values { + i64_values: (start_ts..start_ts + row_count as i64).collect::>(), + ..Default::default() + }), + datatype: ColumnDataType::Int64 as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + Column { + column_name: "a".to_string(), + values: Some(column::Values { + i32_values: data, + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "row_id".to_string(), + values: Some(column::Values { + i32_values: (1..=row_count as i32).collect::>(), + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + ]; + let request = GrpcInsertRequest { + table_name: table_name.table_name.clone(), + columns, + row_count, + region_number, + }; + dn_instance + .handle_insert(request, QueryContext::arc()) + .await + .unwrap(); + } +} diff --git a/src/frontend/src/tests.rs b/tests-integration/src/tests.rs similarity index 98% rename from src/frontend/src/tests.rs rename to tests-integration/src/tests.rs index 4ae3ed05d6..255997cd28 100644 --- a/src/frontend/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -30,6 +30,10 @@ use datanode::datanode::{ DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig, }; use datanode::instance::Instance as DatanodeInstance; +use frontend::catalog::FrontendCatalogManager; +use frontend::datanode::DatanodeClients; +use frontend::instance::distributed::DistInstance; +use frontend::instance::Instance; use meta_client::client::MetaClientBuilder; use meta_client::rpc::Peer; use meta_srv::metasrv::MetaSrvOptions; @@ -45,11 +49,6 @@ use table::engine::{region_name, table_dir}; use tonic::transport::Server; use tower::service_fn; -use crate::catalog::FrontendCatalogManager; -use crate::datanode::DatanodeClients; -use crate::instance::distributed::DistInstance; -use crate::instance::Instance; - /// Guard against the `TempDir`s that used in unit tests. /// (The `TempDir` will be deleted once it goes out of scope.) pub struct TestGuard { diff --git a/src/frontend/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs similarity index 99% rename from src/frontend/src/tests/instance_test.rs rename to tests-integration/src/tests/instance_test.rs index 0e97d48e10..b208a3d4fb 100644 --- a/src/frontend/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -20,13 +20,13 @@ use common_query::Output; use common_recordbatch::util; use common_telemetry::logging; use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef}; +use frontend::error::{Error, Result}; +use frontend::instance::Instance; use rstest::rstest; use rstest_reuse::apply; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; -use crate::error::{Error, Result}; -use crate::instance::Instance; use crate::tests::test_util::{ both_instances_cases, check_output_stream, check_unordered_output_stream, distributed, get_data_dir, standalone, standalone_instance_case, MockInstance, @@ -516,7 +516,7 @@ async fn test_execute_external_create_without_ts_type(instance: Arc) { let instance = instance.frontend(); let format = "parquet"; - let location = get_data_dir("../../tests/data/parquet/various_type.parquet") + let location = get_data_dir("../tests/data/parquet/various_type.parquet") .canonicalize() .unwrap() .display() @@ -586,7 +586,7 @@ async fn test_execute_query_external_table_parquet(instance: Arc) { let instance = instance.frontend(); let format = "csv"; - let location = get_data_dir("../../tests/data/csv/various_type.csv") + let location = get_data_dir("../tests/data/csv/various_type.csv") .canonicalize() .unwrap() .display() @@ -637,7 +637,7 @@ async fn test_execute_query_external_table_csv(instance: Arc) async fn test_execute_query_external_table_json(instance: Arc) { let instance = instance.frontend(); let format = "json"; - let location = get_data_dir("../../tests/data/json/various_type.json") + let location = get_data_dir("../tests/data/json/various_type.json") .canonicalize() .unwrap() .display() @@ -694,7 +694,7 @@ async fn test_execute_query_external_table_json(instance: Arc) async fn test_execute_query_external_table_json_with_schame(instance: Arc) { let instance = instance.frontend(); let format = "json"; - let location = get_data_dir("../../tests/data/json/various_type.json") + let location = get_data_dir("../tests/data/json/various_type.json") .canonicalize() .unwrap() .display() diff --git a/src/frontend/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs similarity index 99% rename from src/frontend/src/tests/promql_test.rs rename to tests-integration/src/tests/promql_test.rs index e1ea15949f..9e8fbfb1ad 100644 --- a/src/frontend/src/tests/promql_test.rs +++ b/tests-integration/src/tests/promql_test.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use frontend::instance::Instance; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use rstest::rstest; use rstest_reuse::apply; @@ -22,7 +23,6 @@ use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; use super::test_util::{check_unordered_output_stream, standalone, standalone_instance_case}; -use crate::instance::Instance; use crate::tests::test_util::MockInstance; #[allow(clippy::too_many_arguments)] diff --git a/src/frontend/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs similarity index 99% rename from src/frontend/src/tests/test_util.rs rename to tests-integration/src/tests/test_util.rs index 668370c9ed..d5a4d56aaa 100644 --- a/src/frontend/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use common_query::Output; use common_recordbatch::util; +use frontend::instance::Instance; use rstest_reuse::{self, template}; -use crate::instance::Instance; use crate::tests::{ create_distributed_instance, create_standalone_instance, MockDistributedInstance, MockStandaloneInstance,