mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
refactor: add tests-integration module (#590)
* refactor: add integration-tests module * Apply suggestions from code review Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * test: move grpc module to tests-integration * test: adapt new standalone mode * test: improve http assertion Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
27
tests-integration/Cargo.toml
Normal file
27
tests-integration/Cargo.toml
Normal file
@@ -0,0 +1,27 @@
|
||||
[package]
|
||||
name = "tests-integration"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
api = { path = "../src/api" }
|
||||
axum = "0.6.0-rc.2"
|
||||
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
|
||||
catalog = { path = "../src/catalog" }
|
||||
client = { path = "../src/client" }
|
||||
common-catalog = { path = "../src/common/catalog" }
|
||||
common-runtime = { path = "../src/common/runtime" }
|
||||
common-telemetry = { path = "../src/common/telemetry" }
|
||||
datanode = { path = "../src/datanode" }
|
||||
datatypes = { path = "../src/datatypes" }
|
||||
frontend = { path = "../src/frontend" }
|
||||
mito = { path = "../src/mito", features = ["test"] }
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
servers = { path = "../src/servers" }
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
sql = { path = "../src/sql" }
|
||||
table = { path = "../src/table" }
|
||||
tempdir = "0.3"
|
||||
tokio = { version = "1.20", features = ["full"] }
|
||||
15
tests-integration/src/lib.rs
Normal file
15
tests-integration/src/lib.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod test_util;
|
||||
107
tests-integration/src/test_util.rs
Normal file
107
tests-integration/src/test_util.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
|
||||
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use datanode::error::{CreateTableSnafu, Result};
|
||||
use datanode::sql::SqlHandler;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use servers::Mode;
|
||||
use snafu::ResultExt;
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::requests::CreateTableRequest;
|
||||
use tempdir::TempDir;
|
||||
|
||||
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
|
||||
/// Only for test.
|
||||
pub struct TestGuard {
|
||||
_wal_tmp_dir: TempDir,
|
||||
_data_tmp_dir: TempDir,
|
||||
}
|
||||
|
||||
pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
|
||||
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{}", name)).unwrap();
|
||||
let data_tmp_dir = TempDir::new(&format!("gt_data_{}", name)).unwrap();
|
||||
let opts = DatanodeOptions {
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
},
|
||||
mode: Mode::Standalone,
|
||||
..Default::default()
|
||||
};
|
||||
(
|
||||
opts,
|
||||
TestGuard {
|
||||
_wal_tmp_dir: wal_tmp_dir,
|
||||
_data_tmp_dir: data_tmp_dir,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn create_test_table(
|
||||
catalog_manager: &CatalogManagerRef,
|
||||
sql_handler: &SqlHandler,
|
||||
ts_type: ConcreteDataType,
|
||||
) -> Result<()> {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
|
||||
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
|
||||
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
|
||||
];
|
||||
|
||||
let table_name = "demo";
|
||||
let table_engine: TableEngineRef = sql_handler.table_engine();
|
||||
let table = table_engine
|
||||
.create_table(
|
||||
&EngineContext::default(),
|
||||
CreateTableRequest {
|
||||
id: MIN_USER_TABLE_ID,
|
||||
catalog_name: "greptime".to_string(),
|
||||
schema_name: "public".to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
desc: Some(" a test table".to_string()),
|
||||
schema: Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.build()
|
||||
.expect("ts is expected to be timestamp column"),
|
||||
),
|
||||
create_if_not_exists: true,
|
||||
primary_key_indices: vec![3, 0], // "host" and "ts" are primary keys
|
||||
table_options: HashMap::new(),
|
||||
region_numbers: vec![0],
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context(CreateTableSnafu { table_name })?;
|
||||
|
||||
let schema_provider = catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
schema_provider
|
||||
.register_table(table_name.to_string(), table)
|
||||
.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
314
tests-integration/tests/grpc.rs
Normal file
314
tests-integration/tests/grpc.rs
Normal file
@@ -0,0 +1,314 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#![feature(assert_matches)]
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::alter_expr::Kind;
|
||||
use api::v1::column::SemanticType;
|
||||
use api::v1::{
|
||||
admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
|
||||
CreateExpr, InsertExpr, MutateResult,
|
||||
};
|
||||
use client::admin::Admin;
|
||||
use client::{Client, Database, ObjectResult};
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use datanode::instance::Instance;
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::grpc::GrpcOptions;
|
||||
use servers::grpc::GrpcServer;
|
||||
use servers::server::Server;
|
||||
use servers::Mode;
|
||||
use tests_integration::test_util::{self, TestGuard};
|
||||
|
||||
async fn setup_grpc_server(
|
||||
name: &str,
|
||||
datanode_port: usize,
|
||||
frontend_port: usize,
|
||||
) -> (String, TestGuard, Arc<GrpcServer>, Arc<GrpcServer>) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
|
||||
let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port);
|
||||
opts.rpc_addr = datanode_grpc_addr.clone();
|
||||
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
|
||||
instance.start().await.unwrap();
|
||||
|
||||
let datanode_grpc_addr = datanode_grpc_addr.clone();
|
||||
let runtime = Arc::new(
|
||||
RuntimeBuilder::default()
|
||||
.worker_threads(2)
|
||||
.thread_name("grpc-handlers")
|
||||
.build()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
|
||||
let fe_opts = FrontendOptions {
|
||||
mode: Mode::Standalone,
|
||||
datanode_rpc_addr: datanode_grpc_addr.clone(),
|
||||
grpc_options: Some(GrpcOptions {
|
||||
addr: fe_grpc_addr.clone(),
|
||||
runtime_size: 8,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let datanode_grpc_server = Arc::new(GrpcServer::new(
|
||||
instance.clone(),
|
||||
instance.clone(),
|
||||
runtime.clone(),
|
||||
));
|
||||
|
||||
let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts)
|
||||
.await
|
||||
.unwrap();
|
||||
fe_instance.set_catalog_manager(instance.catalog_manager().clone());
|
||||
|
||||
let fe_instance_ref = Arc::new(fe_instance);
|
||||
let fe_grpc_server = Arc::new(GrpcServer::new(
|
||||
fe_instance_ref.clone(),
|
||||
fe_instance_ref,
|
||||
runtime,
|
||||
));
|
||||
let grpc_server_clone = fe_grpc_server.clone();
|
||||
|
||||
let fe_grpc_addr_clone = fe_grpc_addr.clone();
|
||||
tokio::spawn(async move {
|
||||
let addr = fe_grpc_addr_clone.parse::<SocketAddr>().unwrap();
|
||||
grpc_server_clone.start(addr).await.unwrap()
|
||||
});
|
||||
|
||||
let dn_grpc_addr_clone = datanode_grpc_addr.clone();
|
||||
let dn_grpc_server_clone = datanode_grpc_server.clone();
|
||||
tokio::spawn(async move {
|
||||
let addr = dn_grpc_addr_clone.parse::<SocketAddr>().unwrap();
|
||||
dn_grpc_server_clone.start(addr).await.unwrap()
|
||||
});
|
||||
|
||||
// wait for GRPC server to start
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
(fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_auto_create_table() {
|
||||
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
|
||||
setup_grpc_server("auto_create_table", 3992, 3993).await;
|
||||
|
||||
let grpc_client = Client::with_urls(vec![addr]);
|
||||
let db = Database::new("greptime", grpc_client);
|
||||
insert_and_assert(&db).await;
|
||||
let _ = fe_grpc_server.shutdown().await;
|
||||
let _ = dn_grpc_server.shutdown().await;
|
||||
}
|
||||
|
||||
fn expect_data() -> (Column, Column, Column, Column) {
|
||||
// testing data:
|
||||
let expected_host_col = Column {
|
||||
column_name: "host".to_string(),
|
||||
values: Some(column::Values {
|
||||
string_values: vec!["host1", "host2", "host3", "host4"]
|
||||
.into_iter()
|
||||
.map(|s| s.to_string())
|
||||
.collect(),
|
||||
..Default::default()
|
||||
}),
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
datatype: ColumnDataType::String as i32,
|
||||
..Default::default()
|
||||
};
|
||||
let expected_cpu_col = Column {
|
||||
column_name: "cpu".to_string(),
|
||||
values: Some(column::Values {
|
||||
f64_values: vec![0.31, 0.41, 0.2],
|
||||
..Default::default()
|
||||
}),
|
||||
null_mask: vec![2],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
};
|
||||
let expected_mem_col = Column {
|
||||
column_name: "memory".to_string(),
|
||||
values: Some(column::Values {
|
||||
f64_values: vec![0.1, 0.2, 0.3],
|
||||
..Default::default()
|
||||
}),
|
||||
null_mask: vec![4],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
};
|
||||
let expected_ts_col = Column {
|
||||
column_name: "ts".to_string(),
|
||||
values: Some(column::Values {
|
||||
ts_millis_values: vec![100, 101, 102, 103],
|
||||
..Default::default()
|
||||
}),
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
datatype: ColumnDataType::Timestamp as i32,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
(
|
||||
expected_host_col,
|
||||
expected_cpu_col,
|
||||
expected_mem_col,
|
||||
expected_ts_col,
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_insert_and_select() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
|
||||
setup_grpc_server("insert_and_select", 3990, 3991).await;
|
||||
|
||||
let grpc_client = Client::with_urls(vec![addr]);
|
||||
|
||||
let db = Database::new("greptime", grpc_client.clone());
|
||||
let admin = Admin::new("greptime", grpc_client);
|
||||
|
||||
// create
|
||||
let expr = testing_create_expr();
|
||||
let result = admin.create(expr).await.unwrap();
|
||||
assert_matches!(
|
||||
result.result,
|
||||
Some(admin_result::Result::Mutate(MutateResult {
|
||||
success: 1,
|
||||
failure: 0
|
||||
}))
|
||||
);
|
||||
|
||||
//alter
|
||||
let add_column = ColumnDef {
|
||||
name: "test_column".to_string(),
|
||||
datatype: ColumnDataType::Int64.into(),
|
||||
is_nullable: true,
|
||||
default_constraint: None,
|
||||
};
|
||||
let kind = Kind::AddColumns(AddColumns {
|
||||
add_columns: vec![AddColumn {
|
||||
column_def: Some(add_column),
|
||||
is_key: false,
|
||||
}],
|
||||
});
|
||||
let expr = AlterExpr {
|
||||
table_name: "test_table".to_string(),
|
||||
catalog_name: None,
|
||||
schema_name: None,
|
||||
kind: Some(kind),
|
||||
};
|
||||
let result = admin.alter(expr).await.unwrap();
|
||||
assert_eq!(result.result, None);
|
||||
|
||||
// insert
|
||||
insert_and_assert(&db).await;
|
||||
|
||||
let _ = fe_grpc_server.shutdown().await;
|
||||
let _ = dn_grpc_server.shutdown().await;
|
||||
}
|
||||
|
||||
async fn insert_and_assert(db: &Database) {
|
||||
// testing data:
|
||||
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
|
||||
|
||||
let expr = InsertExpr {
|
||||
schema_name: "public".to_string(),
|
||||
table_name: "demo".to_string(),
|
||||
region_number: 0,
|
||||
columns: vec![
|
||||
expected_host_col.clone(),
|
||||
expected_cpu_col.clone(),
|
||||
expected_mem_col.clone(),
|
||||
expected_ts_col.clone(),
|
||||
],
|
||||
row_count: 4,
|
||||
};
|
||||
let result = db.insert(expr).await;
|
||||
result.unwrap();
|
||||
|
||||
// select
|
||||
let result = db
|
||||
.select(client::Select::Sql("select * from demo".to_string()))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(matches!(result, ObjectResult::Select(_)));
|
||||
match result {
|
||||
ObjectResult::Select(select_result) => {
|
||||
assert_eq!(4, select_result.row_count);
|
||||
let actual_columns = select_result.columns;
|
||||
assert_eq!(4, actual_columns.len());
|
||||
|
||||
// Respect the order in create table schema
|
||||
let expected_columns = vec![
|
||||
expected_host_col,
|
||||
expected_cpu_col,
|
||||
expected_mem_col,
|
||||
expected_ts_col,
|
||||
];
|
||||
expected_columns
|
||||
.iter()
|
||||
.zip(actual_columns.iter())
|
||||
.for_each(|(x, y)| assert_eq!(x, y));
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn testing_create_expr() -> CreateExpr {
|
||||
let column_defs = vec![
|
||||
ColumnDef {
|
||||
name: "host".to_string(),
|
||||
datatype: ColumnDataType::String as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: "cpu".to_string(),
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
is_nullable: true,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: "memory".to_string(),
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
is_nullable: true,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: "ts".to_string(),
|
||||
datatype: 15, // timestamp
|
||||
is_nullable: true,
|
||||
default_constraint: None,
|
||||
},
|
||||
];
|
||||
CreateExpr {
|
||||
catalog_name: None,
|
||||
schema_name: None,
|
||||
table_name: "demo".to_string(),
|
||||
desc: Some("blabla".to_string()),
|
||||
column_defs,
|
||||
time_index: "ts".to_string(),
|
||||
primary_keys: vec!["ts".to_string(), "host".to_string()],
|
||||
create_if_not_exists: true,
|
||||
table_options: Default::default(),
|
||||
table_id: Some(MIN_USER_TABLE_ID),
|
||||
region_ids: vec![0],
|
||||
}
|
||||
}
|
||||
241
tests-integration/tests/http.rs
Normal file
241
tests-integration/tests/http.rs
Normal file
@@ -0,0 +1,241 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::http::StatusCode;
|
||||
use axum::Router;
|
||||
use axum_test_helper::TestClient;
|
||||
use datanode::instance::{Instance, InstanceRef};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance};
|
||||
use serde_json::json;
|
||||
use servers::http::{HttpOptions, HttpServer, JsonOutput, JsonResponse};
|
||||
use test_util::TestGuard;
|
||||
use tests_integration::test_util;
|
||||
|
||||
async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
|
||||
let fe_opts = FrontendOptions::default();
|
||||
let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap();
|
||||
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
|
||||
frontend_instance.set_script_handler(datanode_instance);
|
||||
frontend_instance
|
||||
}
|
||||
|
||||
async fn make_test_app(name: &str) -> (Router, TestGuard) {
|
||||
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
|
||||
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
|
||||
instance.start().await.unwrap();
|
||||
test_util::create_test_table(
|
||||
instance.catalog_manager(),
|
||||
instance.sql_handler(),
|
||||
ConcreteDataType::timestamp_millis_datatype(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let http_server = HttpServer::new(instance, HttpOptions::default());
|
||||
(http_server.make_app(), guard)
|
||||
}
|
||||
|
||||
async fn make_test_app_with_frontend(name: &str) -> (Router, TestGuard) {
|
||||
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
|
||||
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
|
||||
let mut frontend = build_frontend_instance(instance.clone()).await;
|
||||
instance.start().await.unwrap();
|
||||
test_util::create_test_table(
|
||||
frontend.catalog_manager().as_ref().unwrap(),
|
||||
instance.sql_handler(),
|
||||
ConcreteDataType::timestamp_millis_datatype(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
frontend.start().await.unwrap();
|
||||
let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default());
|
||||
http_server.set_script_handler(instance.clone());
|
||||
let app = http_server.make_app();
|
||||
(app, guard)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_sql_api() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, _guard) = make_test_app("sql_api").await;
|
||||
let client = TestClient::new(app);
|
||||
let res = client.get("/v1/sql").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json: r#"{"code":1004,"error":"sql parameter is required."}"#
|
||||
assert_eq!(body.code(), 1004);
|
||||
assert_eq!(body.error().unwrap(), "sql parameter is required.");
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select * from numbers limit 10")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}}]}"#
|
||||
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
|
||||
let output = body.output().unwrap();
|
||||
assert_eq!(output.len(), 1);
|
||||
assert_eq!(
|
||||
output[0],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}
|
||||
})).unwrap()
|
||||
);
|
||||
|
||||
// test insert and select
|
||||
let res = client
|
||||
.get("/v1/sql?sql=insert into demo values('host', 66.6, 1024, 0)")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// select *
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select * from demo limit 10")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json: r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}}]}"#
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
assert_eq!(output.len(), 1);
|
||||
|
||||
assert_eq!(
|
||||
output[0],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}
|
||||
})).unwrap()
|
||||
);
|
||||
|
||||
// select with projections
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select cpu, ts from demo limit 10")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
assert_eq!(output.len(), 1);
|
||||
|
||||
assert_eq!(
|
||||
output[0],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}
|
||||
})).unwrap()
|
||||
);
|
||||
|
||||
// select with column alias
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select cpu as c, ts as time from demo limit 10")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
assert_eq!(output.len(), 1);
|
||||
assert_eq!(
|
||||
output[0],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}
|
||||
})).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_metrics_api() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
common_telemetry::init_default_metrics_recorder();
|
||||
let (app, _guard) = make_test_app("metrics_api").await;
|
||||
let client = TestClient::new(app);
|
||||
|
||||
// Send a sql
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select * from numbers limit 10")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let res = client.get("/metrics").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = res.text().await;
|
||||
assert!(body.contains("datanode_handle_sql_elapsed"));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_scripts_api() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, _guard) = make_test_app_with_frontend("script_api").await;
|
||||
let client = TestClient::new(app);
|
||||
|
||||
let res = client
|
||||
.post("/v1/scripts?name=test")
|
||||
.body(
|
||||
r#"
|
||||
@copr(sql='select number from numbers limit 10', args=['number'], returns=['n'])
|
||||
def test(n):
|
||||
return n + 1;
|
||||
"#,
|
||||
)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json: r#"{"code":0}"#
|
||||
assert_eq!(body.code(), 0);
|
||||
assert!(body.output().is_none());
|
||||
|
||||
// call script
|
||||
let res = client.post("/v1/run-script?name=test").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}}]}"#
|
||||
assert_eq!(body.code(), 0);
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
assert_eq!(output.len(), 1);
|
||||
assert_eq!(
|
||||
output[0],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}
|
||||
})).unwrap()
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user