Merge branch 'dev' into replace-arrow2

This commit is contained in:
Ruihang Xia
2022-12-05 20:10:37 +08:00
233 changed files with 16920 additions and 2055 deletions

View File

@@ -11,13 +11,15 @@ python = ["dep:script"]
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
axum = "0.6.0-rc.2"
axum-macros = "0.3.0-rc.1"
axum = "0.6"
axum-macros = "0.3"
backon = "0.2"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
@@ -26,36 +28,39 @@ common-time = { path = "../common/time" }
common-insert = { path = "../common/insert" }
datafusion = "14.0.0"
datatypes = { path = "../datatypes" }
frontend = { path = "../frontend" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }
meta-client = { path = "../meta-client" }
meta-srv = { path = "../meta-srv", features = ["mock"] }
metrics = "0.20"
mito = { path = "../mito", features = ["test"] }
object-store = { path = "../object-store" }
query = { path = "../query" }
script = { path = "../script", features = ["python"], optional = true }
serde = "1.0"
serde_json = "1.0"
servers = { path = "../servers" }
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
storage = { path = "../storage" }
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
mito = { path = "../mito", features = ["test"] }
tokio = { version = "1.18", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
frontend = { path = "../frontend" }
[dev-dependencies]
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
tempdir = "0.3"axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
tempdir = "0.3"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }

View File

@@ -26,7 +26,15 @@ use crate::server::Services;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File { data_dir: String },
File {
data_dir: String,
},
S3 {
bucket: String,
root: String,
access_key_id: String,
secret_access_key: String,
},
}
impl Default for ObjectStoreConfig {
@@ -47,6 +55,7 @@ pub struct DatanodeOptions {
pub meta_client_opts: Option<MetaClientOpts>,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
pub mode: Mode,
}
@@ -61,6 +70,7 @@ impl Default for DatanodeOptions {
meta_client_opts: None,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
mode: Mode::Standalone,
}
}
@@ -86,9 +96,18 @@ impl Datanode {
pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");
self.instance.start().await?;
self.services.start(&self.opts).await?;
Ok(())
self.start_instance().await?;
self.start_services().await
}
/// Start only the internal component of datanode.
pub async fn start_instance(&mut self) -> Result<()> {
self.instance.start().await
}
/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
self.services.start(&self.opts).await
}
pub fn get_instance(&self) -> InstanceRef {

View File

@@ -18,6 +18,8 @@ use common_error::prelude::*;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
use crate::datanode::ObjectStoreConfig;
/// Business error of datanode.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -73,6 +75,13 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Failed to drop table {}, source: {}", table_name, source))]
DropTable {
table_name: String,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },
@@ -82,9 +91,6 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Missing timestamp column in request"))]
MissingTimestampColumn { backtrace: Backtrace },
@@ -138,10 +144,10 @@ pub enum Error {
#[snafu(display("Failed to storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
#[snafu(display("Failed to init backend, dir: {}, source: {}", dir, source))]
#[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))]
InitBackend {
dir: String,
source: std::io::Error,
config: ObjectStoreConfig,
source: object_store::Error,
backtrace: Backtrace,
},
@@ -202,21 +208,16 @@ pub enum Error {
source: common_grpc::Error,
},
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(display("Failed to convert alter expr to request: {}", source))]
AlterExprToRequest {
#[snafu(backtrace)]
source: api::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display(
"Invalid column proto definition, column: {}, source: {}",
column,
source
))]
InvalidColumnDef {
column: String,
#[snafu(display("Failed to convert create expr to request: {}", source))]
CreateExprToRequest {
#[snafu(backtrace)]
source: api::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to parse SQL, source: {}", source))]
@@ -263,7 +264,7 @@ pub enum Error {
#[snafu(display("Failed to insert data, source: {}", source))]
InsertData {
#[snafu(backtrace)]
source: common_insert::error::Error,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Insert batch is empty"))]
@@ -306,6 +307,7 @@ impl ErrorExt for Error {
Error::CreateTable { source, .. }
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
Error::DropTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
@@ -316,6 +318,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::AlterExprToRequest { source, .. }
| Error::CreateExprToRequest { source, .. } => source.status_code(),
Error::CreateSchema { source, .. }
| Error::ConvertSchema { source, .. }
| Error::VectorComputation { source } => source.status_code(),
@@ -324,7 +328,6 @@ impl ErrorExt for Error {
| Error::InvalidSql { .. }
| Error::KeyColumnNotFound { .. }
| Error::InvalidPrimaryKey { .. }
| Error::MissingField { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
@@ -343,10 +346,6 @@ impl ErrorExt for Error {
| Error::UnsupportedExpr { .. }
| Error::Catalog { .. } => StatusCode::Internal,
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
}
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::StartScriptManager { source } => source.status_code(),

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, path};
use backon::ExponentialBackoff;
use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -26,8 +27,9 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::layers::LoggingLayer;
use object_store::services::fs::Builder;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::fs::Builder as FsBuilder;
use object_store::services::s3::Builder as S3Builder;
use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
@@ -99,17 +101,29 @@ impl Instance {
// create remote catalog manager
let (catalog_manager, factory, table_id_provider) = match opts.mode {
Mode::Standalone => {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
.await
.context(CatalogSnafu)?,
);
let factory = QueryEngineFactory::new(catalog.clone());
(
catalog.clone() as CatalogManagerRef,
factory,
Some(catalog as TableIdProviderRef),
)
if opts.enable_memory_catalog {
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
let factory = QueryEngineFactory::new(catalog.clone());
(
catalog.clone() as CatalogManagerRef,
factory,
Some(catalog as TableIdProviderRef),
)
} else {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
.await
.context(CatalogSnafu)?,
);
let factory = QueryEngineFactory::new(catalog.clone());
(
catalog.clone() as CatalogManagerRef,
factory,
Some(catalog as TableIdProviderRef),
)
}
}
Mode::Distributed => {
@@ -139,7 +153,11 @@ impl Instance {
};
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
sql_handler: SqlHandler::new(
table_engine,
catalog_manager.clone(),
query_engine.clone(),
),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
@@ -170,24 +188,64 @@ impl Instance {
}
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
// TODO(dennis): supports other backend
let data_dir = util::normalize_dir(match store_config {
ObjectStoreConfig::File { data_dir } => data_dir,
});
let object_store = match store_config {
ObjectStoreConfig::File { data_dir } => new_fs_object_store(data_dir).await,
ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await,
};
object_store.map(|object_store| {
object_store
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
.layer(MetricsLayer)
.layer(LoggingLayer)
.layer(TracingLayer)
})
}
pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let (root, secret_key, key_id, bucket) = match store_config {
ObjectStoreConfig::S3 {
bucket,
root,
access_key_id,
secret_access_key,
} => (root, secret_access_key, access_key_id, bucket),
_ => unreachable!(),
};
let root = util::normalize_dir(root);
info!("The s3 storage bucket is: {}, root is: {}", bucket, &root);
let accessor = S3Builder::default()
.root(&root)
.bucket(bucket)
.access_key_id(key_id)
.secret_access_key(secret_key)
.build()
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?;
Ok(ObjectStore::new(accessor))
}
pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result<ObjectStore> {
let data_dir = util::normalize_dir(data_dir);
fs::create_dir_all(path::Path::new(&data_dir))
.context(error::CreateDirSnafu { dir: &data_dir })?;
info!("The file storage directory is: {}", &data_dir);
info!("The storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{}/.tmp/", data_dir);
let accessor = Builder::default()
let accessor = FsBuilder::default()
.root(&data_dir)
.atomic_write_dir(&atomic_write_dir)
.build()
.context(error::InitBackendSnafu { dir: &data_dir })?;
.context(error::InitBackendSnafu {
config: ObjectStoreConfig::File { data_dir },
})?;
let object_store = ObjectStore::new(accessor).layer(LoggingLayer); // Add logging
Ok(object_store)
Ok(ObjectStore::new(accessor))
}
/// Create metasrv client instance and spawn heartbeat loop.

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use api::v1::{
admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, CreateDatabaseExpr,
admin_expr, object_expr, select_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr,
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
@@ -22,10 +24,11 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::select::to_object_result;
use common_insert::insertion_expr_to_request;
use common_grpc_expr::insertion_expr_to_request;
use common_query::Output;
use query::plan::LogicalPlan;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
use session::context::QueryContext;
use snafu::prelude::*;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::CreateDatabaseRequest;
@@ -44,7 +47,7 @@ impl Instance {
catalog_name: &str,
schema_name: &str,
table_name: &str,
values: insert_expr::Values,
insert_batches: Vec<(Vec<Column>, u32)>,
) -> Result<Output> {
let schema_provider = self
.catalog_manager
@@ -55,11 +58,7 @@ impl Instance {
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name: schema_name })?;
let insert_batches =
common_insert::insert_batches(&values.values).context(InsertDataSnafu)?;
ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);
let table = schema_provider
.table(table_name)
.context(CatalogSnafu)?
@@ -87,10 +86,10 @@ impl Instance {
catalog_name: &str,
schema_name: &str,
table_name: &str,
values: insert_expr::Values,
insert_batches: Vec<(Vec<Column>, u32)>,
) -> ObjectResult {
match self
.execute_grpc_insert(catalog_name, schema_name, table_name, values)
.execute_grpc_insert(catalog_name, schema_name, table_name, insert_batches)
.await
{
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
@@ -114,7 +113,9 @@ impl Instance {
async fn do_handle_select(&self, select_expr: SelectExpr) -> Result<Output> {
let expr = select_expr.expr;
match expr {
Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await,
Some(select_expr::Expr::Sql(sql)) => {
self.execute_sql(&sql, Arc::new(QueryContext::new())).await
}
Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await,
Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => {
self.physical_planner
@@ -170,25 +171,13 @@ impl GrpcQueryHandler for Instance {
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &insert_expr.schema_name;
let table_name = &insert_expr.table_name;
let expr = insert_expr
.expr
.context(servers::error::InvalidQuerySnafu {
reason: "missing `expr` in `InsertExpr`",
})?;
// TODO(fys): _region_number is for later use.
let _region_number: u32 = insert_expr.region_number;
match expr {
insert_expr::Expr::Values(values) => {
self.handle_insert(catalog_name, schema_name, table_name, values)
.await
}
insert_expr::Expr::Sql(sql) => {
let output = self.execute_sql(&sql).await;
to_object_result(output).await
}
}
let insert_batches = vec![(insert_expr.columns, insert_expr.row_count)];
self.handle_insert(catalog_name, schema_name, table_name, insert_batches)
.await
}
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {
@@ -211,6 +200,9 @@ impl GrpcAdminHandler for Instance {
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
self.execute_create_database(create_database_expr).await
}
Some(admin_expr::Expr::DropTable(drop_table_expr)) => {
self.handle_drop_table(drop_table_expr).await
}
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),

View File

@@ -13,25 +13,27 @@
// limitations under the License.
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging::{error, info};
use common_telemetry::timer;
use servers::query_handler::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::CreateDatabaseRequest;
use crate::error::{
BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result,
SchemaNotFoundSnafu, TableIdProviderNotFoundSnafu,
};
use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
use crate::instance::Instance;
use crate::metric;
use crate::sql::SqlRequest;
impl Instance {
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
@@ -41,7 +43,7 @@ impl Instance {
Statement::Query(_) => {
let logical_plan = self
.query_engine
.statement_to_plan(stmt)
.statement_to_plan(stmt, query_ctx)
.context(ExecuteSqlSnafu)?;
self.query_engine
@@ -50,20 +52,15 @@ impl Instance {
.context(ExecuteSqlSnafu)
}
Statement::Insert(i) => {
let (catalog_name, schema_name, _table_name) =
i.full_table_name().context(ParseSqlSnafu)?;
let schema_provider = self
.catalog_manager
.catalog(&catalog_name)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu { name: catalog_name })?
.schema(&schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name: schema_name })?;
let request = self.sql_handler.insert_to_request(schema_provider, *i)?;
self.sql_handler.execute(request).await
let (catalog, schema, table) =
table_idents_to_full_name(i.table_name(), query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self.sql_handler.insert_to_request(
self.catalog_manager.clone(),
*i,
table_ref,
)?;
self.sql_handler.execute(request, query_ctx).await
}
Statement::CreateDatabase(c) => {
@@ -74,7 +71,7 @@ impl Instance {
info!("Creating a new database: {}", request.db_name);
self.sql_handler
.execute(SqlRequest::CreateDatabase(request))
.execute(SqlRequest::CreateDatabase(request), query_ctx)
.await
}
@@ -89,49 +86,116 @@ impl Instance {
let _engine_name = c.engine.clone();
// TODO(hl): Select table engine by engine_name
let request = self.sql_handler.create_to_request(table_id, c)?;
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let name = c.name.clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self.sql_handler.create_to_request(table_id, c, table_ref)?;
let table_id = request.id;
info!(
"Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}",
catalog_name, schema_name, table_name, table_id
catalog, schema, table, table_id
);
self.sql_handler
.execute(SqlRequest::CreateTable(request))
.execute(SqlRequest::CreateTable(request), query_ctx)
.await
}
Statement::Alter(alter_table) => {
let req = self.sql_handler.alter_to_request(alter_table)?;
self.sql_handler.execute(SqlRequest::Alter(req)).await
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let req = self.sql_handler.alter_to_request(alter_table, table_ref)?;
self.sql_handler
.execute(SqlRequest::Alter(req), query_ctx)
.await
}
Statement::DropTable(drop_table) => {
let req = self.sql_handler.drop_table_to_request(drop_table);
self.sql_handler
.execute(SqlRequest::DropTable(req), query_ctx)
.await
}
Statement::ShowDatabases(stmt) => {
self.sql_handler
.execute(SqlRequest::ShowDatabases(stmt))
.execute(SqlRequest::ShowDatabases(stmt), query_ctx)
.await
}
Statement::ShowTables(stmt) => {
self.sql_handler.execute(SqlRequest::ShowTables(stmt)).await
self.sql_handler
.execute(SqlRequest::ShowTables(stmt), query_ctx)
.await
}
Statement::Explain(stmt) => {
self.sql_handler
.execute(SqlRequest::Explain(Box::new(stmt)), query_ctx)
.await
}
Statement::DescribeTable(stmt) => {
self.sql_handler
.execute(SqlRequest::DescribeTable(stmt))
.execute(SqlRequest::DescribeTable(stmt), query_ctx)
.await
}
Statement::ShowCreateTable(_stmt) => {
unimplemented!("SHOW CREATE TABLE is unimplemented yet");
}
Statement::Use(db) => {
ensure!(
self.catalog_manager
.schema(DEFAULT_CATALOG_NAME, &db)
.context(error::CatalogSnafu)?
.is_some(),
error::SchemaNotFoundSnafu { name: &db }
);
query_ctx.set_current_schema(&db);
Ok(Output::RecordBatches(RecordBatches::empty()))
}
}
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,
// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved.
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple.
fn table_idents_to_full_name(
obj_name: &ObjectName,
query_ctx: QueryContextRef,
) -> Result<(String, String, String)> {
match &obj_name.0[..] {
[table] => Ok((
DEFAULT_CATALOG_NAME.to_string(),
query_ctx.current_schema().unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
table.value.clone(),
)),
[schema, table] => Ok((
DEFAULT_CATALOG_NAME.to_string(),
schema.value.clone(),
table.value.clone(),
)),
[catalog, schema, table] => Ok((
catalog.value.clone(),
schema.value.clone(),
table.value.clone(),
)),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {}",
obj_name
),
}.fail(),
}
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(&self, query: &str) -> servers::error::Result<Output> {
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_sql(query)
self.execute_sql(query, query_ctx)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
@@ -140,3 +204,78 @@ impl SqlQueryHandler for Instance {
.context(servers::error::ExecuteQuerySnafu { query })
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use session::context::QueryContext;
use super::*;
#[test]
fn test_table_idents_to_full_name() {
let my_catalog = "my_catalog";
let my_schema = "my_schema";
let my_table = "my_table";
let full = ObjectName(vec![my_catalog.into(), my_schema.into(), my_table.into()]);
let partial = ObjectName(vec![my_schema.into(), my_table.into()]);
let bare = ObjectName(vec![my_table.into()]);
let using_schema = "foo";
let query_ctx = Arc::new(QueryContext::with_current_schema(using_schema.to_string()));
let empty_ctx = Arc::new(QueryContext::new());
assert_eq!(
table_idents_to_full_name(&full, query_ctx.clone()).unwrap(),
(
my_catalog.to_string(),
my_schema.to_string(),
my_table.to_string()
)
);
assert_eq!(
table_idents_to_full_name(&full, empty_ctx.clone()).unwrap(),
(
my_catalog.to_string(),
my_schema.to_string(),
my_table.to_string()
)
);
assert_eq!(
table_idents_to_full_name(&partial, query_ctx.clone()).unwrap(),
(
DEFAULT_CATALOG_NAME.to_string(),
my_schema.to_string(),
my_table.to_string()
)
);
assert_eq!(
table_idents_to_full_name(&partial, empty_ctx.clone()).unwrap(),
(
DEFAULT_CATALOG_NAME.to_string(),
my_schema.to_string(),
my_table.to_string()
)
);
assert_eq!(
table_idents_to_full_name(&bare, query_ctx).unwrap(),
(
DEFAULT_CATALOG_NAME.to_string(),
using_schema.to_string(),
my_table.to_string()
)
);
assert_eq!(
table_idents_to_full_name(&bare, empty_ctx).unwrap(),
(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
my_table.to_string()
)
);
}
}

View File

@@ -22,6 +22,6 @@ mod metric;
mod mock;
mod script;
pub mod server;
mod sql;
pub mod sql;
#[cfg(test)]
mod tests;

View File

@@ -58,7 +58,11 @@ impl Instance {
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone());
let sql_handler = SqlHandler::new(
mock_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
);
let physical_planner = PhysicalPlanner::new(query_engine.clone());
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
@@ -123,7 +127,11 @@ impl Instance {
);
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
sql_handler: SqlHandler::new(
table_engine,
catalog_manager.clone(),
query_engine.clone(),
),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,

View File

@@ -62,6 +62,7 @@ impl Services {
Some(MysqlServer::create_server(
instance.clone(),
mysql_io_runtime,
Default::default(),
))
}
};

View File

@@ -15,19 +15,17 @@
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::{error, info};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use session::context::QueryContext;
use snafu::prelude::*;
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use table::requests::DropTableRequest;
use crate::error::{self, BumpTableIdSnafu, MissingFieldSnafu, Result};
use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu};
use crate::instance::Instance;
use crate::sql::SqlRequest;
@@ -75,9 +73,14 @@ impl Instance {
}
};
let request = create_expr_to_request(table_id, expr).await;
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu);
let result = futures::future::ready(request)
.and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request)))
.and_then(|request| {
self.sql_handler().execute(
SqlRequest::CreateTable(request),
Arc::new(QueryContext::new()),
)
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
@@ -94,18 +97,24 @@ impl Instance {
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match alter_expr_to_request(expr).transpose() {
Some(req) => req,
let request = match alter_expr_to_request(expr)
.context(AlterExprToRequestSnafu)
.transpose()
{
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(0, 0)
.build()
}
Some(req) => req,
};
let result = futures::future::ready(request)
.and_then(|request| self.sql_handler().execute(SqlRequest::Alter(request)))
.and_then(|request| {
self.sql_handler()
.execute(SqlRequest::Alter(request), Arc::new(QueryContext::new()))
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
@@ -119,156 +128,50 @@ impl Instance {
.build(),
}
}
}
async fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
.iter()
.map(|key| {
schema
.column_index_by_name(key)
.context(error::KeyColumnNotFoundSnafu { name: key })
})
.collect::<Result<Vec<usize>>>()?;
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let region_ids = if expr.region_ids.is_empty() {
vec![0]
} else {
expr.region_ids
};
Ok(CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
schema,
region_numbers: region_ids,
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
})
}
fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let mut add_column_requests = vec![];
for add_column_expr in add_columns.add_columns {
let column_def = add_column_expr.column_def.context(MissingFieldSnafu {
field: "column_def",
})?;
let schema =
column_def
.try_as_column_schema()
.context(error::InvalidColumnDefSnafu {
column: &column_def.name,
})?;
add_column_requests.push(AddColumnRequest {
column_schema: schema,
is_key: add_column_expr.is_key,
})
}
let alter_kind = AlterKind::AddColumns {
columns: add_column_requests,
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult {
let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
};
let result = self
.sql_handler()
.execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new()))
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as _, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
Some(Kind::DropColumns(DropColumns { drop_columns })) => {
let alter_kind = AlterKind::DropColumns {
names: drop_columns.into_iter().map(|c| c.name).collect(),
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
}
None => Ok(None),
}
}
fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
.map(|x| {
x.try_as_column_schema()
.context(error::InvalidColumnDefSnafu { column: &x.name })
})
.collect::<Result<Vec<ColumnSchema>>>()?;
ensure!(
column_schemas
.iter()
.any(|column| column.name == expr.time_index),
error::KeyColumnNotFoundSnafu {
name: &expr.time_index,
}
);
let column_schemas = column_schemas
.into_iter()
.map(|column_schema| {
if column_schema.name == expr.time_index {
column_schema.with_time_index(true)
} else {
column_schema
}
})
.collect::<Vec<_>>();
Ok(Arc::new(
SchemaBuilder::try_from(column_schemas)
.context(error::CreateSchemaSnafu)?
.build()
.context(error::CreateSchemaSnafu)?,
))
}
#[cfg(test)]
mod tests {
use api::v1::ColumnDef;
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnDef};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use super::*;
use crate::tests::test_util;
#[tokio::test(flavor = "multi_thread")]
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("create_expr_to_request");
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let expr = testing_create_expr();
let request = create_expr_to_request(1024, expr).await.unwrap();
assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID);
let request = create_expr_to_request(1024, expr).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
@@ -279,12 +182,13 @@ mod tests {
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = create_expr_to_request(1025, expr).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Specified timestamp key or primary key column not found: not-exist-column"));
let result = create_expr_to_request(1025, expr);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"),
"{}",
err_msg
);
}
#[test]
@@ -295,14 +199,16 @@ mod tests {
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Specified timestamp key or primary key column not found: not-exist-column"));
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {}",
err_msg
);
}
#[test]
fn test_create_column_schema() {
let column_def = ColumnDef {
name: "a".to_string(),
@@ -318,7 +224,7 @@ mod tests {
let column_def = ColumnDef {
name: "a".to_string(),
datatype: 12, // string
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
};
@@ -330,7 +236,7 @@ mod tests {
let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value"));
let column_def = ColumnDef {
name: "a".to_string(),
datatype: 12, // string
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
};
@@ -348,25 +254,25 @@ mod tests {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: 12, // string
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: 15, // timestamp
datatype: ColumnDataType::Timestamp as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: 9, // float32
datatype: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: 10, // float64
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},

View File

@@ -12,22 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! sql handler
use catalog::CatalogManagerRef;
use common_query::Output;
use query::sql::{describe_table, show_databases, show_tables};
use common_telemetry::error;
use query::query_engine::QueryEngineRef;
use query::sql::{describe_table, explain, show_databases, show_tables};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
use sql::statements::explain::Explain;
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::{EngineContext, TableEngineRef, TableReference};
use table::requests::*;
use table::TableRef;
use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu};
use crate::error::{ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
mod alter;
mod create;
mod drop_table;
mod insert;
#[derive(Debug)]
@@ -36,41 +39,61 @@ pub enum SqlRequest {
CreateTable(CreateTableRequest),
CreateDatabase(CreateDatabaseRequest),
Alter(AlterTableRequest),
DropTable(DropTableRequest),
ShowDatabases(ShowDatabases),
ShowTables(ShowTables),
DescribeTable(DescribeTable),
Explain(Box<Explain>),
}
// Handler to execute SQL except query
pub struct SqlHandler {
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
}
impl SqlHandler {
pub fn new(table_engine: TableEngineRef, catalog_manager: CatalogManagerRef) -> Self {
pub fn new(
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Self {
Self {
table_engine,
catalog_manager,
query_engine,
}
}
pub async fn execute(&self, request: SqlRequest) -> Result<Output> {
match request {
// TODO(LFC): Refactor consideration: a context awareness "Planner".
// Now we have some query related state (like current using database in session context), maybe
// we could create a new struct called `Planner` that stores context and handle these queries
// there, instead of executing here in a "static" fashion.
pub async fn execute(&self, request: SqlRequest, query_ctx: QueryContextRef) -> Result<Output> {
let result = match request {
SqlRequest::Insert(req) => self.insert(req).await,
SqlRequest::CreateTable(req) => self.create_table(req).await,
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::ShowDatabases(stmt) => {
show_databases(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
SqlRequest::ShowTables(stmt) => {
show_tables(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
show_tables(stmt, self.catalog_manager.clone(), query_ctx).context(ExecuteSqlSnafu)
}
SqlRequest::DescribeTable(stmt) => {
describe_table(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
describe_table(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
SqlRequest::Explain(stmt) => explain(stmt, self.query_engine.clone(), query_ctx)
.await
.context(ExecuteSqlSnafu),
};
if let Err(e) = &result {
error!("Datanode execution error: {:?}", e);
}
result
}
pub(crate) fn get_table<'a>(&self, table_ref: &'a TableReference) -> Result<TableRef> {
@@ -94,7 +117,8 @@ mod tests {
use std::any::Any;
use std::sync::Arc;
use catalog::SchemaProvider;
use catalog::{CatalogList, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
use common_time::timestamp::Timestamp;
@@ -214,9 +238,17 @@ mod tests {
.await
.unwrap(),
);
let catalog_provider = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap().unwrap();
catalog_provider
.register_schema(
DEFAULT_SCHEMA_NAME.to_string(),
Arc::new(MockSchemaProvider {}),
)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list.clone());
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(table_engine, catalog_list);
let sql_handler = SqlHandler::new(table_engine, catalog_list.clone(), query_engine.clone());
let stmt = match query_engine.sql_to_statement(sql).unwrap() {
Statement::Insert(i) => i,
@@ -224,9 +256,8 @@ mod tests {
unreachable!()
}
};
let schema_provider = Arc::new(MockSchemaProvider {});
let request = sql_handler
.insert_to_request(schema_provider, *stmt)
.insert_to_request(catalog_list.clone(), *stmt, TableReference::bare("demo"))
.unwrap();
match request {

View File

@@ -16,7 +16,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::statements::column_def_to_schema;
use table::engine::{EngineContext, TableReference};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
@@ -53,10 +53,11 @@ impl SqlHandler {
Ok(Output::AffectedRows(0))
}
pub(crate) fn alter_to_request(&self, alter_table: AlterTable) -> Result<AlterTableRequest> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name()).context(error::ParseSqlSnafu)?;
pub(crate) fn alter_to_request(
&self,
alter_table: AlterTable,
table_ref: TableReference,
) -> Result<AlterTableRequest> {
let alter_kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(table_constraint) => {
return error::InvalidSqlSnafu {
@@ -77,9 +78,9 @@ impl SqlHandler {
},
};
Ok(AlterTableRequest {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
catalog_name: Some(table_ref.catalog.to_string()),
schema_name: Some(table_ref.schema.to_string()),
table_name: table_ref.table.to_string(),
alter_kind,
})
}
@@ -112,7 +113,9 @@ mod tests {
async fn test_alter_to_request_with_adding_column() {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;");
let req = handler.alter_to_request(alter_table).unwrap();
let req = handler
.alter_to_request(alter_table, TableReference::bare("my_metric_1"))
.unwrap();
assert_eq!(req.catalog_name, Some("greptime".to_string()));
assert_eq!(req.schema_name, Some("public".to_string()));
assert_eq!(req.table_name, "my_metric_1");

View File

@@ -23,10 +23,10 @@ use common_telemetry::tracing::log::error;
use datatypes::schema::SchemaBuilder;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::TableConstraint;
use sql::statements::column_def_to_schema;
use sql::statements::create::CreateTable;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use store_api::storage::consts::TIME_INDEX_NAME;
use table::engine::EngineContext;
use table::engine::{EngineContext, TableReference};
use table::metadata::TableId;
use table::requests::*;
@@ -84,7 +84,6 @@ impl SqlHandler {
// determine catalog and schema from the very beginning
let table_name = req.table_name.clone();
let table_id = req.id;
let table = self
.table_engine
.create_table(&ctx, req)
@@ -97,7 +96,7 @@ impl SqlHandler {
catalog: table.table_info().catalog_name.clone(),
schema: table.table_info().schema_name.clone(),
table_name: table_name.clone(),
table_id,
table_id: table.table_info().ident.table_id,
table,
};
@@ -115,13 +114,11 @@ impl SqlHandler {
&self,
table_id: TableId,
stmt: CreateTable,
table_ref: TableReference,
) -> Result<CreateTableRequest> {
let mut ts_index = usize::MAX;
let mut primary_keys = vec![];
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&stmt.name).context(error::ParseSqlSnafu)?;
let col_map = stmt
.columns
.iter()
@@ -172,7 +169,7 @@ impl SqlHandler {
return ConstraintNotSupportedSnafu {
constraint: format!("{:?}", c),
}
.fail()
.fail();
}
}
}
@@ -186,14 +183,6 @@ impl SqlHandler {
ensure!(ts_index != usize::MAX, error::MissingTimestampColumnSnafu);
if primary_keys.is_empty() {
info!(
"Creating table: {:?}.{:?}.{} but primary key not set, use time index column: {}",
catalog_name, schema_name, table_name, ts_index
);
primary_keys.push(ts_index);
}
let columns_schemas: Vec<_> = stmt
.columns
.iter()
@@ -212,9 +201,9 @@ impl SqlHandler {
let request = CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name,
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
desc: None,
schema,
region_numbers: vec![0],
@@ -262,7 +251,9 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)) engine=mito with(regions=1);"#,
);
let c = handler.create_to_request(42, parsed_stmt).unwrap();
let c = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.unwrap();
assert_eq!("demo_table", c.table_name);
assert_eq!(42, c.id);
assert!(!c.create_if_not_exists);
@@ -283,11 +274,12 @@ mod tests {
memory double,
PRIMARY KEY(host)) engine=mito with(regions=1);"#,
);
let error = handler.create_to_request(42, parsed_stmt).unwrap_err();
let error = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.unwrap_err();
assert_matches!(error, Error::MissingTimestampColumn { .. });
}
/// If primary key is not specified, time index should be used as primary key.
#[tokio::test]
pub async fn test_primary_key_not_specified() {
let handler = create_mock_sql_handler().await;
@@ -300,12 +292,11 @@ mod tests {
memory double,
TIME INDEX (ts)) engine=mito with(regions=1);"#,
);
let c = handler.create_to_request(42, parsed_stmt).unwrap();
assert_eq!(1, c.primary_key_indices.len());
assert_eq!(
c.schema.timestamp_index().unwrap(),
c.primary_key_indices[0]
);
let c = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.unwrap();
assert!(c.primary_key_indices.is_empty());
assert_eq!(c.schema.timestamp_index(), Some(1));
}
/// Constraints specified, not column cannot be found.
@@ -319,7 +310,9 @@ mod tests {
TIME INDEX (ts)) engine=mito with(regions=1);"#,
);
let error = handler.create_to_request(42, parsed_stmt).unwrap_err();
let error = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.unwrap_err();
assert_matches!(error, Error::KeyColumnNotFound { .. });
}
@@ -339,7 +332,9 @@ mod tests {
let handler = create_mock_sql_handler().await;
let error = handler.create_to_request(42, create_table).unwrap_err();
let error = handler
.create_to_request(42, create_table, TableReference::full("c", "s", "demo"))
.unwrap_err();
assert_matches!(error, Error::InvalidPrimaryKey { .. });
}
@@ -359,7 +354,9 @@ mod tests {
let handler = create_mock_sql_handler().await;
let request = handler.create_to_request(42, create_table).unwrap();
let request = handler
.create_to_request(42, create_table, TableReference::full("c", "s", "demo"))
.unwrap();
assert_eq!(42, request.id);
assert_eq!("c".to_string(), request.catalog_name);

View File

@@ -0,0 +1,71 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::DeregisterTableRequest;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::info;
use snafu::ResultExt;
use sql::statements::drop::DropTable;
use table::engine::{EngineContext, TableReference};
use table::requests::DropTableRequest;
use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
pub async fn drop_table(&self, req: DropTableRequest) -> Result<Output> {
let deregister_table_req = DeregisterTableRequest {
catalog: req.catalog_name.clone(),
schema: req.schema_name.clone(),
table_name: req.table_name.clone(),
};
let table_reference = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &req.table_name,
};
let table_full_name = table_reference.to_string();
self.catalog_manager
.deregister_table(deregister_table_req)
.await
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
table_name: table_full_name.clone(),
})?;
let ctx = EngineContext {};
self.table_engine()
.drop_table(&ctx, req)
.await
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
table_name: table_full_name.clone(),
})?;
info!("Successfully dropped table: {}", table_full_name);
Ok(Output::AffectedRows(1))
}
pub fn drop_table_to_request(&self, drop_table: DropTable) -> DropTableRequest {
DropTableRequest {
catalog_name: drop_table.catalog_name,
schema_name: drop_table.schema_name,
table_name: drop_table.table_name,
}
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::SchemaProviderRef;
use catalog::CatalogManagerRef;
use common_query::Output;
use datatypes::prelude::{ConcreteDataType, VectorBuilder};
use snafu::{ensure, OptionExt, ResultExt};
@@ -23,7 +23,7 @@ use table::engine::TableReference;
use table::requests::*;
use crate::error::{
CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu,
CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};
@@ -49,19 +49,18 @@ impl SqlHandler {
pub(crate) fn insert_to_request(
&self,
schema_provider: SchemaProviderRef,
catalog_manager: CatalogManagerRef,
stmt: Insert,
table_ref: TableReference,
) -> Result<SqlRequest> {
let columns = stmt.columns();
let values = stmt.values().context(ParseSqlValueSnafu)?;
let (catalog_name, schema_name, table_name) =
stmt.full_table_name().context(ParseSqlSnafu)?;
let table = schema_provider
.table(&table_name)
let table = catalog_manager
.table(table_ref.catalog, table_ref.schema, table_ref.table)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu {
table_name: &table_name,
table_name: table_ref.table,
})?;
let schema = table.schema();
let columns_num = if columns.is_empty() {
@@ -88,7 +87,7 @@ impl SqlHandler {
let column_schema =
schema.column_schema_by_name(column_name).with_context(|| {
ColumnNotFoundSnafu {
table_name: &table_name,
table_name: table_ref.table,
column_name: column_name.to_string(),
}
})?;
@@ -119,9 +118,9 @@ impl SqlHandler {
}
Ok(SqlRequest::Insert(InsertRequest {
catalog_name,
schema_name,
table_name,
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
columns_values: columns_builders
.into_iter()
.map(|(c, _, mut b)| (c.to_owned(), b.finish()))

View File

@@ -12,7 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod grpc_test;
mod http_test;
mod instance_test;
pub(crate) mod test_util;

View File

@@ -1,321 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::alter_expr::Kind;
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::{
admin_result, column, insert_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType,
ColumnDef, CreateExpr, InsertExpr, MutateResult,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_runtime::Builder as RuntimeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use servers::grpc::GrpcServer;
use servers::server::Server;
use servers::Mode;
use crate::instance::Instance;
use crate::tests::test_util::{self, TestGuard};
async fn setup_grpc_server(
name: &str,
datanode_port: usize,
frontend_port: usize,
) -> (String, TestGuard, Arc<GrpcServer>, Arc<GrpcServer>) {
common_telemetry::init_default_ut_logging();
let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port);
opts.rpc_addr = datanode_grpc_addr.clone();
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let datanode_grpc_addr = datanode_grpc_addr.clone();
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()
.unwrap(),
);
let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
let fe_opts = FrontendOptions {
mode: Mode::Standalone,
datanode_rpc_addr: datanode_grpc_addr.clone(),
grpc_options: Some(GrpcOptions {
addr: fe_grpc_addr.clone(),
runtime_size: 8,
}),
..Default::default()
};
let datanode_grpc_server = Arc::new(GrpcServer::new(
instance.clone(),
instance.clone(),
runtime.clone(),
));
let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts)
.await
.unwrap();
fe_instance.set_catalog_manager(instance.catalog_manager.clone());
let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new(
fe_instance_ref.clone(),
fe_instance_ref,
runtime,
));
let grpc_server_clone = fe_grpc_server.clone();
let fe_grpc_addr_clone = fe_grpc_addr.clone();
tokio::spawn(async move {
let addr = fe_grpc_addr_clone.parse::<SocketAddr>().unwrap();
grpc_server_clone.start(addr).await.unwrap()
});
let dn_grpc_addr_clone = datanode_grpc_addr.clone();
let dn_grpc_server_clone = datanode_grpc_server.clone();
tokio::spawn(async move {
let addr = dn_grpc_addr_clone.parse::<SocketAddr>().unwrap();
dn_grpc_server_clone.start(addr).await.unwrap()
});
// wait for GRPC server to start
tokio::time::sleep(Duration::from_secs(1)).await;
(fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_auto_create_table() {
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server("auto_create_table", 3992, 3993).await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client);
insert_and_assert(&db).await;
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
}
fn expect_data() -> (Column, Column, Column, Column) {
// testing data:
let expected_host_col = Column {
column_name: "host".to_string(),
values: Some(column::Values {
string_values: vec!["host1", "host2", "host3", "host4"]
.into_iter()
.map(|s| s.to_string())
.collect(),
..Default::default()
}),
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
};
let expected_cpu_col = Column {
column_name: "cpu".to_string(),
values: Some(column::Values {
f64_values: vec![0.31, 0.41, 0.2],
..Default::default()
}),
null_mask: vec![2],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
};
let expected_mem_col = Column {
column_name: "memory".to_string(),
values: Some(column::Values {
f64_values: vec![0.1, 0.2, 0.3],
..Default::default()
}),
null_mask: vec![4],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
};
let expected_ts_col = Column {
column_name: "ts".to_string(),
values: Some(column::Values {
ts_millis_values: vec![100, 101, 102, 103],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
..Default::default()
};
(
expected_host_col,
expected_cpu_col,
expected_mem_col,
expected_ts_col,
)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_and_select() {
common_telemetry::init_default_ut_logging();
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server("insert_and_select", 3990, 3991).await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client.clone());
let admin = Admin::new("greptime", grpc_client);
// create
let expr = testing_create_expr();
let result = admin.create(expr).await.unwrap();
assert_matches!(
result.result,
Some(admin_result::Result::Mutate(MutateResult {
success: 1,
failure: 0
}))
);
//alter
let add_column = ColumnDef {
name: "test_column".to_string(),
datatype: ColumnDataType::Int64.into(),
is_nullable: true,
default_constraint: None,
};
let kind = Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(add_column),
is_key: false,
}],
});
let expr = AlterExpr {
table_name: "test_table".to_string(),
catalog_name: None,
schema_name: None,
kind: Some(kind),
};
let result = admin.alter(expr).await.unwrap();
assert_eq!(result.result, None);
// insert
insert_and_assert(&db).await;
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
}
async fn insert_and_assert(db: &Database) {
// testing data:
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
let values = vec![InsertBatch {
columns: vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
expected_mem_col.clone(),
expected_ts_col.clone(),
],
row_count: 4,
}
.into()];
let expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
options: HashMap::default(),
region_number: 0,
};
let result = db.insert(expr).await;
result.unwrap();
// select
let result = db
.select(client::Select::Sql("select * from demo".to_string()))
.await
.unwrap();
assert!(matches!(result, ObjectResult::Select(_)));
match result {
ObjectResult::Select(select_result) => {
assert_eq!(4, select_result.row_count);
let actual_columns = select_result.columns;
assert_eq!(4, actual_columns.len());
// Respect the order in create table schema
let expected_columns = vec![
expected_host_col,
expected_cpu_col,
expected_mem_col,
expected_ts_col,
];
expected_columns
.iter()
.zip(actual_columns.iter())
.for_each(|(x, y)| assert_eq!(x, y));
}
_ => unreachable!(),
}
}
fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: 15, // timestamp
is_nullable: true,
default_constraint: None,
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "demo".to_string(),
desc: Some("blabla".to_string()),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
region_ids: vec![0],
}
}

View File

@@ -1,286 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use datatypes::prelude::ConcreteDataType;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use serde_json::json;
use servers::http::{ColumnSchema, HttpServer, JsonOutput, JsonResponse, Schema};
use test_util::TestGuard;
use crate::instance::{Instance, InstanceRef};
use crate::tests::test_util;
async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
let fe_opts = FrontendOptions::default();
let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap();
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance
}
async fn make_test_app(name: &str) -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(
instance.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
.await
.unwrap();
let http_server = HttpServer::new(instance);
(http_server.make_app(), guard)
}
async fn make_test_app_with_frontend(name: &str) -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let mut frontend = build_frontend_instance(instance.clone()).await;
instance.start().await.unwrap();
test_util::create_test_table(
frontend.catalog_manager().as_ref().unwrap(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
.await
.unwrap();
frontend.start().await.unwrap();
let mut http_server = HttpServer::new(Arc::new(frontend));
http_server.set_script_handler(instance.clone());
let app = http_server.make_app();
(app, guard)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_api() {
common_telemetry::init_default_ut_logging();
let (app, _guard) = make_test_app("sql_api").await;
let client = TestClient::new(app);
let res = client.get("/v1/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":1004,"error":"sql parameter is required."}"#
assert_eq!(body.code(), 1004);
assert_eq!(body.error().unwrap(), "sql parameter is required.");
assert!(body.execution_time_ms().is_some());
let res = client
.get("/v1/sql?sql=select * from numbers limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 1);
assert_eq!(records.num_rows(), 10);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![ColumnSchema::new(
"number".to_owned(),
"UInt32".to_owned()
)])
);
assert_eq!(records.rows()[0][0], json!(0));
assert_eq!(records.rows()[9][0], json!(9));
} else {
unreachable!()
}
// test insert and select
let res = client
.get("/v1/sql?sql=insert into demo values('host', 66.6, 1024, 0)")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// select *
let res = client
.get("/v1/sql?sql=select * from demo limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 4);
assert_eq!(records.num_rows(), 1);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![
ColumnSchema::new("host".to_owned(), "String".to_owned()),
ColumnSchema::new("cpu".to_owned(), "Float64".to_owned()),
ColumnSchema::new("memory".to_owned(), "Float64".to_owned()),
ColumnSchema::new("ts".to_owned(), "Timestamp".to_owned())
])
);
assert_eq!(
records.rows()[0],
vec![json!("host"), json!(66.6), json!(1024.0), json!(0)]
);
} else {
unreachable!();
}
// select with projections
let res = client
.get("/v1/sql?sql=select cpu, ts from demo limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 2);
assert_eq!(records.num_rows(), 1);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![
ColumnSchema::new("cpu".to_owned(), "Float64".to_owned()),
ColumnSchema::new("ts".to_owned(), "Timestamp".to_owned())
])
);
assert_eq!(records.rows()[0], vec![json!(66.6), json!(0)]);
} else {
unreachable!()
}
// select with column alias
let res = client
.get("/v1/sql?sql=select cpu as c, ts as time from demo limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
assert!(body.success());
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(records) = &output[0] {
assert_eq!(records.num_cols(), 2);
assert_eq!(records.num_rows(), 1);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![
ColumnSchema::new("c".to_owned(), "Float64".to_owned()),
ColumnSchema::new("time".to_owned(), "Timestamp".to_owned())
])
);
assert_eq!(records.rows()[0], vec![json!(66.6), json!(0)]);
} else {
unreachable!()
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_metrics_api() {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
let (app, _guard) = make_test_app("metrics_api").await;
let client = TestClient::new(app);
// Send a sql
let res = client
.get("/v1/sql?sql=select * from numbers limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// Call metrics api
let res = client.get("/metrics").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert!(body.contains("datanode_handle_sql_elapsed"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_scripts_api() {
common_telemetry::init_default_ut_logging();
let (app, _guard) = make_test_app_with_frontend("script_api").await;
let client = TestClient::new(app);
let res = client
.post("/v1/scripts?name=test")
.body(
r#"
@copr(sql='select number from numbers limit 10', args=['number'], returns=['n'])
def test(n):
return n + 1;
"#,
)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json: r#"{"code":0}"#
assert_eq!(body.code(), 0);
assert!(body.output().is_none());
// call script
let res = client.post("/v1/run-script?name=test").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
// body json:
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}}]}"#
assert_eq!(body.code(), 0);
assert!(body.execution_time_ms().is_some());
let output = body.output().unwrap();
assert_eq!(output.len(), 1);
if let JsonOutput::Records(ref records) = output[0] {
assert_eq!(records.num_cols(), 1);
assert_eq!(records.num_rows(), 10);
assert_eq!(
records.schema().unwrap(),
&Schema::new(vec![ColumnSchema::new(
"n".to_owned(),
"Float64".to_owned()
)])
);
assert_eq!(records.rows()[0][0], json!(1.0));
} else {
unreachable!()
}
}

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::Output;
use common_recordbatch::util;
use datafusion::arrow_print;
@@ -19,6 +22,7 @@ use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::{Int64Array, UInt64Array, Utf8Array};
use datatypes::arrow_array::StringArray;
use datatypes::prelude::ConcreteDataType;
use session::context::QueryContext;
use crate::instance::Instance;
use crate::tests::test_util;
@@ -32,39 +36,33 @@ async fn test_create_database_and_insert_query() {
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance.execute_sql("create database test").await.unwrap();
let output = execute_sql(&instance, "create database test").await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql(
r#"create table greptime.test.demo(
let output = execute_sql(
&instance,
r#"create table greptime.test.demo(
host STRING,
cpu DOUBLE,
memory DOUBLE,
ts bigint,
TIME INDEX(ts)
)"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql(
r#"insert into test.demo(host, cpu, memory, ts) values
let output = execute_sql(
&instance,
r#"insert into test.demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = instance
.execute_sql("select ts from test.demo order by ts")
.await
.unwrap();
let query_output = execute_sql(&instance, "select ts from test.demo order by ts").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
@@ -88,54 +86,50 @@ async fn test_issue477_same_table_name_in_different_databases() {
instance.start().await.unwrap();
// Create database a and b
let output = instance.execute_sql("create database a").await.unwrap();
let output = execute_sql(&instance, "create database a").await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance.execute_sql("create database b").await.unwrap();
let output = execute_sql(&instance, "create database b").await;
assert!(matches!(output, Output::AffectedRows(1)));
// Create table a.demo and b.demo
let output = instance
.execute_sql(
r#"create table a.demo(
let output = execute_sql(
&instance,
r#"create table a.demo(
host STRING,
ts bigint,
TIME INDEX(ts)
)"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql(
r#"create table b.demo(
let output = execute_sql(
&instance,
r#"create table b.demo(
host STRING,
ts bigint,
TIME INDEX(ts)
)"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
// Insert different data into a.demo and b.demo
let output = instance
.execute_sql(
r#"insert into a.demo(host, ts) values
let output = execute_sql(
&instance,
r#"insert into a.demo(host, ts) values
('host1', 1655276557000)
"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql(
r#"insert into b.demo(host, ts) values
let output = execute_sql(
&instance,
r#"insert into b.demo(host, ts) values
('host2',1655276558000)
"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
// Query data and assert
@@ -157,7 +151,7 @@ async fn test_issue477_same_table_name_in_different_databases() {
}
async fn assert_query_result(instance: &Instance, sql: &str, ts: i64, host: &str) {
let query_output = instance.execute_sql(sql).await.unwrap();
let query_output = execute_sql(instance, sql).await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
@@ -200,15 +194,14 @@ async fn setup_test_instance() -> Instance {
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = setup_test_instance().await;
let output = instance
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
}
@@ -228,22 +221,17 @@ async fn test_execute_insert_query_with_i64_timestamp() {
.await
.unwrap();
let output = instance
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = instance
.execute_sql("select ts from demo order by ts")
.await
.unwrap();
let query_output = execute_sql(&instance, "select ts from demo order by ts").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
@@ -257,11 +245,7 @@ async fn test_execute_insert_query_with_i64_timestamp() {
_ => unreachable!(),
}
let query_output = instance
.execute_sql("select ts as time from demo order by ts")
.await
.unwrap();
let query_output = execute_sql(&instance, "select ts as time from demo order by ts").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
@@ -282,10 +266,7 @@ async fn test_execute_query() {
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
.execute_sql("select sum(number) from numbers limit 20")
.await
.unwrap();
let output = execute_sql(&instance, "select sum(number) from numbers limit 20").await;
match output {
Output::Stream(recordbatch) => {
let numbers = util::collect(recordbatch).await.unwrap();
@@ -309,7 +290,7 @@ async fn test_execute_show_databases_tables() {
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance.execute_sql("show databases").await.unwrap();
let output = execute_sql(&instance, "show databases").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
@@ -325,10 +306,7 @@ async fn test_execute_show_databases_tables() {
_ => unreachable!(),
}
let output = instance
.execute_sql("show databases like '%bl%'")
.await
.unwrap();
let output = execute_sql(&instance, "show databases like '%bl%'").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
@@ -344,7 +322,7 @@ async fn test_execute_show_databases_tables() {
_ => unreachable!(),
}
let output = instance.execute_sql("show tables").await.unwrap();
let output = execute_sql(&instance, "show tables").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
@@ -364,7 +342,7 @@ async fn test_execute_show_databases_tables() {
.await
.unwrap();
let output = instance.execute_sql("show tables").await.unwrap();
let output = execute_sql(&instance, "show tables").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
@@ -376,10 +354,7 @@ async fn test_execute_show_databases_tables() {
}
// show tables like [string]
let output = instance
.execute_sql("show tables like 'de%'")
.await
.unwrap();
let output = execute_sql(&instance, "show tables like 'de%'").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
@@ -404,9 +379,9 @@ pub async fn test_execute_create() {
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
.execute_sql(
r#"create table test_table(
let output = execute_sql(
&instance,
r#"create table test_table(
host string,
ts timestamp,
cpu double default 0,
@@ -414,56 +389,24 @@ pub async fn test_execute_create() {
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
)
.await
.unwrap();
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
}
#[tokio::test(flavor = "multi_thread")]
pub async fn test_create_table_illegal_timestamp_type() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) =
test_util::create_tmp_dir_and_datanode_opts("create_table_illegal_timestamp_type");
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
.execute_sql(
r#"create table test_table(
host string,
ts bigint,
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
)
.await
.unwrap();
match output {
Output::AffectedRows(rows) => {
assert_eq!(1, rows);
}
_ => unreachable!(),
}
}
async fn check_output_stream(output: Output, expected: Vec<&str>) {
match output {
Output::Stream(stream) => {
let recordbatches = util::collect(stream).await.unwrap();
let recordbatch = recordbatches
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let pretty_print = arrow_print::write(&recordbatch);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
assert_eq!(pretty_print, expected);
}
let recordbatches = match output {
Output::Stream(stream) => util::collect(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches.take(),
_ => unreachable!(),
}
};
let recordbatches = recordbatches
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let pretty_print = arrow_print::write(&recordbatches);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
assert_eq!(pretty_print, expected);
}
#[tokio::test]
@@ -479,35 +422,30 @@ async fn test_alter_table() {
.await
.unwrap();
// make sure table insertion is ok before altering table
instance
.execute_sql("insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)")
.await
.unwrap();
execute_sql(
&instance,
"insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)",
)
.await;
// Add column
let output = instance
.execute_sql("alter table demo add my_tag string null")
.await
.unwrap();
let output = execute_sql(&instance, "alter table demo add my_tag string null").await;
assert!(matches!(output, Output::AffectedRows(0)));
let output = instance
.execute_sql(
"insert into demo(host, cpu, memory, ts, my_tag) values ('host2', 2.2, 200, 2000, 'hello')",
)
.await
.unwrap();
let output = execute_sql(
&instance,
"insert into demo(host, cpu, memory, ts, my_tag) values ('host2', 2.2, 200, 2000, 'hello')",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql("insert into demo(host, cpu, memory, ts) values ('host3', 3.3, 300, 3000)")
.await
.unwrap();
let output = execute_sql(
&instance,
"insert into demo(host, cpu, memory, ts) values ('host3', 3.3, 300, 3000)",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql("select * from demo order by ts")
.await
.unwrap();
let output = execute_sql(&instance, "select * from demo order by ts").await;
let expected = vec![
"+-------+-----+--------+---------------------+--------+",
"| host | cpu | memory | ts | my_tag |",
@@ -520,16 +458,10 @@ async fn test_alter_table() {
check_output_stream(output, expected).await;
// Drop a column
let output = instance
.execute_sql("alter table demo drop column memory")
.await
.unwrap();
let output = execute_sql(&instance, "alter table demo drop column memory").await;
assert!(matches!(output, Output::AffectedRows(0)));
let output = instance
.execute_sql("select * from demo order by ts")
.await
.unwrap();
let output = execute_sql(&instance, "select * from demo order by ts").await;
let expected = vec![
"+-------+-----+---------------------+--------+",
"| host | cpu | ts | my_tag |",
@@ -542,16 +474,14 @@ async fn test_alter_table() {
check_output_stream(output, expected).await;
// insert a new row
let output = instance
.execute_sql("insert into demo(host, cpu, ts, my_tag) values ('host4', 400, 4000, 'world')")
.await
.unwrap();
let output = execute_sql(
&instance,
"insert into demo(host, cpu, ts, my_tag) values ('host4', 400, 4000, 'world')",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql("select * from demo order by ts")
.await
.unwrap();
let output = execute_sql(&instance, "select * from demo order by ts").await;
let expected = vec![
"+-------+-----+---------------------+--------+",
"| host | cpu | ts | my_tag |",
@@ -580,27 +510,26 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
) engine=mito with(regions=1);"#,
type_name
);
let output = instance.execute_sql(&create_sql).await.unwrap();
let output = execute_sql(&instance, &create_sql).await;
assert!(matches!(output, Output::AffectedRows(1)));
// Insert with ts.
instance
.execute_sql("insert into test_table(host, cpu, ts) values ('host1', 1.1, 1000)")
.await
.unwrap();
let output = execute_sql(
&instance,
"insert into test_table(host, cpu, ts) values ('host1', 1.1, 1000)",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
// Insert without ts, so it should be filled by default value.
let output = instance
.execute_sql("insert into test_table(host, cpu) values ('host2', 2.2)")
.await
.unwrap();
let output = execute_sql(
&instance,
"insert into test_table(host, cpu) values ('host2', 2.2)",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql("select host, cpu from test_table")
.await
.unwrap();
let output = execute_sql(&instance, "select host, cpu from test_table").await;
let expected = vec![
"+-------+-----+",
"| host | cpu |",
@@ -619,3 +548,70 @@ async fn test_insert_with_default_value() {
test_insert_with_default_value_for_type("timestamp").await;
test_insert_with_default_value_for_type("bigint").await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_use_database() {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("use_database");
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = execute_sql(&instance, "create database db1").await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql_in_db(
&instance,
"create table tb1(col_i32 int, ts bigint, TIME INDEX(ts))",
"db1",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql_in_db(&instance, "show tables", "db1").await;
let expected = vec![
"+--------+",
"| Tables |",
"+--------+",
"| tb1 |",
"+--------+",
];
check_output_stream(output, expected).await;
let output = execute_sql_in_db(
&instance,
r#"insert into tb1(col_i32, ts) values (1, 1655276557000)"#,
"db1",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql_in_db(&instance, "select col_i32 from tb1", "db1").await;
let expected = vec![
"+---------+",
"| col_i32 |",
"+---------+",
"| 1 |",
"+---------+",
];
check_output_stream(output, expected).await;
// Making a particular database the default by means of the USE statement does not preclude
// accessing tables in other databases.
let output = execute_sql(&instance, "select number from public.numbers limit 1").await;
let expected = vec![
"+--------+",
"| number |",
"+--------+",
"| 0 |",
"+--------+",
];
check_output_stream(output, expected).await;
}
async fn execute_sql(instance: &Instance, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
async fn execute_sql_in_db(instance: &Instance, sql: &str, db: &str) -> Output {
let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string()));
instance.execute_sql(sql, query_ctx).await.unwrap()
}

View File

@@ -21,6 +21,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use query::QueryEngineFactory;
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
@@ -88,7 +89,7 @@ pub async fn create_test_table(
.expect("ts is expected to be timestamp column"),
),
create_if_not_exists: true,
primary_key_indices: vec![3, 0], // "host" and "ts" are primary keys
primary_key_indices: vec![0], // "host" is in primary keys
table_options: HashMap::new(),
region_numbers: vec![0],
},
@@ -121,5 +122,9 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
.await
.unwrap(),
);
SqlHandler::new(mock_engine, catalog_manager)
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
SqlHandler::new(mock_engine, catalog_manager, factory.query_engine())
}