test: move instances tests to "tests-integration" (#1573)

* test: move standalone and distributed instances tests from "frontend" crate to "tests-integration"

* fix: resolve PR comments
Commit 4ae0b5e185 (parent cfcfc72681)
Authored by LFC on 2023-05-15 12:00:43 +08:00, committed by GitHub
27 changed files with 2158 additions and 1990 deletions
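The core of the change: helpers the moved tests rely on used to be gated with #[cfg(test)] and pub(crate), which an external crate cannot see. The frontend crate therefore gains a "testing" Cargo feature and re-exposes those helpers as pub behind #[cfg(feature = "testing")], with tests-integration enabling the feature. A minimal sketch of the pattern, using hypothetical names (Library, insert_for_test) rather than the real types touched below:

// In the library crate's Cargo.toml:
//   [features]
//   testing = []

pub struct Library {
    clients: Vec<String>,
}

impl Library {
    // Previously `#[cfg(test)] pub(crate)`: visible only to this crate's own unit tests.
    // Now feature-gated and public, so an external test crate that enables the
    // feature can still call it, while normal builds compile it out.
    #[cfg(feature = "testing")]
    pub fn insert_for_test(&mut self, client: String) {
        self.clients.push(client);
    }
}

// In the consuming test crate's Cargo.toml (as tests-integration does below):
//   frontend = { path = "../src/frontend", features = ["testing"] }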

Cargo.lock generated
View File

@@ -3081,8 +3081,6 @@ dependencies = [
"prost",
"query",
"regex",
"rstest",
"rstest_reuse",
"script",
"serde",
"serde_json",
@@ -8764,31 +8762,49 @@ dependencies = [
"axum-test-helper",
"catalog",
"client",
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-test-util",
"datafusion",
"datafusion-expr",
"datanode",
"datatypes",
"dotenv",
"frontend",
"futures",
"itertools",
"meta-client",
"meta-srv",
"mito",
"object-store",
"once_cell",
"partition",
"paste",
"prost",
"query",
"rand",
"rstest",
"rstest_reuse",
"script",
"secrecy",
"serde",
"serde_json",
"servers",
"session",
"snafu",
"sql",
"store-api",
"table",
"tempfile",
"tokio",
"tonic 0.9.2",
"tower",
"uuid",
]

View File

@@ -7,6 +7,7 @@ license.workspace = true
[features]
default = ["python"]
python = ["dep:script"]
testing = []
[dependencies]
api = { path = "../api" }
@@ -66,8 +67,6 @@ common-test-util = { path = "../common/test-util" }
datanode = { path = "../datanode" }
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"] }
rstest = "0.17"
rstest_reuse = "0.5"
strfmt = "0.2"
toml = "0.5"
tower = "0.4"

View File

@@ -75,19 +75,19 @@ impl FrontendCatalogManager {
}
}
pub(crate) fn set_dist_instance(&mut self, dist_instance: Arc<DistInstance>) {
pub fn set_dist_instance(&mut self, dist_instance: Arc<DistInstance>) {
self.dist_instance = Some(dist_instance)
}
pub(crate) fn backend(&self) -> KvBackendRef {
pub fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
pub(crate) fn partition_manager(&self) -> PartitionRuleManagerRef {
pub fn partition_manager(&self) -> PartitionRuleManagerRef {
self.partition_manager.clone()
}
pub(crate) fn datanode_clients(&self) -> Arc<DatanodeClients> {
pub fn datanode_clients(&self) -> Arc<DatanodeClients> {
self.datanode_clients.clone()
}
}
@@ -406,71 +406,3 @@ impl SchemaProvider for FrontendSchemaProvider {
Ok(self.table_names().await?.contains(&name.to_string()))
}
}
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME};
use table::requests::{CreateTableRequest, TableOptions};
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_register_system_table() {
let instance =
crate::tests::create_distributed_instance("test_register_system_table").await;
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = DEFAULT_SCHEMA_NAME;
let table_name = SCRIPTS_TABLE_NAME;
let request = CreateTableRequest {
id: 1,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: Some("Scripts table".to_string()),
schema: build_scripts_schema(),
region_numbers: vec![0],
primary_key_indices: vec![0, 1],
create_if_not_exists: true,
table_options: TableOptions::default(),
engine: MITO_ENGINE.to_string(),
};
let result = instance
.catalog_manager
.register_system_table(RegisterSystemTableRequest {
create_table_request: request,
open_hook: None,
})
.await;
assert!(result.is_ok());
assert!(
instance
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.unwrap()
.is_some(),
"the registered system table cannot be found in catalog"
);
let mut actually_created_table_in_datanode = 0;
for datanode in instance.datanodes.values() {
if datanode
.catalog_manager()
.table(catalog_name, schema_name, table_name)
.await
.unwrap()
.is_some()
{
actually_created_table_in_datanode += 1;
}
}
assert_eq!(
actually_created_table_in_datanode, 1,
"system table should be actually created at one and only one datanode"
)
}
}

View File

@@ -62,8 +62,8 @@ impl DatanodeClients {
.await
}
#[cfg(test)]
pub(crate) async fn insert_client(&self, datanode: Peer, client: Client) {
#[cfg(feature = "testing")]
pub async fn insert_client(&self, datanode: Peer, client: Client) {
self.clients.insert(datanode, client).await
}
}

View File

@@ -117,10 +117,7 @@ pub(crate) async fn create_external_expr(
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
pub(crate) fn create_to_expr(
create: &CreateTable,
query_ctx: QueryContextRef,
) -> Result<CreateTableExpr> {
pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
.map_err(BoxedError::new)

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod distributed;
pub mod distributed;
mod grpc;
mod influxdb;
mod opentsdb;
@@ -236,8 +236,7 @@ impl Instance {
Ok(())
}
#[cfg(test)]
pub(crate) async fn new_distributed(
pub async fn new_distributed(
catalog_manager: CatalogManagerRef,
dist_instance: Arc<DistInstance>,
) -> Self {
@@ -423,8 +422,7 @@ impl Instance {
.map(|_| ())
}
#[cfg(test)]
pub(crate) fn statement_executor(&self) -> Arc<StatementExecutor> {
pub fn statement_executor(&self) -> Arc<StatementExecutor> {
self.statement_executor.clone()
}
}
@@ -651,13 +649,9 @@ fn validate_insert_request(schema: &Schema, request: &InsertRequest) -> Result<(
#[cfg(test)]
mod tests {
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use api::v1::column::Values;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_recordbatch::RecordBatches;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use query::query_engine::options::QueryOptions;
@@ -665,9 +659,6 @@ mod tests {
use strfmt::Format;
use super::*;
use crate::table::DistTable;
use crate::tests;
use crate::tests::MockDistributedInstance;
#[test]
fn test_validate_insert_request() {
@@ -842,372 +833,4 @@ mod tests {
let sql = "DESC TABLE {catalog}{schema}demo;";
replace_test(sql, plugins, &query_ctx);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_exec_sql() {
let standalone = tests::create_standalone_instance("test_standalone_exec_sql").await;
let instance = standalone.instance.as_ref();
let sql = r#"
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito"#;
create_table(instance, sql).await;
insert_and_query(instance).await;
drop_table(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_exec_sql() {
let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
let instance = distributed.frontend.as_ref();
let sql = r#"
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN ('MOSS'),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
engine=mito"#;
create_table(instance, sql).await;
insert_and_query(instance).await;
verify_data_distribution(
&distributed,
HashMap::from([
(
0u32,
"\
+---------------------+------+
| ts | host |
+---------------------+------+
| 2013-12-31T16:00:00 | 490 |
+---------------------+------+",
),
(
1u32,
"\
+---------------------+-------+
| ts | host |
+---------------------+-------+
| 2022-12-31T16:00:00 | 550-A |
+---------------------+-------+",
),
(
2u32,
"\
+---------------------+-------+
| ts | host |
+---------------------+-------+
| 2023-12-31T16:00:00 | 550-W |
+---------------------+-------+",
),
(
3u32,
"\
+---------------------+------+
| ts | host |
+---------------------+------+
| 2043-12-31T16:00:00 | MOSS |
+---------------------+------+",
),
]),
)
.await;
drop_table(instance).await;
verify_table_is_dropped(&distributed).await;
}
async fn query(instance: &Instance, sql: &str) -> Output {
SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
.await
.remove(0)
.unwrap()
}
async fn create_table(instance: &Instance, sql: &str) {
let output = query(instance, sql).await;
let Output::AffectedRows(x) = output else { unreachable!() };
assert_eq!(x, 0);
}
async fn insert_and_query(instance: &Instance) {
let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
('490', 0.1, 1, 1388505600000),
('550-A', 1, 100, 1672502400000),
('550-W', 10000, 1000000, 1704038400000),
('MOSS', 100000000, 10000000000, 2335190400000)
"#;
let output = query(instance, sql).await;
let Output::AffectedRows(x) = output else { unreachable!() };
assert_eq!(x, 4);
let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as the WHERE condition value
let output = query(instance, sql).await;
let Output::Stream(s) = output else { unreachable!() };
let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
let pretty_print = batches.pretty_print().unwrap();
let expected = "\
+-------+---------------------+-------------+-----------+-----------+
| host | ts | cpu | memory | disk_util |
+-------+---------------------+-------------+-----------+-----------+
| 490 | 2013-12-31T16:00:00 | 0.1 | 1.0 | 9.9 |
| 550-A | 2022-12-31T16:00:00 | 1.0 | 100.0 | 9.9 |
| 550-W | 2023-12-31T16:00:00 | 10000.0 | 1000000.0 | 9.9 |
| MOSS | 2043-12-31T16:00:00 | 100000000.0 | 1.0e10 | 9.9 |
+-------+---------------------+-------------+-----------+-----------+";
assert_eq!(pretty_print, expected);
}
async fn verify_data_distribution(
instance: &MockDistributedInstance,
expected_distribution: HashMap<u32, &str>,
) {
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", "demo")
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
assert_eq!(region_to_dn_map.len(), expected_distribution.len());
let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap();
for (region, dn) in region_to_dn_map.iter() {
let dn = instance.datanodes.get(dn).unwrap();
let engine = dn.query_engine();
let plan = engine
.planner()
.plan(stmt.clone(), QueryContext::arc())
.await
.unwrap();
let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let actual = recordbatches.pretty_print().unwrap();
let expected = expected_distribution.get(region).unwrap();
assert_eq!(&actual, expected);
}
}
async fn drop_table(instance: &Instance) {
let sql = "DROP TABLE demo";
let output = query(instance, sql).await;
let Output::AffectedRows(x) = output else { unreachable!() };
assert_eq!(x, 1);
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
for (_, dn) in instance.datanodes.iter() {
assert!(dn
.catalog_manager()
.table("greptime", "public", "demo")
.await
.unwrap()
.is_none())
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_interceptor_plugin() {
#[derive(Default)]
struct AssertionHook {
pub(crate) c: AtomicU32,
}
impl SqlQueryInterceptor for AssertionHook {
type Error = Error;
fn pre_parsing<'a>(
&self,
query: &'a str,
_query_ctx: QueryContextRef,
) -> Result<Cow<'a, str>> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(query.starts_with("CREATE TABLE demo"));
Ok(Cow::Borrowed(query))
}
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> Result<Vec<Statement>> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(matches!(statements[0], Statement::CreateTable(_)));
Ok(statements)
}
fn pre_execute(
&self,
_statement: &Statement,
_plan: Option<&query::plan::LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
fn post_execute(
&self,
mut output: Output,
_query_ctx: QueryContextRef,
) -> Result<Output> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match &mut output {
Output::AffectedRows(rows) => {
assert_eq!(*rows, 0);
// update output result
*rows = 10;
}
_ => unreachable!(),
}
Ok(output)
}
}
let standalone = tests::create_standalone_instance("test_hook").await;
let mut instance = standalone.instance;
let plugins = Plugins::new();
let counter_hook = Arc::new(AssertionHook::default());
plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
// assert that the hook is called 4 times, once per interceptor method (pre_parsing, post_parsing, pre_execute, post_execute)
assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 10),
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_disable_db_operation_plugin() {
#[derive(Default)]
struct DisableDBOpHook;
impl SqlQueryInterceptor for DisableDBOpHook {
type Error = Error;
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> Result<Vec<Statement>> {
for s in &statements {
match s {
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
return Err(Error::NotSupported {
feat: "Database operations".to_owned(),
})
}
_ => {}
}
}
Ok(statements)
}
}
let query_ctx = Arc::new(QueryContext::new());
let standalone = tests::create_standalone_instance("test_db_hook").await;
let mut instance = standalone.instance;
let plugins = Plugins::new();
let hook = Arc::new(DisableDBOpHook::default());
plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 0),
_ => unreachable!(),
}
let sql = r#"CREATE DATABASE tomcat"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
} else {
unreachable!();
}
let sql = r#"SELECT 1; SHOW DATABASES"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
} else {
unreachable!();
}
}
}

View File

@@ -73,14 +73,14 @@ use crate::table::DistTable;
const MAX_VALUE: &str = "MAXVALUE";
#[derive(Clone)]
pub(crate) struct DistInstance {
pub struct DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
}
impl DistInstance {
pub(crate) fn new(
pub fn new(
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
@@ -92,7 +92,7 @@ impl DistInstance {
}
}
pub(crate) async fn create_table(
pub async fn create_table(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
@@ -579,8 +579,7 @@ impl DistInstance {
Ok(Output::AffectedRows(affected_rows))
}
#[cfg(test)]
pub(crate) fn catalog_manager(&self) -> Arc<FrontendCatalogManager> {
pub fn catalog_manager(&self) -> Arc<FrontendCatalogManager> {
self.catalog_manager.clone()
}
}

View File

@@ -81,845 +81,3 @@ impl GrpcQueryHandler for Instance {
Ok(output)
}
}
#[cfg(test)]
mod test {
use std::collections::HashMap;
use api::v1::column::{SemanticType, Values};
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, QueryRequest,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_catalog::consts::MITO_ENGINE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use query::parser::QueryLanguageParser;
use session::context::QueryContext;
use tests::{has_parquet_file, test_region_dir};
use super::*;
use crate::table::DistTable;
use crate::tests;
use crate::tests::MockDistributedInstance;
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {
let instance =
tests::create_distributed_instance("test_distributed_handle_ddl_request").await;
let frontend = &instance.frontend;
test_handle_ddl_request(frontend.as_ref()).await;
verify_table_is_dropped(&instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_handle_ddl_request() {
let standalone =
tests::create_standalone_instance("test_standalone_handle_ddl_request").await;
let instance = &standalone.instance;
test_handle_ddl_request(instance.as_ref()).await;
}
async fn query(instance: &Instance, request: Request) -> Output {
GrpcQueryHandler::do_query(instance, request, QueryContext::arc())
.await
.unwrap()
}
async fn test_handle_ddl_request(instance: &Instance) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(1)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
engine: MITO_ENGINE.to_string(),
..Default::default()
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
}),
is_key: false,
}],
})),
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string()))
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(1)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc"
.to_string(),
)),
});
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-04T07:14:26 | s | 1 |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(DropTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(1)));
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
for (_, dn) in instance.datanodes.iter() {
assert!(dn
.catalog_manager()
.table(
"greptime",
"database_created_through_grpc",
"table_created_through_grpc"
)
.await
.unwrap()
.is_none());
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_insert_delete_and_query() {
common_telemetry::init_default_ut_logging();
let instance =
tests::create_distributed_instance("test_distributed_insert_delete_and_query").await;
let frontend = instance.frontend.as_ref();
let table_name = "my_dist_table";
let sql = format!(
r"
CREATE TABLE {table_name} (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
create_table(frontend, sql).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
verify_data_distribution(
&instance,
table_name,
HashMap::from([
(
0u32,
"\
+---------------------+---+-------------------+
| ts | a | b |
+---------------------+---+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
+---------------------+---+-------------------+",
),
(
1u32,
"\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
+---------------------+----+-------------------+",
),
(
2u32,
"\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
+---------------------+----+-------------------+",
),
(
3u32,
"\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+",
),
]),
)
.await;
test_insert_delete_and_query_on_auto_created_table(frontend).await;
// The auto-created table has only one region.
verify_data_distribution(
&instance,
"auto_created_table",
HashMap::from([(
0u32,
"\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+",
)]),
)
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_insert_and_query() {
common_telemetry::init_default_ut_logging();
let standalone =
tests::create_standalone_instance("test_standalone_insert_and_query").await;
let instance = &standalone.instance;
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
test_insert_delete_and_query_on_auto_created_table(instance).await
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_flush_table() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_distributed_instance("test_distributed_flush_table").await;
let data_tmp_dirs = instance.data_tmp_dirs();
let frontend = instance.frontend.as_ref();
let table_name = "my_dist_table";
let sql = format!(
r"
CREATE TABLE {table_name} (
a INT,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
create_table(frontend, sql).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
flush_table(frontend, "greptime", "public", table_name, None).await;
// Wait for the previous task to finish
flush_table(frontend, "greptime", "public", table_name, None).await;
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", table_name)
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let tgv = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
.await
.unwrap()
.unwrap();
let table_id = tgv.table_id();
let region_to_dn_map = tgv
.regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
for (region, dn) in region_to_dn_map.iter() {
// datanode ids run 1..=4, while data_tmp_dirs is indexed from 0
let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap();
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
*region,
);
has_parquet_file(&region_dir);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_flush_table() {
common_telemetry::init_default_ut_logging();
let standalone = tests::create_standalone_instance("test_standalone_flush_table").await;
let instance = &standalone.instance;
let data_tmp_dir = standalone.data_tmp_dir();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
let table_id = 1024;
let region_id = 0;
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
region_id,
);
assert!(!has_parquet_file(&region_dir));
flush_table(instance, "greptime", "public", "my_table", None).await;
// Wait for the previous task to finish
flush_table(instance, "greptime", "public", "my_table", None).await;
assert!(has_parquet_file(&region_dir));
}
async fn create_table(frontend: &Instance, sql: String) {
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(sql)),
});
let output = query(frontend, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn flush_table(
frontend: &Instance,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_id: Option<u32>,
) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(FlushTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
region_id,
})),
});
let output = query(frontend, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) {
let ts_millisecond_values = vec![
1672557972000,
1672557973000,
1672557974000,
1672557975000,
1672557976000,
1672557977000,
1672557978000,
1672557979000,
1672557980000,
1672557981000,
1672557982000,
1672557983000,
1672557984000,
1672557985000,
1672557986000,
1672557987000,
];
let insert = InsertRequest {
table_name: table_name.to_string(),
columns: vec![
Column {
column_name: "a".to_string(),
values: Some(Values {
i32_values: vec![1, 2, 3, 4, 5, 11, 12, 20, 21, 22, 23, 50, 51, 52, 53],
..Default::default()
}),
null_mask: vec![32, 0],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Int32 as i32,
},
Column {
column_name: "b".to_string(),
values: Some(Values {
string_values: ts_millisecond_values
.iter()
.map(|x| format!("ts: {x}"))
.collect(),
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values,
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 16,
..Default::default()
};
let output = query(instance, Request::Insert(insert)).await;
assert!(matches!(output, Output::AffectedRows(16)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(format!(
"SELECT ts, a, b FROM {table_name} ORDER BY ts"
))),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 |
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
table_name: table_name.to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: vec![2, 12, 22, 52],
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "b".to_string(),
semantic_type: SemanticType::Tag as i32,
values: Some(Values {
string_values: vec![
"ts: 1672557973000".to_string(),
"ts: 1672557979000".to_string(),
"ts: 1672557982000".to_string(),
"ts: 1672557986000".to_string(),
],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
semantic_type: SemanticType::Timestamp as i32,
values: Some(Values {
ts_millisecond_values: vec![
1672557973000,
1672557979000,
1672557982000,
1672557986000,
],
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 4,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(4)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
async fn verify_data_distribution(
instance: &MockDistributedInstance,
table_name: &str,
expected_distribution: HashMap<u32, &str>,
) {
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", table_name)
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
assert_eq!(region_to_dn_map.len(), expected_distribution.len());
for (region, dn) in region_to_dn_map.iter() {
let stmt = QueryLanguageParser::parse_sql(&format!(
"SELECT ts, a, b FROM {table_name} ORDER BY ts"
))
.unwrap();
let dn = instance.datanodes.get(dn).unwrap();
let engine = dn.query_engine();
let plan = engine
.planner()
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let actual = recordbatches.pretty_print().unwrap();
let expected = expected_distribution.get(region).unwrap();
assert_eq!(&actual, expected);
}
}
async fn test_insert_delete_and_query_on_auto_created_table(instance: &Instance) {
let insert = InsertRequest {
table_name: "auto_created_table".to_string(),
columns: vec![
Column {
column_name: "a".to_string(),
values: Some(Values {
i32_values: vec![4, 6],
..Default::default()
}),
null_mask: vec![2],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Int32 as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 3,
..Default::default()
};
// Test auto-creating a non-existent table upon insertion.
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(3)));
let insert = InsertRequest {
table_name: "auto_created_table".to_string(),
columns: vec![
Column {
column_name: "b".to_string(),
values: Some(Values {
string_values: vec!["x".to_string(), "z".to_string()],
..Default::default()
}),
null_mask: vec![2],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::String as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 3,
..Default::default()
};
// Test auto-adding a non-existent column upon insertion.
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(3)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM auto_created_table".to_string(),
)),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:15 | 4 | |
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:19 | | |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
table_name: "auto_created_table".to_string(),
region_number: 0,
key_columns: vec![Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557975000, 1672557979000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
}],
row_count: 2,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(2)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_promql_query() {
common_telemetry::init_default_ut_logging();
let standalone = tests::create_standalone_instance("test_standalone_promql_query").await;
let instance = &standalone.instance;
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))");
create_table(instance, sql).await;
let insert = InsertRequest {
table_name: table_name.to_string(),
columns: vec![
Column {
column_name: "h".to_string(),
values: Some(Values {
string_values: vec![
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
],
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(Values {
f64_values: vec![1f64, 11f64, 20f64, 22f64, 50f64, 55f64, 99f64],
..Default::default()
}),
null_mask: vec![4],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![
1672557972000,
1672557973000,
1672557974000,
1672557975000,
1672557976000,
1672557977000,
1672557978000,
1672557979000,
],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 8,
..Default::default()
};
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(8)));
let request = Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(api::v1::PromRangeQuery {
query: "my_table".to_owned(),
start: "1672557973".to_owned(),
end: "1672557978".to_owned(),
step: "1s".to_owned(),
})),
});
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---+------+---------------------+
| h | a | ts |
+---+------+---------------------+
| t | 11.0 | 2023-01-01T07:26:13 |
| t | | 2023-01-01T07:26:14 |
| t | 20.0 | 2023-01-01T07:26:15 |
| t | 22.0 | 2023-01-01T07:26:16 |
| t | 50.0 | 2023-01-01T07:26:17 |
| t | 55.0 | 2023-01-01T07:26:18 |
+---+------+---------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
}

View File

@@ -36,65 +36,3 @@ impl InfluxdbLineProtocolHandler for Instance {
Ok(())
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use super::*;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_put_influxdb_lines() {
let standalone =
tests::create_standalone_instance("test_standalone_put_influxdb_lines").await;
let instance = &standalone.instance;
test_put_influxdb_lines(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines() {
let instance =
tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
let instance = &instance.frontend;
test_put_influxdb_lines(instance).await;
}
async fn test_put_influxdb_lines(instance: &Arc<Instance>) {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001";
let request = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
instance.exec(&request, QueryContext::arc()).await.unwrap();
let mut output = instance
.do_query(
"SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
QueryContext::arc(),
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+-------------------------+-------+------+--------+
| ts | host | cpu | memory |
+-------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400 | host2 | | 1027.0 |
+-------------------------+-------+------+--------+"
);
}
}

View File

@@ -35,95 +35,3 @@ impl OpentsdbProtocolHandler for Instance {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::RecordBatches;
use itertools::Itertools;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use super::*;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_exec() {
let standalone = tests::create_standalone_instance("test_standalone_exec").await;
let instance = &standalone.instance;
test_exec(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_exec() {
let distributed = tests::create_distributed_instance("test_distributed_exec").await;
let instance = &distributed.frontend;
test_exec(instance).await;
}
async fn test_exec(instance: &Arc<Instance>) {
let ctx = QueryContext::arc();
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
1.0,
vec![
("tagk1".to_string(), "tagv1".to_string()),
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
let result = instance.exec(&data_point1, ctx.clone()).await;
assert!(result.is_ok());
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
2.0,
vec![
("tagk2".to_string(), "tagv2".to_string()),
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
let result = instance.exec(&data_point2, ctx.clone()).await;
assert!(result.is_ok());
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
let result = instance.exec(&data_point3, ctx.clone()).await;
assert!(result.is_ok());
let output = instance
.do_query(
"select * from my_metric_1 order by greptime_timestamp",
Arc::new(QueryContext::new()),
)
.await
.remove(0)
.unwrap();
match output {
Output::Stream(stream) => {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
"| 1970-01-01T00:00:03 | 3.0 | | | |",
"+---------------------+----------------+-------+-------+-------+",
]
.into_iter()
.join("\n");
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
};
}
}

View File

@@ -153,168 +153,3 @@ impl PrometheusProtocolHandler for Instance {
todo!();
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, LabelMatcher, Sample};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use super::*;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_prometheus_remote_rw() {
let standalone =
tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await;
let instance = &standalone.instance;
test_prometheus_remote_rw(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_prometheus_remote_rw() {
let distributed =
tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await;
let instance = &distributed.frontend;
test_prometheus_remote_rw(instance).await;
}
async fn test_prometheus_remote_rw(instance: &Arc<Instance>) {
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
..Default::default()
};
let db = "prometheus";
let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
assert!(SqlQueryHandler::do_query(
instance.as_ref(),
"CREATE DATABASE IF NOT EXISTS prometheus",
ctx.clone(),
)
.await
.get(0)
.unwrap()
.is_ok());
instance.write(write_request, ctx.clone()).await.unwrap();
let read_request = ReadRequest {
queries: vec![
Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
r#type: 0,
}],
..Default::default()
},
Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 3000,
matchers: vec![
LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
r#type: 0,
},
LabelMatcher {
name: "app".to_string(),
value: "biz".to_string(),
r#type: MatcherType::Eq as i32,
},
],
..Default::default()
},
],
..Default::default()
};
let resp = instance.read(read_request, ctx).await.unwrap();
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.body).unwrap();
let read_response = ReadResponse::decode(&body[..]).unwrap();
let query_results = read_response.results;
assert_eq!(2, query_results.len());
assert_eq!(1, query_results[0].timeseries.len());
let timeseries = &query_results[0].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "job".to_string(),
value: "spark".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 1.0,
timestamp: 1000,
},
Sample {
value: 2.0,
timestamp: 2000,
}
]
);
assert_eq!(1, query_results[1].timeseries.len());
let timeseries = &query_results[1].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
Label {
name: "app".to_string(),
value: "biz".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 5.0,
timestamp: 1000,
},
Sample {
value: 6.0,
timestamp: 2000,
},
Sample {
value: 7.0,
timestamp: 3000,
}
]
);
}
}

View File

@@ -18,7 +18,7 @@
pub mod catalog;
pub mod datanode;
pub mod error;
mod expr_factory;
pub mod expr_factory;
pub mod frontend;
pub mod grpc;
pub mod influxdb;
@@ -32,11 +32,4 @@ pub mod prometheus;
mod script;
mod server;
pub mod statement;
mod table;
#[cfg(test)]
mod tests;
#[cfg(test)]
// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate
#[allow(clippy::single_component_path_imports)]
use rstest_reuse;
pub mod table;
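The crate-root "use rstest_reuse;" (and its clippy allow) leaves frontend along with the tests: rstest_reuse only needs that import at the top of whichever crate applies its templates, which is now tests-integration. A minimal sketch of the template/apply pattern those tests use, with a toy case list instead of the real instance fixtures:

use rstest::rstest;
use rstest_reuse::{self, *};

// Declare the case matrix once...
#[template]
#[rstest]
#[case(1)]
#[case(2)]
fn every_backend(#[case] id: u32) {}

// ...and stamp it onto any number of tests; each expands to an rstest run over both cases.
#[apply(every_backend)]
fn works_for_each_case(id: u32) {
    assert!(id == 1 || id == 2);
}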

View File

@@ -59,7 +59,7 @@ impl StatementExecutor {
}
}
pub(crate) async fn execute_stmt(
pub async fn execute_stmt(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,

View File

@@ -217,7 +217,7 @@ impl Table for DistTable {
}
impl DistTable {
pub(crate) fn new(
pub fn new(
table_name: TableName,
table_info: TableInfoRef,
partition_manager: PartitionRuleManagerRef,
@@ -233,7 +233,7 @@ impl DistTable {
}
}
pub(crate) async fn table_global_value(
pub async fn table_global_value(
&self,
key: &TableGlobalKey,
) -> Result<Option<TableGlobalValue>> {
@@ -496,25 +496,10 @@ mod test {
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use catalog::error::Result;
use catalog::remote::{KvBackend, ValueIter};
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::DfPhysicalPlan;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::SessionContext;
use datafusion::sql::sqlparser;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::{lit, Operator};
use datanode::instance::Instance;
use datatypes::arrow::compute::SortOptions;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use itertools::Itertools;
use meta_client::client::MetaClient;
use meta_client::rpc::router::RegionRoute;
use meta_client::rpc::{Region, Table, TableRoute};
@@ -528,15 +513,10 @@ mod test {
use partition::range::RangePartitionRule;
use partition::route::TableRoutes;
use partition::PartitionRuleRef;
use session::context::QueryContext;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::{meter_insert_request, TableRef};
use table::meter_insert_request;
use super::*;
use crate::expr_factory;
struct DummyKvBackend;
@@ -745,321 +725,6 @@ mod test {
assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table("test_dist_table_scan").await);
// should scan all regions
// select a, row_id from numbers
let projection = Some(vec![1, 2]);
let filters = vec![];
let expected_output = vec![
"+-----+--------+",
"| a | row_id |",
"+-----+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"| 30 | 1 |",
"| 31 | 2 |",
"| 32 | 3 |",
"| 33 | 4 |",
"| 34 | 5 |",
"| 100 | 1 |",
"| 101 | 2 |",
"| 102 | 3 |",
"| 103 | 4 |",
"| 104 | 5 |",
"+-----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 4, expected_output).await;
// should scan only region 1
// select a, row_id from numbers where a < 10
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()];
let expected_output = vec![
"+---+--------+",
"| a | row_id |",
"+---+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"+---+--------+",
];
exec_table_scan(table.clone(), projection, filters, 1, expected_output).await;
// should scan region 1 and 2
// select a, row_id from numbers where a < 15
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()];
let expected_output = vec![
"+----+--------+",
"| a | row_id |",
"+----+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"+----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 2, expected_output).await;
// should scan region 2 and 3
// select a, row_id from numbers where a < 40 and a >= 10
let projection = Some(vec![1, 2]);
let filters = vec![and(
binary_expr(col("a"), Operator::Lt, lit(40)),
binary_expr(col("a"), Operator::GtEq, lit(10)),
)
.into()];
let expected_output = vec![
"+----+--------+",
"| a | row_id |",
"+----+--------+",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"| 30 | 1 |",
"| 31 | 2 |",
"| 32 | 3 |",
"| 33 | 4 |",
"| 34 | 5 |",
"+----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 2, expected_output).await;
// should scan all regions
// select a, row_id from numbers where a < 1000 and row_id == 1
let projection = Some(vec![1, 2]);
let filters = vec![and(
binary_expr(col("a"), Operator::Lt, lit(1000)),
binary_expr(col("row_id"), Operator::Eq, lit(1)),
)
.into()];
let expected_output = vec![
"+-----+--------+",
"| a | row_id |",
"+-----+--------+",
"| 0 | 1 |",
"| 10 | 1 |",
"| 30 | 1 |",
"| 100 | 1 |",
"+-----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 4, expected_output).await;
}
async fn exec_table_scan(
table: TableRef,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
expected_partitions: usize,
expected_output: Vec<&str>,
) {
let expected_output = expected_output.into_iter().join("\n");
let table_scan = table
.scan(projection.as_ref(), filters.as_slice(), None)
.await
.unwrap();
assert_eq!(
table_scan.output_partitioning().partition_count(),
expected_partitions
);
let merge =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone())));
let sort = SortExec::new(
vec![PhysicalSortExpr {
expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(),
options: SortOptions::default(),
}],
Arc::new(merge),
)
.with_fetch(None);
assert_eq!(sort.output_partitioning().partition_count(), 1);
let session_ctx = SessionContext::new();
let stream = sort.execute(0, session_ctx.task_ctx()).unwrap();
let stream = Box::pin(RecordBatchStreamAdapter::try_new(stream).unwrap());
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(recordbatches.pretty_print().unwrap(), expected_output);
}
async fn new_dist_table(test_name: &str) -> DistTable {
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let instance = crate::tests::create_distributed_instance(test_name).await;
let dist_instance = &instance.dist_instance;
let datanode_instances = instance.datanodes;
let catalog_manager = dist_instance.catalog_manager();
let partition_manager = catalog_manager.partition_manager();
let datanode_clients = catalog_manager.datanode_clients();
let table_name = TableName::new("greptime", "public", "dist_numbers");
let sql = "
CREATE TABLE greptime.public.dist_numbers (
ts BIGINT,
a INT,
row_id INT,
TIME INDEX (ts),
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito";
let create_table =
match ParserContext::create_with_dialect(sql, &sqlparser::dialect::GenericDialect {})
.unwrap()
.pop()
.unwrap()
{
Statement::CreateTable(c) => c,
_ => unreachable!(),
};
let mut expr = expr_factory::create_to_expr(&create_table, QueryContext::arc()).unwrap();
let _result = dist_instance
.create_table(&mut expr, create_table.partitions)
.await
.unwrap();
let table_route = partition_manager
.find_table_route(&table_name)
.await
.unwrap();
let mut region_to_datanode_mapping = HashMap::new();
for region_route in table_route.region_routes.iter() {
let region_id = region_route.region.id as u32;
let datanode_id = region_route.leader_peer.as_ref().unwrap().id;
region_to_datanode_mapping.insert(region_id, datanode_id);
}
let mut global_start_ts = 1;
let regional_numbers = vec![
(0, (0..5).collect::<Vec<i32>>()),
(1, (10..15).collect::<Vec<i32>>()),
(2, (30..35).collect::<Vec<i32>>()),
(3, (100..105).collect::<Vec<i32>>()),
];
for (region_number, numbers) in regional_numbers {
let datanode_id = *region_to_datanode_mapping.get(&region_number).unwrap();
let instance = datanode_instances.get(&datanode_id).unwrap().clone();
let start_ts = global_start_ts;
global_start_ts += numbers.len() as i64;
insert_testing_data(
&table_name,
instance.clone(),
numbers,
start_ts,
region_number,
)
.await;
}
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
DistTable {
table_name,
table_info: Arc::new(table_info),
partition_manager,
datanode_clients,
backend: catalog_manager.backend(),
}
}
async fn insert_testing_data(
table_name: &TableName,
dn_instance: Arc<Instance>,
data: Vec<i32>,
start_ts: i64,
region_number: RegionNumber,
) {
let row_count = data.len() as u32;
let columns = vec![
Column {
column_name: "ts".to_string(),
values: Some(column::Values {
i64_values: (start_ts..start_ts + row_count as i64).collect::<Vec<i64>>(),
..Default::default()
}),
datatype: ColumnDataType::Int64 as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(column::Values {
i32_values: data,
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "row_id".to_string(),
values: Some(column::Values {
i32_values: (1..=row_count as i32).collect::<Vec<i32>>(),
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
];
let request = GrpcInsertRequest {
table_name: table_name.table_name.clone(),
columns,
row_count,
region_number,
};
dn_instance
.handle_insert(request, QueryContext::arc())
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new(

View File

@@ -23,7 +23,7 @@ common-test-util = { path = "../src/common/test-util" }
datanode = { path = "../src/datanode" }
datatypes = { path = "../src/datatypes" }
dotenv = "0.15"
frontend = { path = "../src/frontend" }
frontend = { path = "../src/frontend", features = ["testing"] }
mito = { path = "../src/mito", features = ["test"] }
object-store = { path = "../src/object-store" }
once_cell = "1.16"
@@ -40,4 +40,22 @@ tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
common-base = { path = "../src/common/base" }
common-recordbatch = { path = "../src/common/recordbatch" }
datafusion.workspace = true
datafusion-expr.workspace = true
futures.workspace = true
itertools = "0.10"
meta-client = { path = "../src/meta-client" }
meta-srv = { path = "../src/meta-srv" }
partition = { path = "../src/partition" }
paste.workspace = true
prost.workspace = true
query = { path = "../src/query" }
script = { path = "../src/script" }
session = { path = "../src/session" }
store-api = { path = "../src/store-api" }
tonic.workspace = true
tower = "0.4"
rstest = "0.17"
rstest_reuse = "0.5"

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
use catalog::{CatalogManager, RegisterSystemTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME};
use table::requests::{CreateTableRequest, TableOptions};
#[tokio::test(flavor = "multi_thread")]
async fn test_register_system_table() {
let instance =
crate::tests::create_distributed_instance("test_register_system_table").await;
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = DEFAULT_SCHEMA_NAME;
let table_name = SCRIPTS_TABLE_NAME;
let request = CreateTableRequest {
id: 1,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: Some("Scripts table".to_string()),
schema: build_scripts_schema(),
region_numbers: vec![0],
primary_key_indices: vec![0, 1],
create_if_not_exists: true,
table_options: TableOptions::default(),
engine: MITO_ENGINE.to_string(),
};
let result = instance
.catalog_manager
.register_system_table(RegisterSystemTableRequest {
create_table_request: request,
open_hook: None,
})
.await;
assert!(result.is_ok());
assert!(
instance
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.unwrap()
.is_some(),
"the registered system table cannot be found in catalog"
);
let mut actually_created_table_in_datanode = 0;
for datanode in instance.datanodes.values() {
if datanode
.catalog_manager()
.table(catalog_name, schema_name, table_name)
.await
.unwrap()
.is_some()
{
actually_created_table_in_datanode += 1;
}
}
assert_eq!(
actually_created_table_in_datanode, 1,
"system table should be actually created at one and only one datanode"
)
}
}

View File

@@ -0,0 +1,858 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod test {
use std::collections::HashMap;
use api::v1::column::{SemanticType, Values};
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, QueryRequest,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_catalog::consts::MITO_ENGINE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
use frontend::table::DistTable;
use query::parser::QueryLanguageParser;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use tests::{has_parquet_file, test_region_dir};
use crate::tests;
use crate::tests::MockDistributedInstance;
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {
let instance =
tests::create_distributed_instance("test_distributed_handle_ddl_request").await;
let frontend = &instance.frontend;
test_handle_ddl_request(frontend.as_ref()).await;
verify_table_is_dropped(&instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_handle_ddl_request() {
let standalone =
tests::create_standalone_instance("test_standalone_handle_ddl_request").await;
let instance = &standalone.instance;
test_handle_ddl_request(instance.as_ref()).await;
}
async fn query(instance: &Instance, request: Request) -> Output {
GrpcQueryHandler::do_query(instance, request, QueryContext::arc())
.await
.unwrap()
}
async fn test_handle_ddl_request(instance: &Instance) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(1)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
engine: MITO_ENGINE.to_string(),
..Default::default()
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
}),
is_key: false,
}],
})),
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string()))
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(1)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc"
.to_string(),
)),
});
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-04T07:14:26 | s | 1 |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(DropTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
})),
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(1)));
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
for (_, dn) in instance.datanodes.iter() {
assert!(dn
.catalog_manager()
.table(
"greptime",
"database_created_through_grpc",
"table_created_through_grpc"
)
.await
.unwrap()
.is_none());
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_insert_delete_and_query() {
common_telemetry::init_default_ut_logging();
let instance =
tests::create_distributed_instance("test_distributed_insert_delete_and_query").await;
let frontend = instance.frontend.as_ref();
let table_name = "my_dist_table";
let sql = format!(
r"
CREATE TABLE {table_name} (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
create_table(frontend, sql).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
verify_data_distribution(
&instance,
table_name,
HashMap::from([
(
0u32,
"\
+---------------------+---+-------------------+
| ts | a | b |
+---------------------+---+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
+---------------------+---+-------------------+",
),
(
1u32,
"\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
+---------------------+----+-------------------+",
),
(
2u32,
"\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
+---------------------+----+-------------------+",
),
(
3u32,
"\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+",
),
]),
)
.await;
test_insert_delete_and_query_on_auto_created_table(frontend).await;
// Auto created table has only one region.
verify_data_distribution(
&instance,
"auto_created_table",
HashMap::from([(
0u32,
"\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+",
)]),
)
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_insert_and_query() {
common_telemetry::init_default_ut_logging();
let standalone =
tests::create_standalone_instance("test_standalone_insert_and_query").await;
let instance = &standalone.instance;
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
test_insert_delete_and_query_on_auto_created_table(instance).await
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_flush_table() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_distributed_instance("test_distributed_flush_table").await;
let data_tmp_dirs = instance.data_tmp_dirs();
let frontend = instance.frontend.as_ref();
let table_name = "my_dist_table";
let sql = format!(
r"
CREATE TABLE {table_name} (
a INT,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
create_table(frontend, sql).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
flush_table(frontend, "greptime", "public", table_name, None).await;
// Wait for the previous flush task to finish
flush_table(frontend, "greptime", "public", table_name, None).await;
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", table_name)
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let tgv = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
.await
.unwrap()
.unwrap();
let table_id = tgv.table_id();
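// Invert regions_id_map (datanode id -> region numbers) into a region number -> datanode id map.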
let region_to_dn_map = tgv
.regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
for (region, dn) in region_to_dn_map.iter() {
// datanode ids run 1..=4; data_tmp_dirs are indexed by (datanode id - 1)
let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap();
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
*region,
);
has_parquet_file(&region_dir);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_flush_table() {
common_telemetry::init_default_ut_logging();
let standalone = tests::create_standalone_instance("test_standalone_flush_table").await;
let instance = &standalone.instance;
let data_tmp_dir = standalone.data_tmp_dir();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
let table_id = 1024;
let region_id = 0;
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
region_id,
);
assert!(!has_parquet_file(&region_dir));
flush_table(instance, "greptime", "public", "my_table", None).await;
// Wait for the previous flush task to finish
flush_table(instance, "greptime", "public", "my_table", None).await;
assert!(has_parquet_file(&region_dir));
}
async fn create_table(frontend: &Instance, sql: String) {
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(sql)),
});
let output = query(frontend, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn flush_table(
frontend: &Instance,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_id: Option<u32>,
) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(FlushTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
region_id,
})),
});
let output = query(frontend, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) {
let ts_millisecond_values = vec![
1672557972000,
1672557973000,
1672557974000,
1672557975000,
1672557976000,
1672557977000,
1672557978000,
1672557979000,
1672557980000,
1672557981000,
1672557982000,
1672557983000,
1672557984000,
1672557985000,
1672557986000,
1672557987000,
];
let insert = InsertRequest {
table_name: table_name.to_string(),
columns: vec![
Column {
column_name: "a".to_string(),
values: Some(Values {
i32_values: vec![1, 2, 3, 4, 5, 11, 12, 20, 21, 22, 23, 50, 51, 52, 53],
..Default::default()
}),
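// bit 5 of the null mask is set (32 = 0b0010_0000): the 6th of the 16 rows has no value for "a"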
null_mask: vec![32, 0],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Int32 as i32,
},
Column {
column_name: "b".to_string(),
values: Some(Values {
string_values: ts_millisecond_values
.iter()
.map(|x| format!("ts: {x}"))
.collect(),
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values,
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 16,
..Default::default()
};
let output = query(instance, Request::Insert(insert)).await;
assert!(matches!(output, Output::AffectedRows(16)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(format!(
"SELECT ts, a, b FROM {table_name} ORDER BY ts"
))),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 |
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
table_name: table_name.to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: vec![2, 12, 22, 52],
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "b".to_string(),
semantic_type: SemanticType::Tag as i32,
values: Some(Values {
string_values: vec![
"ts: 1672557973000".to_string(),
"ts: 1672557979000".to_string(),
"ts: 1672557982000".to_string(),
"ts: 1672557986000".to_string(),
],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
semantic_type: SemanticType::Timestamp as i32,
values: Some(Values {
ts_millisecond_values: vec![
1672557973000,
1672557979000,
1672557982000,
1672557986000,
],
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 4,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(4)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
async fn verify_data_distribution(
instance: &MockDistributedInstance,
table_name: &str,
expected_distribution: HashMap<u32, &str>,
) {
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", table_name)
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
assert_eq!(region_to_dn_map.len(), expected_distribution.len());
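// Query each datanode's local engine directly to verify which rows its region holds.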
for (region, dn) in region_to_dn_map.iter() {
let stmt = QueryLanguageParser::parse_sql(&format!(
"SELECT ts, a, b FROM {table_name} ORDER BY ts"
))
.unwrap();
let dn = instance.datanodes.get(dn).unwrap();
let engine = dn.query_engine();
let plan = engine
.planner()
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let actual = recordbatches.pretty_print().unwrap();
let expected = expected_distribution.get(region).unwrap();
assert_eq!(&actual, expected);
}
}
async fn test_insert_delete_and_query_on_auto_created_table(instance: &Instance) {
let insert = InsertRequest {
table_name: "auto_created_table".to_string(),
columns: vec![
Column {
column_name: "a".to_string(),
values: Some(Values {
i32_values: vec![4, 6],
..Default::default()
}),
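// null mask 0b10: the 2nd of the 3 rows has no value for "a"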
null_mask: vec![2],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Int32 as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 3,
..Default::default()
};
// Test auto-creating a table that does not yet exist upon insertion.
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(3)));
let insert = InsertRequest {
table_name: "auto_created_table".to_string(),
columns: vec![
Column {
column_name: "b".to_string(),
values: Some(Values {
string_values: vec!["x".to_string(), "z".to_string()],
..Default::default()
}),
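// null mask 0b10: the 2nd of the 3 rows has no value for "b"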
null_mask: vec![2],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::String as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 3,
..Default::default()
};
// Test auto-adding a column that does not yet exist upon insertion.
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(3)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM auto_created_table".to_string(),
)),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:15 | 4 | |
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:19 | | |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
table_name: "auto_created_table".to_string(),
region_number: 0,
key_columns: vec![Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557975000, 1672557979000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
}],
row_count: 2,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(2)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_promql_query() {
common_telemetry::init_default_ut_logging();
let standalone = tests::create_standalone_instance("test_standalone_promql_query").await;
let instance = &standalone.instance;
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))");
create_table(instance, sql).await;
let insert = InsertRequest {
table_name: table_name.to_string(),
columns: vec![
Column {
column_name: "h".to_string(),
values: Some(Values {
string_values: vec![
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
"t".to_string(),
],
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(Values {
f64_values: vec![1f64, 11f64, 20f64, 22f64, 50f64, 55f64, 99f64],
..Default::default()
}),
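// null mask 0b100: the 3rd of the 8 rows has no value for "a"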
null_mask: vec![4],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![
1672557972000,
1672557973000,
1672557974000,
1672557975000,
1672557976000,
1672557977000,
1672557978000,
1672557979000,
],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 8,
..Default::default()
};
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(8)));
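// The range [1672557973, 1672557978] with a 1s step covers six timestamps, matching the six rows asserted below.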
let request = Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(api::v1::PromRangeQuery {
query: "my_table".to_owned(),
start: "1672557973".to_owned(),
end: "1672557978".to_owned(),
step: "1s".to_owned(),
})),
});
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---+------+---------------------+
| h | a | ts |
+---+------+---------------------+
| t | 11.0 | 2023-01-01T07:26:13 |
| t | | 2023-01-01T07:26:14 |
| t | 20.0 | 2023-01-01T07:26:15 |
| t | 22.0 | 2023-01-01T07:26:16 |
| t | 50.0 | 2023-01-01T07:26:17 |
| t | 55.0 | 2023-01-01T07:26:18 |
+---+------+---------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
}

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContext;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_put_influxdb_lines() {
let standalone =
tests::create_standalone_instance("test_standalone_put_influxdb_lines").await;
let instance = &standalone.instance;
test_put_influxdb_lines(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines() {
let instance =
tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
let instance = &instance.frontend;
test_put_influxdb_lines(instance).await;
}
async fn test_put_influxdb_lines(instance: &Arc<Instance>) {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001";
let request = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
instance.exec(&request, QueryContext::arc()).await.unwrap();
let mut output = instance
.do_query(
"SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
QueryContext::arc(),
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+-------------------------+-------+------+--------+
| ts | host | cpu | memory |
+-------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400 | host2 | | 1027.0 |
+-------------------------+-------+------+--------+"
);
}
}

View File

@@ -0,0 +1,405 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_base::Plugins;
use common_query::Output;
use common_recordbatch::RecordBatches;
use frontend::error::{self, Error, Result};
use frontend::instance::Instance;
use frontend::table::DistTable;
use query::parser::QueryLanguageParser;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use sql::statements::statement::Statement;
use crate::tests;
use crate::tests::MockDistributedInstance;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_exec_sql() {
let standalone = tests::create_standalone_instance("test_standalone_exec_sql").await;
let instance = standalone.instance.as_ref();
let sql = r#"
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito"#;
create_table(instance, sql).await;
insert_and_query(instance).await;
drop_table(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_exec_sql() {
let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
let instance = distributed.frontend.as_ref();
let sql = r#"
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN ('MOSS'),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
engine=mito"#;
create_table(instance, sql).await;
insert_and_query(instance).await;
verify_data_distribution(
&distributed,
HashMap::from([
(
0u32,
"\
+---------------------+------+
| ts | host |
+---------------------+------+
| 2013-12-31T16:00:00 | 490 |
+---------------------+------+",
),
(
1u32,
"\
+---------------------+-------+
| ts | host |
+---------------------+-------+
| 2022-12-31T16:00:00 | 550-A |
+---------------------+-------+",
),
(
2u32,
"\
+---------------------+-------+
| ts | host |
+---------------------+-------+
| 2023-12-31T16:00:00 | 550-W |
+---------------------+-------+",
),
(
3u32,
"\
+---------------------+------+
| ts | host |
+---------------------+------+
| 2043-12-31T16:00:00 | MOSS |
+---------------------+------+",
),
]),
)
.await;
drop_table(instance).await;
verify_table_is_dropped(&distributed).await;
}
async fn query(instance: &Instance, sql: &str) -> Output {
SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
.await
.remove(0)
.unwrap()
}
async fn create_table(instance: &Instance, sql: &str) {
let output = query(instance, sql).await;
let Output::AffectedRows(x) = output else { unreachable!() };
assert_eq!(x, 0);
}
async fn insert_and_query(instance: &Instance) {
let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
('490', 0.1, 1, 1388505600000),
('550-A', 1, 100, 1672502400000),
('550-W', 10000, 1000000, 1704038400000),
('MOSS', 100000000, 10000000000, 2335190400000)
"#;
let output = query(instance, sql).await;
let Output::AffectedRows(x) = output else { unreachable!() };
assert_eq!(x, 4);
let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition
let output = query(instance, sql).await;
let Output::Stream(s) = output else { unreachable!() };
let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
let pretty_print = batches.pretty_print().unwrap();
let expected = "\
+-------+---------------------+-------------+-----------+-----------+
| host | ts | cpu | memory | disk_util |
+-------+---------------------+-------------+-----------+-----------+
| 490 | 2013-12-31T16:00:00 | 0.1 | 1.0 | 9.9 |
| 550-A | 2022-12-31T16:00:00 | 1.0 | 100.0 | 9.9 |
| 550-W | 2023-12-31T16:00:00 | 10000.0 | 1000000.0 | 9.9 |
| MOSS | 2043-12-31T16:00:00 | 100000000.0 | 1.0e10 | 9.9 |
+-------+---------------------+-------------+-----------+-----------+";
assert_eq!(pretty_print, expected);
}
async fn verify_data_distribution(
instance: &MockDistributedInstance,
expected_distribution: HashMap<u32, &str>,
) {
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", "demo")
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
assert_eq!(region_to_dn_map.len(), expected_distribution.len());
let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap();
for (region, dn) in region_to_dn_map.iter() {
let dn = instance.datanodes.get(dn).unwrap();
let engine = dn.query_engine();
let plan = engine
.planner()
.plan(stmt.clone(), QueryContext::arc())
.await
.unwrap();
let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let actual = recordbatches.pretty_print().unwrap();
let expected = expected_distribution.get(region).unwrap();
assert_eq!(&actual, expected);
}
}
async fn drop_table(instance: &Instance) {
let sql = "DROP TABLE demo";
let output = query(instance, sql).await;
let Output::AffectedRows(x) = output else { unreachable!() };
assert_eq!(x, 1);
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
for (_, dn) in instance.datanodes.iter() {
assert!(dn
.catalog_manager()
.table("greptime", "public", "demo")
.await
.unwrap()
.is_none())
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_interceptor_plugin() {
#[derive(Default)]
struct AssertionHook {
pub(crate) c: AtomicU32,
}
impl SqlQueryInterceptor for AssertionHook {
type Error = Error;
fn pre_parsing<'a>(
&self,
query: &'a str,
_query_ctx: QueryContextRef,
) -> Result<Cow<'a, str>> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(query.starts_with("CREATE TABLE demo"));
Ok(Cow::Borrowed(query))
}
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> Result<Vec<Statement>> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(matches!(statements[0], Statement::CreateTable(_)));
Ok(statements)
}
fn pre_execute(
&self,
_statement: &Statement,
_plan: Option<&query::plan::LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
fn post_execute(
&self,
mut output: Output,
_query_ctx: QueryContextRef,
) -> Result<Output> {
self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match &mut output {
Output::AffectedRows(rows) => {
assert_eq!(*rows, 0);
// update output result
*rows = 10;
}
_ => unreachable!(),
}
Ok(output)
}
}
let standalone = tests::create_standalone_instance("test_hook").await;
let mut instance = standalone.instance;
let plugins = Plugins::new();
let counter_hook = Arc::new(AssertionHook::default());
plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
// the four interceptor hooks (pre_parsing, post_parsing, pre_execute, post_execute) are each called once
assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 10),
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_disable_db_operation_plugin() {
#[derive(Default)]
struct DisableDBOpHook;
impl SqlQueryInterceptor for DisableDBOpHook {
type Error = Error;
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> Result<Vec<Statement>> {
for s in &statements {
match s {
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
return Err(Error::NotSupported {
feat: "Database operations".to_owned(),
})
}
_ => {}
}
}
Ok(statements)
}
}
let query_ctx = Arc::new(QueryContext::new());
let standalone = tests::create_standalone_instance("test_db_hook").await;
let mut instance = standalone.instance;
let plugins = Plugins::new();
let hook = Arc::new(DisableDBOpHook::default());
plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins));
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 0),
_ => unreachable!(),
}
let sql = r#"CREATE DATABASE tomcat"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
} else {
unreachable!();
}
let sql = r#"SELECT 1; SHOW DATABASES"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
} else {
unreachable!();
}
}
}

View File

@@ -12,4 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod catalog;
mod grpc;
mod influxdb;
mod instance;
mod opentsdb;
mod prometheus;
mod table;
pub mod test_util;
#[cfg(test)]
mod tests;
#[cfg(test)]
// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate
#[allow(clippy::single_component_path_imports)]
use rstest_reuse;

View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
use itertools::Itertools;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContext;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_exec() {
let standalone = tests::create_standalone_instance("test_standalone_exec").await;
let instance = &standalone.instance;
test_exec(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_exec() {
let distributed = tests::create_distributed_instance("test_distributed_exec").await;
let instance = &distributed.frontend;
test_exec(instance).await;
}
async fn test_exec(instance: &Arc<Instance>) {
let ctx = QueryContext::arc();
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
1.0,
vec![
("tagk1".to_string(), "tagv1".to_string()),
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
let result = instance.exec(&data_point1, ctx.clone()).await;
assert!(result.is_ok());
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
2.0,
vec![
("tagk2".to_string(), "tagv2".to_string()),
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
let result = instance.exec(&data_point2, ctx.clone()).await;
assert!(result.is_ok());
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
let result = instance.exec(&data_point3, ctx.clone()).await;
assert!(result.is_ok());
let output = instance
.do_query(
"select * from my_metric_1 order by greptime_timestamp",
Arc::new(QueryContext::new()),
)
.await
.remove(0)
.unwrap();
match output {
Output::Stream(stream) => {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
"| 1970-01-01T00:00:03 | 3.0 | | | |",
"+---------------------+----------------+-------+-------+-------+",
]
.into_iter()
.join("\n");
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
};
}
}

View File

@@ -0,0 +1,183 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{
Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest,
};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use frontend::instance::Instance;
use prost::Message;
use servers::prometheus;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::PrometheusProtocolHandler;
use session::context::QueryContext;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_prometheus_remote_rw() {
let standalone =
tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await;
let instance = &standalone.instance;
test_prometheus_remote_rw(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_prometheus_remote_rw() {
let distributed =
tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await;
let instance = &distributed.frontend;
test_prometheus_remote_rw(instance).await;
}
async fn test_prometheus_remote_rw(instance: &Arc<Instance>) {
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
..Default::default()
};
let db = "prometheus";
let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
assert!(SqlQueryHandler::do_query(
instance.as_ref(),
"CREATE DATABASE IF NOT EXISTS prometheus",
ctx.clone(),
)
.await
.get(0)
.unwrap()
.is_ok());
instance.write(write_request, ctx.clone()).await.unwrap();
let read_request = ReadRequest {
queries: vec![
Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
r#type: 0,
}],
..Default::default()
},
Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 3000,
matchers: vec![
LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
r#type: 0,
},
LabelMatcher {
name: "app".to_string(),
value: "biz".to_string(),
r#type: MatcherType::Eq as i32,
},
],
..Default::default()
},
],
..Default::default()
};
let resp = instance.read(read_request, ctx).await.unwrap();
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.body).unwrap();
let read_response = ReadResponse::decode(&body[..]).unwrap();
let query_results = read_response.results;
assert_eq!(2, query_results.len());
assert_eq!(1, query_results[0].timeseries.len());
let timeseries = &query_results[0].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "job".to_string(),
value: "spark".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 1.0,
timestamp: 1000,
},
Sample {
value: 2.0,
timestamp: 2000,
}
]
);
assert_eq!(1, query_results[1].timeseries.len());
let timeseries = &query_results[1].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
Label {
name: "app".to_string(),
value: "biz".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 5.0,
timestamp: 1000,
},
Sample {
value: 6.0,
timestamp: 2000,
},
Sample {
value: 7.0,
timestamp: 3000,
}
]
);
}
}

View File

@@ -0,0 +1,363 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::DfPhysicalPlan;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::SessionContext;
use datafusion::sql::sqlparser;
use datafusion_expr::expr_fn::{and, binary_expr, col};
use datafusion_expr::{lit, Operator};
use datanode::instance::Instance;
use datatypes::arrow::compute::SortOptions;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use frontend::expr_factory;
use frontend::table::DistTable;
use itertools::Itertools;
use meta_client::rpc::TableName;
use session::context::QueryContext;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::TableRef;
#[tokio::test(flavor = "multi_thread")]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table("test_dist_table_scan").await);
// should scan all regions
// select a, row_id from numbers
let projection = Some(vec![1, 2]);
let filters = vec![];
let expected_output = vec![
"+-----+--------+",
"| a | row_id |",
"+-----+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"| 30 | 1 |",
"| 31 | 2 |",
"| 32 | 3 |",
"| 33 | 4 |",
"| 34 | 5 |",
"| 100 | 1 |",
"| 101 | 2 |",
"| 102 | 3 |",
"| 103 | 4 |",
"| 104 | 5 |",
"+-----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 4, expected_output).await;
// should scan only region 1
// select a, row_id from numbers where a < 10
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()];
let expected_output = vec![
"+---+--------+",
"| a | row_id |",
"+---+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"+---+--------+",
];
exec_table_scan(table.clone(), projection, filters, 1, expected_output).await;
// should scan region 1 and 2
// select a, row_id from numbers where a < 15
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()];
let expected_output = vec![
"+----+--------+",
"| a | row_id |",
"+----+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"+----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 2, expected_output).await;
// should scan region 2 and 3
// select a, row_id from numbers where a < 40 and a >= 10
let projection = Some(vec![1, 2]);
let filters = vec![and(
binary_expr(col("a"), Operator::Lt, lit(40)),
binary_expr(col("a"), Operator::GtEq, lit(10)),
)
.into()];
let expected_output = vec![
"+----+--------+",
"| a | row_id |",
"+----+--------+",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"| 30 | 1 |",
"| 31 | 2 |",
"| 32 | 3 |",
"| 33 | 4 |",
"| 34 | 5 |",
"+----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 2, expected_output).await;
// should scan all regions
// select a, row_id from numbers where a < 1000 and row_id == 1
let projection = Some(vec![1, 2]);
let filters = vec![and(
binary_expr(col("a"), Operator::Lt, lit(1000)),
binary_expr(col("row_id"), Operator::Eq, lit(1)),
)
.into()];
let expected_output = vec![
"+-----+--------+",
"| a | row_id |",
"+-----+--------+",
"| 0 | 1 |",
"| 10 | 1 |",
"| 30 | 1 |",
"| 100 | 1 |",
"+-----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 4, expected_output).await;
}
async fn exec_table_scan(
table: TableRef,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
expected_partitions: usize,
expected_output: Vec<&str>,
) {
let expected_output = expected_output.into_iter().join("\n");
let table_scan = table
.scan(projection.as_ref(), filters.as_slice(), None)
.await
.unwrap();
assert_eq!(
table_scan.output_partitioning().partition_count(),
expected_partitions
);
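// Coalesce all partitions into one, then sort by column "a" so the collected output has a deterministic order.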
let merge =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone())));
let sort = SortExec::new(
vec![PhysicalSortExpr {
expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(),
options: SortOptions::default(),
}],
Arc::new(merge),
)
.with_fetch(None);
assert_eq!(sort.output_partitioning().partition_count(), 1);
let session_ctx = SessionContext::new();
let stream = sort.execute(0, session_ctx.task_ctx()).unwrap();
let stream = Box::pin(RecordBatchStreamAdapter::try_new(stream).unwrap());
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(recordbatches.pretty_print().unwrap(), expected_output);
}
async fn new_dist_table(test_name: &str) -> DistTable {
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let instance = crate::tests::create_distributed_instance(test_name).await;
let dist_instance = &instance.dist_instance;
let datanode_instances = instance.datanodes;
let catalog_manager = dist_instance.catalog_manager();
let partition_manager = catalog_manager.partition_manager();
let datanode_clients = catalog_manager.datanode_clients();
let table_name = TableName::new("greptime", "public", "dist_numbers");
let sql = "
CREATE TABLE greptime.public.dist_numbers (
ts BIGINT,
a INT,
row_id INT,
TIME INDEX (ts),
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito";
let create_table =
match ParserContext::create_with_dialect(sql, &sqlparser::dialect::GenericDialect {})
.unwrap()
.pop()
.unwrap()
{
Statement::CreateTable(c) => c,
_ => unreachable!(),
};
let mut expr = expr_factory::create_to_expr(&create_table, QueryContext::arc()).unwrap();
let _result = dist_instance
.create_table(&mut expr, create_table.partitions)
.await
.unwrap();
let table_route = partition_manager
.find_table_route(&table_name)
.await
.unwrap();
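// Map each region to the datanode that holds its leader peer.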
let mut region_to_datanode_mapping = HashMap::new();
for region_route in table_route.region_routes.iter() {
let region_id = region_route.region.id as u32;
let datanode_id = region_route.leader_peer.as_ref().unwrap().id;
region_to_datanode_mapping.insert(region_id, datanode_id);
}
let mut global_start_ts = 1;
let regional_numbers = vec![
(0, (0..5).collect::<Vec<i32>>()),
(1, (10..15).collect::<Vec<i32>>()),
(2, (30..35).collect::<Vec<i32>>()),
(3, (100..105).collect::<Vec<i32>>()),
];
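// Insert five rows into each region via its hosting datanode; timestamps keep increasing across regions.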
for (region_number, numbers) in regional_numbers {
let datanode_id = *region_to_datanode_mapping.get(&region_number).unwrap();
let instance = datanode_instances.get(&datanode_id).unwrap().clone();
let start_ts = global_start_ts;
global_start_ts += numbers.len() as i64;
insert_testing_data(
&table_name,
instance.clone(),
numbers,
start_ts,
region_number,
)
.await;
}
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
DistTable::new(
table_name,
Arc::new(table_info),
partition_manager,
datanode_clients,
catalog_manager.backend(),
)
}
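/// Inserts `data` into the given region over gRPC, with "ts" values starting at
/// `start_ts` and "row_id" numbered from 1.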
async fn insert_testing_data(
table_name: &TableName,
dn_instance: Arc<Instance>,
data: Vec<i32>,
start_ts: i64,
region_number: RegionNumber,
) {
let row_count = data.len() as u32;
let columns = vec![
Column {
column_name: "ts".to_string(),
values: Some(column::Values {
i64_values: (start_ts..start_ts + row_count as i64).collect::<Vec<i64>>(),
..Default::default()
}),
datatype: ColumnDataType::Int64 as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(column::Values {
i32_values: data,
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "row_id".to_string(),
values: Some(column::Values {
i32_values: (1..=row_count as i32).collect::<Vec<i32>>(),
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
];
let request = GrpcInsertRequest {
table_name: table_name.table_name.clone(),
columns,
row_count,
region_number,
};
dn_instance
.handle_insert(request, QueryContext::arc())
.await
.unwrap();
}
}

View File

@@ -30,6 +30,10 @@ use datanode::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig,
};
use datanode::instance::Instance as DatanodeInstance;
use frontend::catalog::FrontendCatalogManager;
use frontend::datanode::DatanodeClients;
use frontend::instance::distributed::DistInstance;
use frontend::instance::Instance;
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::Peer;
use meta_srv::metasrv::MetaSrvOptions;
@@ -45,11 +49,6 @@ use table::engine::{region_name, table_dir};
use tonic::transport::Server;
use tower::service_fn;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::instance::distributed::DistInstance;
use crate::instance::Instance;
/// Guard against the `TempDir`s that are used in unit tests.
/// (The `TempDir` will be deleted once it goes out of scope.)
pub struct TestGuard {

View File

@@ -20,13 +20,13 @@ use common_query::Output;
use common_recordbatch::util;
use common_telemetry::logging;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use frontend::error::{Error, Result};
use frontend::instance::Instance;
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use crate::error::{Error, Result};
use crate::instance::Instance;
use crate::tests::test_util::{
both_instances_cases, check_output_stream, check_unordered_output_stream, distributed,
get_data_dir, standalone, standalone_instance_case, MockInstance,
@@ -516,7 +516,7 @@ async fn test_execute_external_create_without_ts_type(instance: Arc<dyn MockInst
async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "parquet";
let location = get_data_dir("../../tests/data/parquet/various_type.parquet")
let location = get_data_dir("../tests/data/parquet/various_type.parquet")
.canonicalize()
.unwrap()
.display()
@@ -586,7 +586,7 @@ async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstanc
async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "csv";
let location = get_data_dir("../../tests/data/csv/various_type.csv")
let location = get_data_dir("../tests/data/csv/various_type.csv")
.canonicalize()
.unwrap()
.display()
@@ -637,7 +637,7 @@ async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>)
async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "json";
let location = get_data_dir("../../tests/data/json/various_type.json")
let location = get_data_dir("../tests/data/json/various_type.json")
.canonicalize()
.unwrap()
.display()
@@ -694,7 +694,7 @@ async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>)
async fn test_execute_query_external_table_json_with_schame(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "json";
let location = get_data_dir("../../tests/data/json/various_type.json")
let location = get_data_dir("../tests/data/json/various_type.json")
.canonicalize()
.unwrap()
.display()

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use frontend::instance::Instance;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use rstest::rstest;
use rstest_reuse::apply;
@@ -22,7 +23,6 @@ use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use super::test_util::{check_unordered_output_stream, standalone, standalone_instance_case};
use crate::instance::Instance;
use crate::tests::test_util::MockInstance;
#[allow(clippy::too_many_arguments)]

View File

@@ -17,9 +17,9 @@ use std::sync::Arc;
use common_query::Output;
use common_recordbatch::util;
use frontend::instance::Instance;
use rstest_reuse::{self, template};
use crate::instance::Instance;
use crate::tests::{
create_distributed_instance, create_standalone_instance, MockDistributedInstance,
MockStandaloneInstance,