diff --git a/Cargo.lock b/Cargo.lock index 8857459182..658cd4529c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5593,6 +5593,7 @@ dependencies = [ "async-stream", "async-trait", "chrono", + "common-catalog", "common-error", "common-query", "common-recordbatch", diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 163b872a63..117571b333 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -56,7 +56,7 @@ message InsertExpr { // The "sql" field is meant to be removed in the future. string sql = 3; } - + map options = 4; } diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 4efe3553b9..072a59f41d 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -83,6 +83,12 @@ pub enum Error { #[snafu(display("Table {} already exists", table))] TableExists { table: String, backtrace: Backtrace }, + #[snafu(display("Schema {} already exists", schema))] + SchemaExists { + schema: String, + backtrace: Backtrace, + }, + #[snafu(display("Failed to register table"))] RegisterTable { #[snafu(backtrace)] @@ -112,7 +118,7 @@ pub enum Error { "Failed to insert table creation record to system catalog, source: {}", source ))] - InsertTableRecord { + InsertCatalogRecord { #[snafu(backtrace)] source: table::error::Error, }, @@ -215,10 +221,11 @@ impl ErrorExt for Error { Error::RegisterTable { .. } => StatusCode::Internal, Error::TableExists { .. } => StatusCode::TableAlreadyExists, + Error::SchemaExists { .. } => StatusCode::InvalidArguments, Error::OpenSystemCatalog { source, .. } | Error::CreateSystemCatalog { source, .. } - | Error::InsertTableRecord { source, .. } + | Error::InsertCatalogRecord { source, .. } | Error::OpenTable { source, .. } | Error::CreateTable { source, .. } => source.status_code(), Error::MetaSrv { source, .. } => source.status_code(), diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 36544361e2..57b68a9182 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -76,6 +76,9 @@ pub trait CatalogManager: CatalogList { /// returns table registered. async fn register_table(&self, request: RegisterTableRequest) -> Result; + /// Register a schema with catalog name and schema name. + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; + /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) -> error::Result<()>; @@ -107,6 +110,12 @@ pub struct RegisterTableRequest { pub table: TableRef, } +#[derive(Debug, Clone)] +pub struct RegisterSchemaRequest { + pub catalog: String, + pub schema: String, +} + /// Formats table fully-qualified name pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String { format!("{}.{}.{}", catalog, schema, table) diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 405283df00..8b057ee495 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -22,7 +22,7 @@ use crate::error::{ CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu, }; -use crate::error::{ReadSystemCatalogSnafu, Result}; +use crate::error::{ReadSystemCatalogSnafu, Result, SchemaExistsSnafu}; use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX, @@ -31,8 +31,8 @@ use crate::system::{ use crate::tables::SystemCatalog; use crate::{ format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, - CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest, RegisterTableRequest, - SchemaProvider, + CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest, + RegisterTableRequest, SchemaProvider, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -334,6 +334,34 @@ impl CatalogManager for LocalCatalogManager { Ok(1) } + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + let started = self.init_lock.lock().await; + ensure!( + *started, + IllegalManagerStateSnafu { + msg: "Catalog manager not started", + } + ); + let catalog_name = &request.catalog; + let schema_name = &request.schema; + + let catalog = self + .catalogs + .catalog(catalog_name)? + .context(CatalogNotFoundSnafu { catalog_name })?; + if catalog.schema(schema_name)?.is_some() { + return SchemaExistsSnafu { + schema: schema_name, + } + .fail(); + } + self.system + .register_schema(request.catalog, schema_name.clone()) + .await?; + catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; + Ok(1) + } + async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { ensure!( !*self.init_lock.lock().await, diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index fb04c59599..cfa6434cf3 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -13,8 +13,8 @@ use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::schema::SchemaProvider; use crate::{ - CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest, - RegisterTableRequest, SchemaProviderRef, + CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, + RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef, }; /// Simple in-memory list of catalogs @@ -70,6 +70,17 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| if v.is_some() { 0 } else { 1 }) } + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + let catalogs = self.catalogs.write().unwrap(); + let catalog = catalogs + .get(&request.catalog) + .context(CatalogNotFoundSnafu { + catalog_name: &request.catalog, + })?; + catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; + Ok(1) + } + async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> { unimplemented!() } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 63af7cec4d..41f5993921 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -32,7 +32,8 @@ use crate::error::{InvalidTableSchemaSnafu, Result}; use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, - RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, + SchemaProviderRef, }; /// Catalog manager based on metasrv. @@ -456,6 +457,17 @@ impl CatalogManager for RemoteCatalogManager { Ok(1) } + async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { + let catalog_name = request.catalog; + let schema_name = request.schema; + let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu { + catalog_name: &catalog_name, + })?; + let schema_provider = self.new_schema_provider(&catalog_name, &schema_name); + catalog_provider.register_schema(schema_name, schema_provider)?; + Ok(1) + } + async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { let mut requests = self.system_table_requests.lock().await; requests.push(request); diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 232341e0b0..812f41a269 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -2,6 +2,7 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::{ INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, @@ -178,15 +179,36 @@ fn build_system_catalog_schema() -> Schema { } pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { + build_insert_request( + EntryType::Table, + full_table_name.as_bytes(), + serde_json::to_string(&TableEntryValue { table_id }) + .unwrap() + .as_bytes(), + ) +} + +pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest { + let full_schema_name = format!("{}.{}", catalog_name, schema_name); + build_insert_request( + EntryType::Schema, + full_schema_name.as_bytes(), + serde_json::to_string(&SchemaEntryValue {}) + .unwrap() + .as_bytes(), + ) +} + +pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest { let mut columns_values = HashMap::with_capacity(6); columns_values.insert( "entry_type".to_string(), - Arc::new(UInt8Vector::from_slice(&[EntryType::Table as u8])) as _, + Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _, ); columns_values.insert( "key".to_string(), - Arc::new(BinaryVector::from_slice(&[full_table_name.as_bytes()])) as _, + Arc::new(BinaryVector::from_slice(&[key])) as _, ); // Timestamp in key part is intentionally left to 0 @@ -197,11 +219,7 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> columns_values.insert( "value".to_string(), - Arc::new(BinaryVector::from_slice(&[serde_json::to_string( - &TableEntryValue { table_id }, - ) - .unwrap() - .as_bytes()])) as _, + Arc::new(BinaryVector::from_slice(&[value])) as _, ); columns_values.insert( @@ -219,6 +237,8 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> ); InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), columns_values, } @@ -324,6 +344,9 @@ pub struct SchemaEntry { pub schema_name: String, } +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SchemaEntryValue; + #[derive(Debug, PartialEq, Eq, Ord, PartialOrd)] pub struct TableEntry { pub catalog_name: String, diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 2af8b6fcb4..321d086ffd 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -24,8 +24,8 @@ use table::metadata::{TableId, TableInfoRef}; use table::table::scan::SimpleTableScan; use table::{Table, TableRef}; -use crate::error::{Error, InsertTableRecordSnafu}; -use crate::system::{build_table_insert_request, SystemCatalogTable}; +use crate::error::{Error, InsertCatalogRecordSnafu}; +use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable}; use crate::{ format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, }; @@ -254,7 +254,20 @@ impl SystemCatalog { .system .insert(request) .await - .context(InsertTableRecordSnafu) + .context(InsertCatalogRecordSnafu) + } + + pub async fn register_schema( + &self, + catalog: String, + schema: String, + ) -> crate::error::Result { + let request = build_schema_insert_request(catalog, schema); + self.information_schema + .system + .insert(request) + .await + .context(InsertCatalogRecordSnafu) } } diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs index 2755ec10b7..43f625adfb 100644 --- a/src/client/examples/insert.rs +++ b/src/client/examples/insert.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use api::v1::{codec::InsertBatch, *}; use client::{Client, Database}; - fn main() { tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) .unwrap(); diff --git a/src/common/insert/src/insert.rs b/src/common/insert/src/insert.rs index cc7c9eea91..aa7f294a96 100644 --- a/src/common/insert/src/insert.rs +++ b/src/common/insert/src/insert.rs @@ -175,6 +175,8 @@ pub fn build_create_table_request( } pub fn insertion_expr_to_request( + catalog_name: &str, + schema_name: &str, table_name: &str, insert_batches: Vec, table: Arc, @@ -221,6 +223,8 @@ pub fn insertion_expr_to_request( .collect(); Ok(InsertRequest { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), table_name: table_name.to_string(), columns_values, }) @@ -464,8 +468,11 @@ mod tests { values: mock_insert_batches(), }; let insert_batches = insert_batches(values.values).unwrap(); - let insert_req = insertion_expr_to_request("demo", insert_batches, table).unwrap(); + let insert_req = + insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap(); + assert_eq!("greptime", insert_req.catalog_name); + assert_eq!("public", insert_req.schema_name); assert_eq!("demo", insert_req.table_name); let host = insert_req.columns_values.get("host").unwrap(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a4b6a8ffaa..6473271691 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -177,6 +177,12 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display("Failed to register a new schema, source: {}", source))] + RegisterSchema { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + #[snafu(display("Failed to decode as physical plan, source: {}", source))] IntoPhysicalPlan { #[snafu(backtrace)] @@ -321,6 +327,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::CreateDir { .. } | Error::InsertSystemCatalog { .. } + | Error::RegisterSchema { .. } | Error::Conversion { .. } | Error::IntoPhysicalPlan { .. } | Error::UnsupportedExpr { .. } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index d21d4b7b4f..2e4e33567d 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -8,6 +8,7 @@ use api::v1::{ use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::status_code::StatusCode; +use common_insert::insertion_expr_to_request; use common_query::Output; use common_telemetry::logging::{debug, info}; use query::plan::LogicalPlan; @@ -17,8 +18,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::requests::AddColumnRequest; use crate::error::{ - CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, ExecuteSqlSnafu, InsertDataSnafu, - InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu, + CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, + ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, + UnsupportedExprSnafu, }; use crate::instance::Instance; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; @@ -85,7 +87,7 @@ impl Instance { let _result = self .sql_handler() - .execute(SqlRequest::Create(create_table_request)) + .execute(SqlRequest::CreateTable(create_table_request)) .await?; info!("Success to create table: {} automatically", table_name); @@ -95,21 +97,19 @@ impl Instance { pub async fn execute_grpc_insert( &self, + catalog_name: &str, + schema_name: &str, table_name: &str, values: insert_expr::Values, ) -> Result { - // maybe infer from insert batch? - let catalog_name = DEFAULT_CATALOG_NAME; - let schema_name = DEFAULT_SCHEMA_NAME; - let schema_provider = self .catalog_manager .catalog(catalog_name) - .unwrap() - .expect("default catalog must exist") + .context(CatalogSnafu)? + .context(CatalogNotFoundSnafu { name: catalog_name })? .schema(schema_name) - .expect("default schema must exist") - .unwrap(); + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { name: schema_name })?; let insert_batches = common_insert::insert_batches(values.values).context(InsertDataSnafu)?; @@ -141,9 +141,14 @@ impl Instance { .context(TableNotFoundSnafu { table_name })? }; - let insert = - common_insert::insertion_expr_to_request(table_name, insert_batches, table.clone()) - .context(InsertDataSnafu)?; + let insert = insertion_expr_to_request( + catalog_name, + schema_name, + table_name, + insert_batches, + table.clone(), + ) + .context(InsertDataSnafu)?; let affected_rows = table .insert(insert) @@ -153,8 +158,17 @@ impl Instance { Ok(Output::AffectedRows(affected_rows)) } - async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult { - match self.execute_grpc_insert(table_name, values).await { + async fn handle_insert( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + values: insert_expr::Values, + ) -> ObjectResult { + match self + .execute_grpc_insert(catalog_name, schema_name, table_name, values) + .await + { Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() .status_code(StatusCode::Success as u32) .mutate_result(rows as u32, 0) @@ -207,6 +221,9 @@ impl GrpcQueryHandler for Instance { async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { let object_resp = match query.expr { Some(object_expr::Expr::Insert(insert_expr)) => { + // TODO(dennis): retrieve schema name from DatabaseRequest + let catalog_name = DEFAULT_CATALOG_NAME; + let schema_name = DEFAULT_SCHEMA_NAME; let table_name = &insert_expr.table_name; let expr = insert_expr .expr @@ -227,7 +244,8 @@ impl GrpcQueryHandler for Instance { match expr { insert_expr::Expr::Values(values) => { - self.handle_insert(table_name, values).await + self.handle_insert(catalog_name, schema_name, table_name, values) + .await } insert_expr::Expr::Sql(sql) => { let output = self.execute_sql(&sql).await; diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index c1c1ca9f6d..0e770bf371 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -1,5 +1,4 @@ use async_trait::async_trait; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; use common_query::Output; use common_telemetry::{ @@ -9,8 +8,11 @@ use common_telemetry::{ use servers::query_handler::SqlQueryHandler; use snafu::prelude::*; use sql::statements::statement::Statement; +use table::requests::CreateDatabaseRequest; -use crate::error::{CatalogSnafu, ExecuteSqlSnafu, Result}; +use crate::error::{ + CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, +}; use crate::instance::Instance; use crate::metric; use crate::sql::SqlRequest; @@ -35,20 +37,35 @@ impl Instance { .context(ExecuteSqlSnafu) } Statement::Insert(i) => { + let (catalog_name, schema_name, _table_name) = + i.full_table_name().context(ParseSqlSnafu)?; + let schema_provider = self .catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .expect("datafusion does not accept fallible catalog access") - .unwrap() - .schema(DEFAULT_SCHEMA_NAME) - .expect("datafusion does not accept fallible catalog access") - .unwrap(); + .catalog(&catalog_name) + .context(CatalogSnafu)? + .context(CatalogNotFoundSnafu { name: catalog_name })? + .schema(&schema_name) + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { name: schema_name })?; let request = self.sql_handler.insert_to_request(schema_provider, *i)?; self.sql_handler.execute(request).await } - Statement::Create(c) => { + Statement::CreateDatabase(c) => { + let request = CreateDatabaseRequest { + db_name: c.name.to_string(), + }; + + info!("Creating a new database: {}", request.db_name); + + self.sql_handler + .execute(SqlRequest::CreateDatabase(request)) + .await + } + + Statement::CreateTable(c) => { let table_id = self .catalog_manager .next_table_id() @@ -67,7 +84,9 @@ impl Instance { catalog_name, schema_name, table_name, table_id ); - self.sql_handler.execute(SqlRequest::Create(request)).await + self.sql_handler + .execute(SqlRequest::CreateTable(request)) + .await } Statement::Alter(alter_table) => { let req = self.sql_handler.alter_to_request(alter_table)?; diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 7f12d25ff1..ac60c80ce1 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -20,7 +20,7 @@ impl Instance { pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { let request = self.create_expr_to_request(expr).await; let result = futures::future::ready(request) - .and_then(|request| self.sql_handler().execute(SqlRequest::Create(request))) + .and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request))) .await; match result { Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 2df2262b73..c3da100f7e 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -22,7 +22,8 @@ mod show; #[derive(Debug)] pub enum SqlRequest { Insert(InsertRequest), - Create(CreateTableRequest), + CreateTable(CreateTableRequest), + CreateDatabase(CreateDatabaseRequest), Alter(AlterTableRequest), ShowDatabases(ShowDatabases), ShowTables(ShowTables), @@ -45,7 +46,8 @@ impl SqlHandler { pub async fn execute(&self, request: SqlRequest) -> Result { match request { SqlRequest::Insert(req) => self.insert(req).await, - SqlRequest::Create(req) => self.create(req).await, + SqlRequest::CreateTable(req) => self.create_table(req).await, + SqlRequest::CreateDatabase(req) => self.create_database(req).await, SqlRequest::Alter(req) => self.alter(req).await, SqlRequest::ShowDatabases(stmt) => self.show_databases(stmt).await, SqlRequest::ShowTables(stmt) => self.show_tables(stmt).await, diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index fb24519bc2..634973ca2b 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -1,13 +1,14 @@ use std::collections::HashMap; use std::sync::Arc; -use catalog::RegisterTableRequest; +use catalog::{RegisterSchemaRequest, RegisterTableRequest}; +use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::Output; use common_telemetry::tracing::info; use datatypes::schema::SchemaBuilder; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::TableConstraint; -use sql::statements::create_table::CreateTable; +use sql::statements::create::CreateTable; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use store_api::storage::consts::TIME_INDEX_NAME; use table::engine::EngineContext; @@ -16,12 +17,28 @@ use table::requests::*; use crate::error::{ self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu, - InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, Result, + InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu, + Result, }; use crate::sql::SqlHandler; impl SqlHandler { - pub(crate) async fn create(&self, req: CreateTableRequest) -> Result { + pub(crate) async fn create_database(&self, req: CreateDatabaseRequest) -> Result { + let schema = req.db_name; + let req = RegisterSchemaRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: schema.clone(), + }; + self.catalog_manager + .register_schema(req) + .await + .context(RegisterSchemaSnafu)?; + + info!("Successfully created database: {:?}", schema); + Ok(Output::AffectedRows(1)) + } + + pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result { let ctx = EngineContext {}; // determine catalog and schema from the very beginning let table_name = req.table_name.clone(); @@ -52,7 +69,7 @@ impl SqlHandler { Ok(Output::AffectedRows(1)) } - /// Converts [CreateTable] to [SqlRequest::Create]. + /// Converts [CreateTable] to [SqlRequest::CreateTable]. pub(crate) fn create_to_request( &self, table_id: TableId, @@ -181,7 +198,7 @@ mod tests { let mut res = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); assert_eq!(1, res.len()); match res.pop().unwrap() { - Statement::Create(c) => c, + Statement::CreateTable(c) => c, _ => { panic!("Unexpected statement!") } diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index cbc6f201b7..556842c514 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -10,7 +10,7 @@ use sql::statements::{self, insert::Insert}; use table::requests::*; use crate::error::{ - CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, + CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu, ParseSqlValueSnafu, Result, TableNotFoundSnafu, }; use crate::sql::{SqlHandler, SqlRequest}; @@ -35,9 +35,8 @@ impl SqlHandler { ) -> Result { let columns = stmt.columns(); let values = stmt.values().context(ParseSqlValueSnafu)?; - //TODO(dennis): table name may be in the form of `catalog.schema.table`, - // but we don't process it right now. - let table_name = stmt.table_name(); + let (catalog_name, schema_name, table_name) = + stmt.full_table_name().context(ParseSqlSnafu)?; let table = schema_provider .table(&table_name) @@ -101,6 +100,8 @@ impl SqlHandler { } Ok(SqlRequest::Insert(InsertRequest { + catalog_name, + schema_name, table_name, columns_values: columns_builders .into_iter() diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index c542c4cd35..8edd6fd500 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -9,6 +9,62 @@ use datatypes::prelude::ConcreteDataType; use crate::instance::Instance; use crate::tests::test_util; +#[tokio::test(flavor = "multi_thread")] +async fn test_create_database_and_insert_query() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = + test_util::create_tmp_dir_and_datanode_opts("create_database_and_insert_query"); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + instance.start().await.unwrap(); + + let output = instance.execute_sql("create database test").await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let output = instance + .execute_sql( + r#"create table greptime.test.demo( + host STRING, + cpu DOUBLE, + memory DOUBLE, + ts bigint, + TIME INDEX(ts) +)"#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let output = instance + .execute_sql( + r#"insert into test.demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(2))); + + let query_output = instance + .execute_sql("select ts from test.demo order by ts") + .await + .unwrap(); + + match query_output { + Output::Stream(s) => { + let batches = util::collect(s).await.unwrap(); + let columns = batches[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!( + &Int64Array::from_slice(&[1655276557000, 1655276558000]), + columns[0].as_any().downcast_ref::().unwrap() + ); + } + _ => unreachable!(), + } +} + #[tokio::test(flavor = "multi_thread")] async fn test_execute_insert() { common_telemetry::init_default_ut_logging(); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 706af59c2c..efd6544e19 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -22,7 +22,7 @@ use servers::query_handler::{ }; use snafu::prelude::*; use sql::ast::{ColumnDef, TableConstraint}; -use sql::statements::create_table::{CreateTable, TIME_INDEX}; +use sql::statements::create::{CreateTable, TIME_INDEX}; use sql::statements::statement::Statement; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use sql::{dialect::GenericDialect, parser::ParserContext}; @@ -105,7 +105,15 @@ impl SqlQueryHandler for Instance { .await .and_then(|object_result| object_result.try_into()), Statement::Insert(insert) => { - let table_name = insert.table_name(); + // TODO(dennis): respect schema_name when inserting data + let (_catalog_name, _schema_name, table_name) = insert + .full_table_name() + .context(error::ParseSqlSnafu) + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "Failed to get table name", + })?; + let expr = InsertExpr { table_name, expr: Some(insert_expr::Expr::Sql(query.to_string())), @@ -116,7 +124,7 @@ impl SqlQueryHandler for Instance { .await .and_then(|object_result| object_result.try_into()) } - Statement::Create(create) => { + Statement::CreateTable(create) => { let expr = create_to_expr(create) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query })?; diff --git a/src/frontend/src/spliter.rs b/src/frontend/src/spliter.rs index a3771afe6a..b4e8db0d4f 100644 --- a/src/frontend/src/spliter.rs +++ b/src/frontend/src/spliter.rs @@ -133,6 +133,8 @@ fn partition_insert_request( } } + let catalog_name = &insert.catalog_name; + let schema_name = &insert.schema_name; let table_name = &insert.table_name; dist_insert .into_iter() @@ -144,6 +146,8 @@ fn partition_insert_request( ( region_id, InsertRequest { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), table_name: table_name.to_string(), columns_values, }, @@ -156,6 +160,7 @@ fn partition_insert_request( mod tests { use std::{collections::HashMap, result::Result, sync::Arc}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::{ data_type::ConcreteDataType, types::{BooleanType, StringType}, @@ -373,6 +378,8 @@ mod tests { columns_values.insert("id".to_string(), builder.finish()); InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), columns_values, } @@ -398,6 +405,8 @@ mod tests { columns_values.insert("id".to_string(), builder.finish()); InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), columns_values, } diff --git a/src/frontend/src/sql.rs b/src/frontend/src/sql.rs index 78b8003ab6..fdafa8d172 100644 --- a/src/frontend/src/sql.rs +++ b/src/frontend/src/sql.rs @@ -18,7 +18,8 @@ pub(crate) fn insert_to_request( ) -> Result { let columns = stmt.columns(); let values = stmt.values().context(error::ParseSqlSnafu)?; - let table_name = stmt.table_name(); + let (catalog_name, schema_name, table_name) = + stmt.full_table_name().context(error::ParseSqlSnafu)?; let table = schema_provider .table(&table_name) @@ -80,6 +81,8 @@ pub(crate) fn insert_to_request( } Ok(InsertRequest { + catalog_name, + schema_name, table_name, columns_values: columns_builders .into_iter() diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 01fd2a1852..fb1e637d81 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -130,6 +130,7 @@ mod tests { insert_expr::Expr, ColumnDataType, InsertExpr, }; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::{prelude::ConcreteDataType, types::StringType, vectors::VectorBuilder}; use table::requests::InsertRequest; @@ -160,6 +161,8 @@ mod tests { columns_values.insert("id".to_string(), builder.finish()); InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), columns_values, } diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 4e0a58bdc2..0c3a989cfb 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -49,7 +49,8 @@ where Statement::ShowTables(_) | Statement::ShowDatabases(_) | Statement::ShowCreateTable(_) - | Statement::Create(_) + | Statement::CreateTable(_) + | Statement::CreateDatabase(_) | Statement::Alter(_) | Statement::Insert(_) => unreachable!(), } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 19b58178df..39cb261740 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -117,6 +117,8 @@ impl ScriptsTable { let _ = table .insert(InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SCRIPTS_TABLE_NAME.to_string(), columns_values, }) diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index 0b9927a5dd..ed6b6c5ab4 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::writer::{to_ms_ts, Precision}; use common_time::{timestamp::TimeUnit::Millisecond, Timestamp}; use datatypes::{ @@ -120,6 +121,9 @@ impl LineWriter { .map(|(column_name, (mut builder, _))| (column_name, builder.finish())) .collect(); InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + // TODO(dennis): supports database + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: self.table_name, columns_values, } diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 7724a1ee24..e589f4dfa3 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -19,7 +19,7 @@ pub enum Error { Unsupported { sql: String, keyword: String }, #[snafu(display( - "Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}", + "Unexpected token while parsing SQL statement: {}, expected: '{}', found: {}, source: {}", sql, expected, actual, diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 1bb7f3999a..209c1a2719 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -234,7 +234,7 @@ impl<'a> ParserContext<'a> { } #[inline] - fn peek_token_as_string(&self) -> String { + pub(crate) fn peek_token_as_string(&self) -> String { self.parser.peek_token().to_string() } diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index ad6bfd1701..41df8174cd 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -13,7 +13,9 @@ use table_engine::engine; use crate::ast::{ColumnDef, Ident, TableConstraint, Value as SqlValue}; use crate::error::{self, InvalidTimeIndexSnafu, Result, SyntaxSnafu}; use crate::parser::ParserContext; -use crate::statements::create_table::{CreateTable, PartitionEntry, Partitions, TIME_INDEX}; +use crate::statements::create::{ + CreateDatabase, CreateTable, PartitionEntry, Partitions, TIME_INDEX, +}; use crate::statements::statement::Statement; use crate::statements::{sql_data_type_to_concrete_data_type, sql_value_to_value}; @@ -26,9 +28,37 @@ static THAN: Lazy = Lazy::new(|| Token::make_keyword("THAN")); /// Parses create [table] statement impl<'a> ParserContext<'a> { pub(crate) fn parse_create(&mut self) -> Result { - self.parser - .expect_keyword(Keyword::TABLE) - .context(error::SyntaxSnafu { sql: self.sql })?; + match self.parser.peek_token() { + Token::Word(w) => match w.keyword { + Keyword::TABLE => self.parse_create_table(), + + Keyword::DATABASE => self.parse_create_database(), + + _ => self.unsupported(w.to_string()), + }, + unexpected => self.unsupported(unexpected.to_string()), + } + } + + fn parse_create_database(&mut self) -> Result { + self.parser.next_token(); + + let database_name = self + .parser + .parse_object_name() + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "a database name", + actual: self.peek_token_as_string(), + })?; + + Ok(Statement::CreateDatabase(CreateDatabase { + name: database_name, + })) + } + + fn parse_create_table(&mut self) -> Result { + self.parser.next_token(); let if_not_exists = self.parser .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); @@ -36,7 +66,12 @@ impl<'a> ParserContext<'a> { let table_name = self .parser .parse_object_name() - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "a table name", + actual: self.peek_token_as_string(), + })?; + let (columns, constraints) = self.parse_columns()?; let partitions = self.parse_partitions()?; @@ -59,7 +94,7 @@ impl<'a> ParserContext<'a> { }; validate_create(&create_table)?; - Ok(Statement::Create(create_table)) + Ok(Statement::CreateTable(create_table)) } // "PARTITION BY ..." syntax: @@ -70,7 +105,11 @@ impl<'a> ParserContext<'a> { } self.parser .expect_keywords(&[Keyword::BY, Keyword::RANGE, Keyword::COLUMNS]) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "BY, RANGE, COLUMNS", + actual: self.peek_token_as_string(), + })?; let column_list = self .parser @@ -88,7 +127,11 @@ impl<'a> ParserContext<'a> { fn parse_partition_entry(&mut self) -> Result { self.parser .expect_keyword(Keyword::PARTITION) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "PARTITION", + actual: self.peek_token_as_string(), + })?; let name = self .parser @@ -128,7 +171,11 @@ impl<'a> ParserContext<'a> { { self.parser .expect_token(&Token::LParen) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "(", + actual: self.peek_token_as_string(), + })?; let mut values = vec![]; while self.parser.peek_token() != Token::RParen { @@ -140,7 +187,12 @@ impl<'a> ParserContext<'a> { self.parser .expect_token(&Token::RParen) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: ")", + actual: self.peek_token_as_string(), + })?; + Ok(values) } @@ -196,7 +248,11 @@ impl<'a> ParserContext<'a> { Token::Word(w) if w.keyword == Keyword::PRIMARY => { self.parser .expect_keyword(Keyword::KEY) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "KEY", + actual: self.peek_token_as_string(), + })?; let columns = self .parser .parse_parenthesized_column_list(Mandatory) @@ -210,7 +266,12 @@ impl<'a> ParserContext<'a> { Token::Word(w) if w.keyword == Keyword::TIME => { self.parser .expect_keyword(Keyword::INDEX) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "INDEX", + actual: self.peek_token_as_string(), + })?; + let columns = self .parser .parse_parenthesized_column_list(Mandatory) @@ -248,7 +309,11 @@ impl<'a> ParserContext<'a> { self.parser .expect_token(&Token::Eq) - .context(error::SyntaxSnafu { sql: self.sql })?; + .context(error::UnexpectedSnafu { + sql: self.sql, + expected: "=", + actual: self.peek_token_as_string(), + })?; match self.parser.next_token() { Token::Word(w) => Ok(w.value), @@ -420,6 +485,27 @@ mod tests { use super::*; + #[test] + fn test_parse_create_database() { + let sql = "create database"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("Unexpected token while parsing SQL statement")); + + let sql = "create database prometheus"; + let stmts = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + + assert_eq!(1, stmts.len()); + match &stmts[0] { + Statement::CreateDatabase(c) => { + assert_eq!(c.name.to_string(), "prometheus"); + } + _ => unreachable!(), + } + } + #[test] fn test_validate_create() { let sql = r" @@ -555,7 +641,7 @@ ENGINE=mito"; let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); assert_eq!(result.len(), 1); match &result[0] { - Statement::Create(c) => { + Statement::CreateTable(c) => { assert!(c.partitions.is_some()); let partitions = c.partitions.as_ref().unwrap(); @@ -670,7 +756,7 @@ ENGINE=mito"; let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); assert_eq!(1, result.len()); match &result[0] { - Statement::Create(c) => { + Statement::CreateTable(c) => { assert!(!c.if_not_exists); assert_eq!("demo", c.name.to_string()); assert_eq!("mito", c.engine); diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 8ea13b47d7..db9b5bdb07 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -1,5 +1,5 @@ pub mod alter; -pub mod create_table; +pub mod create; pub mod insert; pub mod query; pub mod show; @@ -32,6 +32,11 @@ pub fn table_idents_to_full_name(obj_name: &ObjectName) -> Result<(String, Strin DEFAULT_SCHEMA_NAME.to_string(), table.value.clone(), )), + [schema, table] => Ok(( + DEFAULT_CATALOG_NAME.to_string(), + schema.value.clone(), + table.value.clone(), + )), [catalog, schema, table] => Ok(( catalog.value.clone(), schema.value.clone(), @@ -39,7 +44,7 @@ pub fn table_idents_to_full_name(obj_name: &ObjectName) -> Result<(String, Strin )), _ => error::InvalidSqlSnafu { msg: format!( - "expect table name to be .. or
, actual: {}", + "expect table name to be ..
, .
or
, actual: {}", obj_name ), } diff --git a/src/sql/src/statements/create_table.rs b/src/sql/src/statements/create.rs similarity index 89% rename from src/sql/src/statements/create_table.rs rename to src/sql/src/statements/create.rs index ba145a6339..41d9ef92fc 100644 --- a/src/sql/src/statements/create_table.rs +++ b/src/sql/src/statements/create.rs @@ -29,3 +29,8 @@ pub struct PartitionEntry { pub name: Ident, pub value_list: Vec, } + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct CreateDatabase { + pub name: ObjectName, +} diff --git a/src/sql/src/statements/insert.rs b/src/sql/src/statements/insert.rs index 5ab89c7a3f..89e88da221 100644 --- a/src/sql/src/statements/insert.rs +++ b/src/sql/src/statements/insert.rs @@ -3,6 +3,7 @@ use sqlparser::parser::ParserError; use crate::ast::{Expr, Value}; use crate::error::{self, Result}; +use crate::statements::table_idents_to_full_name; #[derive(Debug, Clone, PartialEq, Eq)] pub struct Insert { @@ -11,12 +12,9 @@ pub struct Insert { } impl Insert { - pub fn table_name(&self) -> String { + pub fn full_table_name(&self) -> Result<(String, String, String)> { match &self.inner { - Statement::Insert { table_name, .. } => { - // FIXME(dennis): table_name may be in the form of "catalog.schema.table" - table_name.to_string() - } + Statement::Insert { table_name, .. } => table_idents_to_full_name(table_name), _ => unreachable!(), } } diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 3bb3473cb0..0d1582845b 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -2,7 +2,7 @@ use sqlparser::ast::Statement as SpStatement; use sqlparser::parser::ParserError; use crate::statements::alter::AlterTable; -use crate::statements::create_table::CreateTable; +use crate::statements::create::{CreateDatabase, CreateTable}; use crate::statements::insert::Insert; use crate::statements::query::Query; use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowTables}; @@ -15,7 +15,9 @@ pub enum Statement { // Insert Insert(Box), /// CREATE TABLE - Create(CreateTable), + CreateTable(CreateTable), + // CREATE DATABASE + CreateDatabase(CreateDatabase), /// ALTER TABLE Alter(AlterTable), // Databases. @@ -43,7 +45,9 @@ impl TryFrom for SpStatement { )), Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))), Statement::Insert(i) => Ok(i.inner), - Statement::Create(_) | Statement::Alter(_) => unimplemented!(), + Statement::CreateDatabase(_) | Statement::CreateTable(_) | Statement::Alter(_) => { + unimplemented!() + } } } } diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml index 401bfe6917..7bb20fc5a4 100644 --- a/src/table-engine/Cargo.toml +++ b/src/table-engine/Cargo.toml @@ -12,6 +12,7 @@ arc-swap = "1.0" async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 3196db8d44..f85bbd9cb6 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -443,12 +443,12 @@ mod tests { use storage::EngineImpl; use store_api::manifest::Manifest; use store_api::storage::ReadContext; - use table::requests::{AddColumnRequest, AlterKind, InsertRequest}; + use table::requests::{AddColumnRequest, AlterKind}; use tempdir::TempDir; use super::*; use crate::table::test_util; - use crate::table::test_util::{MockRegion, TABLE_NAME}; + use crate::table::test_util::{new_insert_request, MockRegion, TABLE_NAME}; async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { let table_name = "test_default_constraint"; @@ -518,10 +518,7 @@ mod tests { columns_values.insert("name".to_string(), Arc::new(names.clone())); columns_values.insert("ts".to_string(), Arc::new(tss.clone())); - let insert_req = InsertRequest { - table_name: table_name.to_string(), - columns_values, - }; + let insert_req = new_insert_request(table_name.to_string(), columns_values); assert_eq!(2, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); @@ -557,10 +554,7 @@ mod tests { columns_values.insert("n".to_string(), Arc::new(nums.clone())); columns_values.insert("ts".to_string(), Arc::new(tss.clone())); - let insert_req = InsertRequest { - table_name: table_name.to_string(), - columns_values, - }; + let insert_req = new_insert_request(table_name.to_string(), columns_values); assert_eq!(2, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); @@ -601,10 +595,7 @@ mod tests { assert_eq!(TableType::Base, table.table_type()); assert_eq!(schema, table.schema()); - let insert_req = InsertRequest { - table_name: "demo".to_string(), - columns_values: HashMap::default(), - }; + let insert_req = new_insert_request("demo".to_string(), HashMap::default()); assert_eq!(0, table.insert(insert_req).await.unwrap()); let mut columns_values: HashMap = HashMap::with_capacity(4); @@ -618,10 +609,7 @@ mod tests { columns_values.insert("memory".to_string(), Arc::new(memories.clone())); columns_values.insert("ts".to_string(), Arc::new(tss.clone())); - let insert_req = InsertRequest { - table_name: "demo".to_string(), - columns_values, - }; + let insert_req = new_insert_request("demo".to_string(), columns_values); assert_eq!(2, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); @@ -710,10 +698,7 @@ mod tests { columns_values.insert("memory".to_string(), Arc::new(memories)); columns_values.insert("ts".to_string(), Arc::new(tss.clone())); - let insert_req = InsertRequest { - table_name: "demo".to_string(), - columns_values, - }; + let insert_req = new_insert_request("demo".to_string(), columns_values); assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index c39ce66e3e..228a910242 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -1,10 +1,11 @@ mod mock_engine; - use std::collections::HashMap; use std::sync::Arc; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +use datatypes::vectors::VectorRef; use log_store::fs::noop::NoopLogStore; use object_store::{services::fs::Builder, ObjectStore}; use storage::config::EngineConfig as StorageEngineConfig; @@ -13,6 +14,7 @@ use table::engine::EngineContext; use table::engine::TableEngine; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::CreateTableRequest; +use table::requests::InsertRequest; use table::TableRef; use tempdir::TempDir; @@ -24,6 +26,19 @@ pub use crate::table::test_util::mock_engine::MockRegion; pub const TABLE_NAME: &str = "demo"; +/// Create a InsertRequest with default catalog and schema. +pub fn new_insert_request( + table_name: String, + columns_values: HashMap, +) -> InsertRequest { + InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name, + columns_values, + } +} + pub fn schema_for_test() -> Schema { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 449dca5d69..1f72077d70 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -9,10 +9,17 @@ use crate::metadata::TableId; /// Insert request #[derive(Debug)] pub struct InsertRequest { + pub catalog_name: String, + pub schema_name: String, pub table_name: String, pub columns_values: HashMap, } +#[derive(Debug, Clone)] +pub struct CreateDatabaseRequest { + pub db_name: String, +} + /// Create table request #[derive(Debug, Clone)] pub struct CreateTableRequest {