mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: create database (#451)
* feat: parsing create database statement * feat: impl create database in datanode * feat: supports insert into catalog.schema.table * fix: conflicts with develop branch * test: create database then insert and query * fix: grpc schema provider * feat: use CatalogManager::register_schema instead of CatalogProvide::register_schema * refactor: revert InsertExpr catalog_name and schema_name * fix: revert database.proto * fix: revert client cargo * feat: accepts schema.table as table name in sql Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5593,6 +5593,7 @@ dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
|
||||
@@ -56,7 +56,7 @@ message InsertExpr {
|
||||
// The "sql" field is meant to be removed in the future.
|
||||
string sql = 3;
|
||||
}
|
||||
|
||||
|
||||
map<string, bytes> options = 4;
|
||||
}
|
||||
|
||||
|
||||
@@ -83,6 +83,12 @@ pub enum Error {
|
||||
#[snafu(display("Table {} already exists", table))]
|
||||
TableExists { table: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Schema {} already exists", schema))]
|
||||
SchemaExists {
|
||||
schema: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to register table"))]
|
||||
RegisterTable {
|
||||
#[snafu(backtrace)]
|
||||
@@ -112,7 +118,7 @@ pub enum Error {
|
||||
"Failed to insert table creation record to system catalog, source: {}",
|
||||
source
|
||||
))]
|
||||
InsertTableRecord {
|
||||
InsertCatalogRecord {
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
@@ -215,10 +221,11 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::RegisterTable { .. } => StatusCode::Internal,
|
||||
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
|
||||
Error::SchemaExists { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::OpenSystemCatalog { source, .. }
|
||||
| Error::CreateSystemCatalog { source, .. }
|
||||
| Error::InsertTableRecord { source, .. }
|
||||
| Error::InsertCatalogRecord { source, .. }
|
||||
| Error::OpenTable { source, .. }
|
||||
| Error::CreateTable { source, .. } => source.status_code(),
|
||||
Error::MetaSrv { source, .. } => source.status_code(),
|
||||
|
||||
@@ -76,6 +76,9 @@ pub trait CatalogManager: CatalogList {
|
||||
/// returns table registered.
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize>;
|
||||
|
||||
/// Register a schema with catalog name and schema name.
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize>;
|
||||
|
||||
/// Register a system table, should be called before starting the manager.
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest)
|
||||
-> error::Result<()>;
|
||||
@@ -107,6 +110,12 @@ pub struct RegisterTableRequest {
|
||||
pub table: TableRef,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RegisterSchemaRequest {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
}
|
||||
|
||||
/// Formats table fully-qualified name
|
||||
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
|
||||
format!("{}.{}.{}", catalog, schema, table)
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::error::{
|
||||
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, SchemaNotFoundSnafu,
|
||||
SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::error::{ReadSystemCatalogSnafu, Result};
|
||||
use crate::error::{ReadSystemCatalogSnafu, Result, SchemaExistsSnafu};
|
||||
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use crate::system::{
|
||||
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
|
||||
@@ -31,8 +31,8 @@ use crate::system::{
|
||||
use crate::tables::SystemCatalog;
|
||||
use crate::{
|
||||
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
|
||||
CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest, RegisterTableRequest,
|
||||
SchemaProvider,
|
||||
CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProvider,
|
||||
};
|
||||
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
@@ -334,6 +334,34 @@ impl CatalogManager for LocalCatalogManager {
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
|
||||
let started = self.init_lock.lock().await;
|
||||
ensure!(
|
||||
*started,
|
||||
IllegalManagerStateSnafu {
|
||||
msg: "Catalog manager not started",
|
||||
}
|
||||
);
|
||||
let catalog_name = &request.catalog;
|
||||
let schema_name = &request.schema;
|
||||
|
||||
let catalog = self
|
||||
.catalogs
|
||||
.catalog(catalog_name)?
|
||||
.context(CatalogNotFoundSnafu { catalog_name })?;
|
||||
if catalog.schema(schema_name)?.is_some() {
|
||||
return SchemaExistsSnafu {
|
||||
schema: schema_name,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
self.system
|
||||
.register_schema(request.catalog, schema_name.clone())
|
||||
.await?;
|
||||
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
ensure!(
|
||||
!*self.init_lock.lock().await,
|
||||
|
||||
@@ -13,8 +13,8 @@ use table::TableRef;
|
||||
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
|
||||
use crate::schema::SchemaProvider;
|
||||
use crate::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProviderRef,
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Simple in-memory list of catalogs
|
||||
@@ -70,6 +70,17 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
.map(|v| if v.is_some() { 0 } else { 1 })
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
.get(&request.catalog)
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &request.catalog,
|
||||
})?;
|
||||
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -32,7 +32,8 @@ use crate::error::{InvalidTableSchemaSnafu, Result};
|
||||
use crate::remote::{Kv, KvBackendRef};
|
||||
use crate::{
|
||||
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
|
||||
SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Catalog manager based on metasrv.
|
||||
@@ -456,6 +457,17 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
|
||||
let catalog_name = request.catalog;
|
||||
let schema_name = request.schema;
|
||||
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &catalog_name,
|
||||
})?;
|
||||
let schema_provider = self.new_schema_provider(&catalog_name, &schema_name);
|
||||
catalog_provider.register_schema(schema_name, schema_provider)?;
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
let mut requests = self.system_table_requests.lock().await;
|
||||
requests.push(request);
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::consts::{
|
||||
INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID,
|
||||
SYSTEM_CATALOG_TABLE_NAME,
|
||||
@@ -178,15 +179,36 @@ fn build_system_catalog_schema() -> Schema {
|
||||
}
|
||||
|
||||
pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {
|
||||
build_insert_request(
|
||||
EntryType::Table,
|
||||
full_table_name.as_bytes(),
|
||||
serde_json::to_string(&TableEntryValue { table_id })
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest {
|
||||
let full_schema_name = format!("{}.{}", catalog_name, schema_name);
|
||||
build_insert_request(
|
||||
EntryType::Schema,
|
||||
full_schema_name.as_bytes(),
|
||||
serde_json::to_string(&SchemaEntryValue {})
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest {
|
||||
let mut columns_values = HashMap::with_capacity(6);
|
||||
columns_values.insert(
|
||||
"entry_type".to_string(),
|
||||
Arc::new(UInt8Vector::from_slice(&[EntryType::Table as u8])) as _,
|
||||
Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _,
|
||||
);
|
||||
|
||||
columns_values.insert(
|
||||
"key".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[full_table_name.as_bytes()])) as _,
|
||||
Arc::new(BinaryVector::from_slice(&[key])) as _,
|
||||
);
|
||||
|
||||
// Timestamp in key part is intentionally left to 0
|
||||
@@ -197,11 +219,7 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) ->
|
||||
|
||||
columns_values.insert(
|
||||
"value".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[serde_json::to_string(
|
||||
&TableEntryValue { table_id },
|
||||
)
|
||||
.unwrap()
|
||||
.as_bytes()])) as _,
|
||||
Arc::new(BinaryVector::from_slice(&[value])) as _,
|
||||
);
|
||||
|
||||
columns_values.insert(
|
||||
@@ -219,6 +237,8 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) ->
|
||||
);
|
||||
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
columns_values,
|
||||
}
|
||||
@@ -324,6 +344,9 @@ pub struct SchemaEntry {
|
||||
pub schema_name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct SchemaEntryValue;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
|
||||
pub struct TableEntry {
|
||||
pub catalog_name: String,
|
||||
|
||||
@@ -24,8 +24,8 @@ use table::metadata::{TableId, TableInfoRef};
|
||||
use table::table::scan::SimpleTableScan;
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::error::{Error, InsertTableRecordSnafu};
|
||||
use crate::system::{build_table_insert_request, SystemCatalogTable};
|
||||
use crate::error::{Error, InsertCatalogRecordSnafu};
|
||||
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
|
||||
use crate::{
|
||||
format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
@@ -254,7 +254,20 @@ impl SystemCatalog {
|
||||
.system
|
||||
.insert(request)
|
||||
.await
|
||||
.context(InsertTableRecordSnafu)
|
||||
.context(InsertCatalogRecordSnafu)
|
||||
}
|
||||
|
||||
pub async fn register_schema(
|
||||
&self,
|
||||
catalog: String,
|
||||
schema: String,
|
||||
) -> crate::error::Result<usize> {
|
||||
let request = build_schema_insert_request(catalog, schema);
|
||||
self.information_schema
|
||||
.system
|
||||
.insert(request)
|
||||
.await
|
||||
.context(InsertCatalogRecordSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::collections::HashMap;
|
||||
|
||||
use api::v1::{codec::InsertBatch, *};
|
||||
use client::{Client, Database};
|
||||
|
||||
fn main() {
|
||||
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
|
||||
.unwrap();
|
||||
|
||||
@@ -175,6 +175,8 @@ pub fn build_create_table_request(
|
||||
}
|
||||
|
||||
pub fn insertion_expr_to_request(
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
insert_batches: Vec<InsertBatch>,
|
||||
table: Arc<dyn Table>,
|
||||
@@ -221,6 +223,8 @@ pub fn insertion_expr_to_request(
|
||||
.collect();
|
||||
|
||||
Ok(InsertRequest {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
columns_values,
|
||||
})
|
||||
@@ -464,8 +468,11 @@ mod tests {
|
||||
values: mock_insert_batches(),
|
||||
};
|
||||
let insert_batches = insert_batches(values.values).unwrap();
|
||||
let insert_req = insertion_expr_to_request("demo", insert_batches, table).unwrap();
|
||||
let insert_req =
|
||||
insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap();
|
||||
|
||||
assert_eq!("greptime", insert_req.catalog_name);
|
||||
assert_eq!("public", insert_req.schema_name);
|
||||
assert_eq!("demo", insert_req.table_name);
|
||||
|
||||
let host = insert_req.columns_values.get("host").unwrap();
|
||||
|
||||
@@ -177,6 +177,12 @@ pub enum Error {
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to register a new schema, source: {}", source))]
|
||||
RegisterSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode as physical plan, source: {}", source))]
|
||||
IntoPhysicalPlan {
|
||||
#[snafu(backtrace)]
|
||||
@@ -321,6 +327,7 @@ impl ErrorExt for Error {
|
||||
| Error::StartGrpc { .. }
|
||||
| Error::CreateDir { .. }
|
||||
| Error::InsertSystemCatalog { .. }
|
||||
| Error::RegisterSchema { .. }
|
||||
| Error::Conversion { .. }
|
||||
| Error::IntoPhysicalPlan { .. }
|
||||
| Error::UnsupportedExpr { .. }
|
||||
|
||||
@@ -8,6 +8,7 @@ use api::v1::{
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_insert::insertion_expr_to_request;
|
||||
use common_query::Output;
|
||||
use common_telemetry::logging::{debug, info};
|
||||
use query::plan::LogicalPlan;
|
||||
@@ -17,8 +18,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use table::requests::AddColumnRequest;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, ExecuteSqlSnafu, InsertDataSnafu,
|
||||
InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu,
|
||||
CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu,
|
||||
ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
|
||||
UnsupportedExprSnafu,
|
||||
};
|
||||
use crate::instance::Instance;
|
||||
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
|
||||
@@ -85,7 +87,7 @@ impl Instance {
|
||||
|
||||
let _result = self
|
||||
.sql_handler()
|
||||
.execute(SqlRequest::Create(create_table_request))
|
||||
.execute(SqlRequest::CreateTable(create_table_request))
|
||||
.await?;
|
||||
|
||||
info!("Success to create table: {} automatically", table_name);
|
||||
@@ -95,21 +97,19 @@ impl Instance {
|
||||
|
||||
pub async fn execute_grpc_insert(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
values: insert_expr::Values,
|
||||
) -> Result<Output> {
|
||||
// maybe infer from insert batch?
|
||||
let catalog_name = DEFAULT_CATALOG_NAME;
|
||||
let schema_name = DEFAULT_SCHEMA_NAME;
|
||||
|
||||
let schema_provider = self
|
||||
.catalog_manager
|
||||
.catalog(catalog_name)
|
||||
.unwrap()
|
||||
.expect("default catalog must exist")
|
||||
.context(CatalogSnafu)?
|
||||
.context(CatalogNotFoundSnafu { name: catalog_name })?
|
||||
.schema(schema_name)
|
||||
.expect("default schema must exist")
|
||||
.unwrap();
|
||||
.context(CatalogSnafu)?
|
||||
.context(SchemaNotFoundSnafu { name: schema_name })?;
|
||||
|
||||
let insert_batches =
|
||||
common_insert::insert_batches(values.values).context(InsertDataSnafu)?;
|
||||
@@ -141,9 +141,14 @@ impl Instance {
|
||||
.context(TableNotFoundSnafu { table_name })?
|
||||
};
|
||||
|
||||
let insert =
|
||||
common_insert::insertion_expr_to_request(table_name, insert_batches, table.clone())
|
||||
.context(InsertDataSnafu)?;
|
||||
let insert = insertion_expr_to_request(
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
insert_batches,
|
||||
table.clone(),
|
||||
)
|
||||
.context(InsertDataSnafu)?;
|
||||
|
||||
let affected_rows = table
|
||||
.insert(insert)
|
||||
@@ -153,8 +158,17 @@ impl Instance {
|
||||
Ok(Output::AffectedRows(affected_rows))
|
||||
}
|
||||
|
||||
async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult {
|
||||
match self.execute_grpc_insert(table_name, values).await {
|
||||
async fn handle_insert(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
values: insert_expr::Values,
|
||||
) -> ObjectResult {
|
||||
match self
|
||||
.execute_grpc_insert(catalog_name, schema_name, table_name, values)
|
||||
.await
|
||||
{
|
||||
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
|
||||
.status_code(StatusCode::Success as u32)
|
||||
.mutate_result(rows as u32, 0)
|
||||
@@ -207,6 +221,9 @@ impl GrpcQueryHandler for Instance {
|
||||
async fn do_query(&self, query: ObjectExpr) -> servers::error::Result<ObjectResult> {
|
||||
let object_resp = match query.expr {
|
||||
Some(object_expr::Expr::Insert(insert_expr)) => {
|
||||
// TODO(dennis): retrieve schema name from DatabaseRequest
|
||||
let catalog_name = DEFAULT_CATALOG_NAME;
|
||||
let schema_name = DEFAULT_SCHEMA_NAME;
|
||||
let table_name = &insert_expr.table_name;
|
||||
let expr = insert_expr
|
||||
.expr
|
||||
@@ -227,7 +244,8 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
match expr {
|
||||
insert_expr::Expr::Values(values) => {
|
||||
self.handle_insert(table_name, values).await
|
||||
self.handle_insert(catalog_name, schema_name, table_name, values)
|
||||
.await
|
||||
}
|
||||
insert_expr::Expr::Sql(sql) => {
|
||||
let output = self.execute_sql(&sql).await;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{
|
||||
@@ -9,8 +8,11 @@ use common_telemetry::{
|
||||
use servers::query_handler::SqlQueryHandler;
|
||||
use snafu::prelude::*;
|
||||
use sql::statements::statement::Statement;
|
||||
use table::requests::CreateDatabaseRequest;
|
||||
|
||||
use crate::error::{CatalogSnafu, ExecuteSqlSnafu, Result};
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu,
|
||||
};
|
||||
use crate::instance::Instance;
|
||||
use crate::metric;
|
||||
use crate::sql::SqlRequest;
|
||||
@@ -35,20 +37,35 @@ impl Instance {
|
||||
.context(ExecuteSqlSnafu)
|
||||
}
|
||||
Statement::Insert(i) => {
|
||||
let (catalog_name, schema_name, _table_name) =
|
||||
i.full_table_name().context(ParseSqlSnafu)?;
|
||||
|
||||
let schema_provider = self
|
||||
.catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.expect("datafusion does not accept fallible catalog access")
|
||||
.unwrap()
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
.expect("datafusion does not accept fallible catalog access")
|
||||
.unwrap();
|
||||
.catalog(&catalog_name)
|
||||
.context(CatalogSnafu)?
|
||||
.context(CatalogNotFoundSnafu { name: catalog_name })?
|
||||
.schema(&schema_name)
|
||||
.context(CatalogSnafu)?
|
||||
.context(SchemaNotFoundSnafu { name: schema_name })?;
|
||||
|
||||
let request = self.sql_handler.insert_to_request(schema_provider, *i)?;
|
||||
self.sql_handler.execute(request).await
|
||||
}
|
||||
|
||||
Statement::Create(c) => {
|
||||
Statement::CreateDatabase(c) => {
|
||||
let request = CreateDatabaseRequest {
|
||||
db_name: c.name.to_string(),
|
||||
};
|
||||
|
||||
info!("Creating a new database: {}", request.db_name);
|
||||
|
||||
self.sql_handler
|
||||
.execute(SqlRequest::CreateDatabase(request))
|
||||
.await
|
||||
}
|
||||
|
||||
Statement::CreateTable(c) => {
|
||||
let table_id = self
|
||||
.catalog_manager
|
||||
.next_table_id()
|
||||
@@ -67,7 +84,9 @@ impl Instance {
|
||||
catalog_name, schema_name, table_name, table_id
|
||||
);
|
||||
|
||||
self.sql_handler.execute(SqlRequest::Create(request)).await
|
||||
self.sql_handler
|
||||
.execute(SqlRequest::CreateTable(request))
|
||||
.await
|
||||
}
|
||||
Statement::Alter(alter_table) => {
|
||||
let req = self.sql_handler.alter_to_request(alter_table)?;
|
||||
|
||||
@@ -20,7 +20,7 @@ impl Instance {
|
||||
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
|
||||
let request = self.create_expr_to_request(expr).await;
|
||||
let result = futures::future::ready(request)
|
||||
.and_then(|request| self.sql_handler().execute(SqlRequest::Create(request)))
|
||||
.and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request)))
|
||||
.await;
|
||||
match result {
|
||||
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
|
||||
|
||||
@@ -22,7 +22,8 @@ mod show;
|
||||
#[derive(Debug)]
|
||||
pub enum SqlRequest {
|
||||
Insert(InsertRequest),
|
||||
Create(CreateTableRequest),
|
||||
CreateTable(CreateTableRequest),
|
||||
CreateDatabase(CreateDatabaseRequest),
|
||||
Alter(AlterTableRequest),
|
||||
ShowDatabases(ShowDatabases),
|
||||
ShowTables(ShowTables),
|
||||
@@ -45,7 +46,8 @@ impl SqlHandler {
|
||||
pub async fn execute(&self, request: SqlRequest) -> Result<Output> {
|
||||
match request {
|
||||
SqlRequest::Insert(req) => self.insert(req).await,
|
||||
SqlRequest::Create(req) => self.create(req).await,
|
||||
SqlRequest::CreateTable(req) => self.create_table(req).await,
|
||||
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
|
||||
SqlRequest::Alter(req) => self.alter(req).await,
|
||||
SqlRequest::ShowDatabases(stmt) => self.show_databases(stmt).await,
|
||||
SqlRequest::ShowTables(stmt) => self.show_tables(stmt).await,
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::RegisterTableRequest;
|
||||
use catalog::{RegisterSchemaRequest, RegisterTableRequest};
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
use common_query::Output;
|
||||
use common_telemetry::tracing::info;
|
||||
use datatypes::schema::SchemaBuilder;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::ast::TableConstraint;
|
||||
use sql::statements::create_table::CreateTable;
|
||||
use sql::statements::create::CreateTable;
|
||||
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
|
||||
use store_api::storage::consts::TIME_INDEX_NAME;
|
||||
use table::engine::EngineContext;
|
||||
@@ -16,12 +17,28 @@ use table::requests::*;
|
||||
|
||||
use crate::error::{
|
||||
self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu,
|
||||
InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, Result,
|
||||
InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu,
|
||||
Result,
|
||||
};
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl SqlHandler {
|
||||
pub(crate) async fn create(&self, req: CreateTableRequest) -> Result<Output> {
|
||||
pub(crate) async fn create_database(&self, req: CreateDatabaseRequest) -> Result<Output> {
|
||||
let schema = req.db_name;
|
||||
let req = RegisterSchemaRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: schema.clone(),
|
||||
};
|
||||
self.catalog_manager
|
||||
.register_schema(req)
|
||||
.await
|
||||
.context(RegisterSchemaSnafu)?;
|
||||
|
||||
info!("Successfully created database: {:?}", schema);
|
||||
Ok(Output::AffectedRows(1))
|
||||
}
|
||||
|
||||
pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result<Output> {
|
||||
let ctx = EngineContext {};
|
||||
// determine catalog and schema from the very beginning
|
||||
let table_name = req.table_name.clone();
|
||||
@@ -52,7 +69,7 @@ impl SqlHandler {
|
||||
Ok(Output::AffectedRows(1))
|
||||
}
|
||||
|
||||
/// Converts [CreateTable] to [SqlRequest::Create].
|
||||
/// Converts [CreateTable] to [SqlRequest::CreateTable].
|
||||
pub(crate) fn create_to_request(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
@@ -181,7 +198,7 @@ mod tests {
|
||||
let mut res = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
|
||||
assert_eq!(1, res.len());
|
||||
match res.pop().unwrap() {
|
||||
Statement::Create(c) => c,
|
||||
Statement::CreateTable(c) => c,
|
||||
_ => {
|
||||
panic!("Unexpected statement!")
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use sql::statements::{self, insert::Insert};
|
||||
use table::requests::*;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu,
|
||||
CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu,
|
||||
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::sql::{SqlHandler, SqlRequest};
|
||||
@@ -35,9 +35,8 @@ impl SqlHandler {
|
||||
) -> Result<SqlRequest> {
|
||||
let columns = stmt.columns();
|
||||
let values = stmt.values().context(ParseSqlValueSnafu)?;
|
||||
//TODO(dennis): table name may be in the form of `catalog.schema.table`,
|
||||
// but we don't process it right now.
|
||||
let table_name = stmt.table_name();
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
stmt.full_table_name().context(ParseSqlSnafu)?;
|
||||
|
||||
let table = schema_provider
|
||||
.table(&table_name)
|
||||
@@ -101,6 +100,8 @@ impl SqlHandler {
|
||||
}
|
||||
|
||||
Ok(SqlRequest::Insert(InsertRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
columns_values: columns_builders
|
||||
.into_iter()
|
||||
|
||||
@@ -9,6 +9,62 @@ use datatypes::prelude::ConcreteDataType;
|
||||
use crate::instance::Instance;
|
||||
use crate::tests::test_util;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_create_database_and_insert_query() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let (opts, _guard) =
|
||||
test_util::create_tmp_dir_and_datanode_opts("create_database_and_insert_query");
|
||||
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
|
||||
instance.start().await.unwrap();
|
||||
|
||||
let output = instance.execute_sql("create database test").await.unwrap();
|
||||
assert!(matches!(output, Output::AffectedRows(1)));
|
||||
|
||||
let output = instance
|
||||
.execute_sql(
|
||||
r#"create table greptime.test.demo(
|
||||
host STRING,
|
||||
cpu DOUBLE,
|
||||
memory DOUBLE,
|
||||
ts bigint,
|
||||
TIME INDEX(ts)
|
||||
)"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(matches!(output, Output::AffectedRows(1)));
|
||||
|
||||
let output = instance
|
||||
.execute_sql(
|
||||
r#"insert into test.demo(host, cpu, memory, ts) values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000)
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(matches!(output, Output::AffectedRows(2)));
|
||||
|
||||
let query_output = instance
|
||||
.execute_sql("select ts from test.demo order by ts")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
match query_output {
|
||||
Output::Stream(s) => {
|
||||
let batches = util::collect(s).await.unwrap();
|
||||
let columns = batches[0].df_recordbatch.columns();
|
||||
assert_eq!(1, columns.len());
|
||||
assert_eq!(
|
||||
&Int64Array::from_slice(&[1655276557000, 1655276558000]),
|
||||
columns[0].as_any().downcast_ref::<Int64Array>().unwrap()
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_execute_insert() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
@@ -22,7 +22,7 @@ use servers::query_handler::{
|
||||
};
|
||||
use snafu::prelude::*;
|
||||
use sql::ast::{ColumnDef, TableConstraint};
|
||||
use sql::statements::create_table::{CreateTable, TIME_INDEX};
|
||||
use sql::statements::create::{CreateTable, TIME_INDEX};
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
|
||||
use sql::{dialect::GenericDialect, parser::ParserContext};
|
||||
@@ -105,7 +105,15 @@ impl SqlQueryHandler for Instance {
|
||||
.await
|
||||
.and_then(|object_result| object_result.try_into()),
|
||||
Statement::Insert(insert) => {
|
||||
let table_name = insert.table_name();
|
||||
// TODO(dennis): respect schema_name when inserting data
|
||||
let (_catalog_name, _schema_name, table_name) = insert
|
||||
.full_table_name()
|
||||
.context(error::ParseSqlSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteInsertSnafu {
|
||||
msg: "Failed to get table name",
|
||||
})?;
|
||||
|
||||
let expr = InsertExpr {
|
||||
table_name,
|
||||
expr: Some(insert_expr::Expr::Sql(query.to_string())),
|
||||
@@ -116,7 +124,7 @@ impl SqlQueryHandler for Instance {
|
||||
.await
|
||||
.and_then(|object_result| object_result.try_into())
|
||||
}
|
||||
Statement::Create(create) => {
|
||||
Statement::CreateTable(create) => {
|
||||
let expr = create_to_expr(create)
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query })?;
|
||||
|
||||
@@ -133,6 +133,8 @@ fn partition_insert_request(
|
||||
}
|
||||
}
|
||||
|
||||
let catalog_name = &insert.catalog_name;
|
||||
let schema_name = &insert.schema_name;
|
||||
let table_name = &insert.table_name;
|
||||
dist_insert
|
||||
.into_iter()
|
||||
@@ -144,6 +146,8 @@ fn partition_insert_request(
|
||||
(
|
||||
region_id,
|
||||
InsertRequest {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
columns_values,
|
||||
},
|
||||
@@ -156,6 +160,7 @@ fn partition_insert_request(
|
||||
mod tests {
|
||||
use std::{collections::HashMap, result::Result, sync::Arc};
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::{
|
||||
data_type::ConcreteDataType,
|
||||
types::{BooleanType, StringType},
|
||||
@@ -373,6 +378,8 @@ mod tests {
|
||||
columns_values.insert("id".to_string(), builder.finish());
|
||||
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "demo".to_string(),
|
||||
columns_values,
|
||||
}
|
||||
@@ -398,6 +405,8 @@ mod tests {
|
||||
columns_values.insert("id".to_string(), builder.finish());
|
||||
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "demo".to_string(),
|
||||
columns_values,
|
||||
}
|
||||
|
||||
@@ -18,7 +18,8 @@ pub(crate) fn insert_to_request(
|
||||
) -> Result<InsertRequest> {
|
||||
let columns = stmt.columns();
|
||||
let values = stmt.values().context(error::ParseSqlSnafu)?;
|
||||
let table_name = stmt.table_name();
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
stmt.full_table_name().context(error::ParseSqlSnafu)?;
|
||||
|
||||
let table = schema_provider
|
||||
.table(&table_name)
|
||||
@@ -80,6 +81,8 @@ pub(crate) fn insert_to_request(
|
||||
}
|
||||
|
||||
Ok(InsertRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
columns_values: columns_builders
|
||||
.into_iter()
|
||||
|
||||
@@ -130,6 +130,7 @@ mod tests {
|
||||
insert_expr::Expr,
|
||||
ColumnDataType, InsertExpr,
|
||||
};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::{prelude::ConcreteDataType, types::StringType, vectors::VectorBuilder};
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
@@ -160,6 +161,8 @@ mod tests {
|
||||
columns_values.insert("id".to_string(), builder.finish());
|
||||
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "demo".to_string(),
|
||||
columns_values,
|
||||
}
|
||||
|
||||
@@ -49,7 +49,8 @@ where
|
||||
Statement::ShowTables(_)
|
||||
| Statement::ShowDatabases(_)
|
||||
| Statement::ShowCreateTable(_)
|
||||
| Statement::Create(_)
|
||||
| Statement::CreateTable(_)
|
||||
| Statement::CreateDatabase(_)
|
||||
| Statement::Alter(_)
|
||||
| Statement::Insert(_) => unreachable!(),
|
||||
}
|
||||
|
||||
@@ -117,6 +117,8 @@ impl ScriptsTable {
|
||||
|
||||
let _ = table
|
||||
.insert(InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: SCRIPTS_TABLE_NAME.to_string(),
|
||||
columns_values,
|
||||
})
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_grpc::writer::{to_ms_ts, Precision};
|
||||
use common_time::{timestamp::TimeUnit::Millisecond, Timestamp};
|
||||
use datatypes::{
|
||||
@@ -120,6 +121,9 @@ impl LineWriter {
|
||||
.map(|(column_name, (mut builder, _))| (column_name, builder.finish()))
|
||||
.collect();
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
// TODO(dennis): supports database
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: self.table_name,
|
||||
columns_values,
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ pub enum Error {
|
||||
Unsupported { sql: String, keyword: String },
|
||||
|
||||
#[snafu(display(
|
||||
"Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}",
|
||||
"Unexpected token while parsing SQL statement: {}, expected: '{}', found: {}, source: {}",
|
||||
sql,
|
||||
expected,
|
||||
actual,
|
||||
|
||||
@@ -234,7 +234,7 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn peek_token_as_string(&self) -> String {
|
||||
pub(crate) fn peek_token_as_string(&self) -> String {
|
||||
self.parser.peek_token().to_string()
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,9 @@ use table_engine::engine;
|
||||
use crate::ast::{ColumnDef, Ident, TableConstraint, Value as SqlValue};
|
||||
use crate::error::{self, InvalidTimeIndexSnafu, Result, SyntaxSnafu};
|
||||
use crate::parser::ParserContext;
|
||||
use crate::statements::create_table::{CreateTable, PartitionEntry, Partitions, TIME_INDEX};
|
||||
use crate::statements::create::{
|
||||
CreateDatabase, CreateTable, PartitionEntry, Partitions, TIME_INDEX,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::statements::{sql_data_type_to_concrete_data_type, sql_value_to_value};
|
||||
|
||||
@@ -26,9 +28,37 @@ static THAN: Lazy<Token> = Lazy::new(|| Token::make_keyword("THAN"));
|
||||
/// Parses create [table] statement
|
||||
impl<'a> ParserContext<'a> {
|
||||
pub(crate) fn parse_create(&mut self) -> Result<Statement> {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::TABLE)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
match self.parser.peek_token() {
|
||||
Token::Word(w) => match w.keyword {
|
||||
Keyword::TABLE => self.parse_create_table(),
|
||||
|
||||
Keyword::DATABASE => self.parse_create_database(),
|
||||
|
||||
_ => self.unsupported(w.to_string()),
|
||||
},
|
||||
unexpected => self.unsupported(unexpected.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_create_database(&mut self) -> Result<Statement> {
|
||||
self.parser.next_token();
|
||||
|
||||
let database_name = self
|
||||
.parser
|
||||
.parse_object_name()
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "a database name",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
Ok(Statement::CreateDatabase(CreateDatabase {
|
||||
name: database_name,
|
||||
}))
|
||||
}
|
||||
|
||||
fn parse_create_table(&mut self) -> Result<Statement> {
|
||||
self.parser.next_token();
|
||||
let if_not_exists =
|
||||
self.parser
|
||||
.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
|
||||
@@ -36,7 +66,12 @@ impl<'a> ParserContext<'a> {
|
||||
let table_name = self
|
||||
.parser
|
||||
.parse_object_name()
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "a table name",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
let (columns, constraints) = self.parse_columns()?;
|
||||
|
||||
let partitions = self.parse_partitions()?;
|
||||
@@ -59,7 +94,7 @@ impl<'a> ParserContext<'a> {
|
||||
};
|
||||
validate_create(&create_table)?;
|
||||
|
||||
Ok(Statement::Create(create_table))
|
||||
Ok(Statement::CreateTable(create_table))
|
||||
}
|
||||
|
||||
// "PARTITION BY ..." syntax:
|
||||
@@ -70,7 +105,11 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
self.parser
|
||||
.expect_keywords(&[Keyword::BY, Keyword::RANGE, Keyword::COLUMNS])
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "BY, RANGE, COLUMNS",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
let column_list = self
|
||||
.parser
|
||||
@@ -88,7 +127,11 @@ impl<'a> ParserContext<'a> {
|
||||
fn parse_partition_entry(&mut self) -> Result<PartitionEntry> {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::PARTITION)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "PARTITION",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
let name = self
|
||||
.parser
|
||||
@@ -128,7 +171,11 @@ impl<'a> ParserContext<'a> {
|
||||
{
|
||||
self.parser
|
||||
.expect_token(&Token::LParen)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "(",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
let mut values = vec![];
|
||||
while self.parser.peek_token() != Token::RParen {
|
||||
@@ -140,7 +187,12 @@ impl<'a> ParserContext<'a> {
|
||||
|
||||
self.parser
|
||||
.expect_token(&Token::RParen)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: ")",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
@@ -196,7 +248,11 @@ impl<'a> ParserContext<'a> {
|
||||
Token::Word(w) if w.keyword == Keyword::PRIMARY => {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::KEY)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "KEY",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
let columns = self
|
||||
.parser
|
||||
.parse_parenthesized_column_list(Mandatory)
|
||||
@@ -210,7 +266,12 @@ impl<'a> ParserContext<'a> {
|
||||
Token::Word(w) if w.keyword == Keyword::TIME => {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::INDEX)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "INDEX",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
let columns = self
|
||||
.parser
|
||||
.parse_parenthesized_column_list(Mandatory)
|
||||
@@ -248,7 +309,11 @@ impl<'a> ParserContext<'a> {
|
||||
|
||||
self.parser
|
||||
.expect_token(&Token::Eq)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
.context(error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "=",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
match self.parser.next_token() {
|
||||
Token::Word(w) => Ok(w.value),
|
||||
@@ -420,6 +485,27 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_create_database() {
|
||||
let sql = "create database";
|
||||
let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
|
||||
assert!(result
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Unexpected token while parsing SQL statement"));
|
||||
|
||||
let sql = "create database prometheus";
|
||||
let stmts = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
|
||||
|
||||
assert_eq!(1, stmts.len());
|
||||
match &stmts[0] {
|
||||
Statement::CreateDatabase(c) => {
|
||||
assert_eq!(c.name.to_string(), "prometheus");
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_create() {
|
||||
let sql = r"
|
||||
@@ -555,7 +641,7 @@ ENGINE=mito";
|
||||
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
|
||||
assert_eq!(result.len(), 1);
|
||||
match &result[0] {
|
||||
Statement::Create(c) => {
|
||||
Statement::CreateTable(c) => {
|
||||
assert!(c.partitions.is_some());
|
||||
|
||||
let partitions = c.partitions.as_ref().unwrap();
|
||||
@@ -670,7 +756,7 @@ ENGINE=mito";
|
||||
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
|
||||
assert_eq!(1, result.len());
|
||||
match &result[0] {
|
||||
Statement::Create(c) => {
|
||||
Statement::CreateTable(c) => {
|
||||
assert!(!c.if_not_exists);
|
||||
assert_eq!("demo", c.name.to_string());
|
||||
assert_eq!("mito", c.engine);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
pub mod alter;
|
||||
pub mod create_table;
|
||||
pub mod create;
|
||||
pub mod insert;
|
||||
pub mod query;
|
||||
pub mod show;
|
||||
@@ -32,6 +32,11 @@ pub fn table_idents_to_full_name(obj_name: &ObjectName) -> Result<(String, Strin
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table.value.clone(),
|
||||
)),
|
||||
[schema, table] => Ok((
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema.value.clone(),
|
||||
table.value.clone(),
|
||||
)),
|
||||
[catalog, schema, table] => Ok((
|
||||
catalog.value.clone(),
|
||||
schema.value.clone(),
|
||||
@@ -39,7 +44,7 @@ pub fn table_idents_to_full_name(obj_name: &ObjectName) -> Result<(String, Strin
|
||||
)),
|
||||
_ => error::InvalidSqlSnafu {
|
||||
msg: format!(
|
||||
"expect table name to be <catalog>.<schema>.<table> or <table>, actual: {}",
|
||||
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
|
||||
@@ -29,3 +29,8 @@ pub struct PartitionEntry {
|
||||
pub name: Ident,
|
||||
pub value_list: Vec<SqlValue>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct CreateDatabase {
|
||||
pub name: ObjectName,
|
||||
}
|
||||
@@ -3,6 +3,7 @@ use sqlparser::parser::ParserError;
|
||||
|
||||
use crate::ast::{Expr, Value};
|
||||
use crate::error::{self, Result};
|
||||
use crate::statements::table_idents_to_full_name;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Insert {
|
||||
@@ -11,12 +12,9 @@ pub struct Insert {
|
||||
}
|
||||
|
||||
impl Insert {
|
||||
pub fn table_name(&self) -> String {
|
||||
pub fn full_table_name(&self) -> Result<(String, String, String)> {
|
||||
match &self.inner {
|
||||
Statement::Insert { table_name, .. } => {
|
||||
// FIXME(dennis): table_name may be in the form of "catalog.schema.table"
|
||||
table_name.to_string()
|
||||
}
|
||||
Statement::Insert { table_name, .. } => table_idents_to_full_name(table_name),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use sqlparser::ast::Statement as SpStatement;
|
||||
use sqlparser::parser::ParserError;
|
||||
|
||||
use crate::statements::alter::AlterTable;
|
||||
use crate::statements::create_table::CreateTable;
|
||||
use crate::statements::create::{CreateDatabase, CreateTable};
|
||||
use crate::statements::insert::Insert;
|
||||
use crate::statements::query::Query;
|
||||
use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowTables};
|
||||
@@ -15,7 +15,9 @@ pub enum Statement {
|
||||
// Insert
|
||||
Insert(Box<Insert>),
|
||||
/// CREATE TABLE
|
||||
Create(CreateTable),
|
||||
CreateTable(CreateTable),
|
||||
// CREATE DATABASE
|
||||
CreateDatabase(CreateDatabase),
|
||||
/// ALTER TABLE
|
||||
Alter(AlterTable),
|
||||
// Databases.
|
||||
@@ -43,7 +45,9 @@ impl TryFrom<Statement> for SpStatement {
|
||||
)),
|
||||
Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))),
|
||||
Statement::Insert(i) => Ok(i.inner),
|
||||
Statement::Create(_) | Statement::Alter(_) => unimplemented!(),
|
||||
Statement::CreateDatabase(_) | Statement::CreateTable(_) | Statement::Alter(_) => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ arc-swap = "1.0"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
common-catalog = { path = "../common/catalog" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-query = { path = "../common/query" }
|
||||
common-recordbatch = { path = "../common/recordbatch" }
|
||||
|
||||
@@ -443,12 +443,12 @@ mod tests {
|
||||
use storage::EngineImpl;
|
||||
use store_api::manifest::Manifest;
|
||||
use store_api::storage::ReadContext;
|
||||
use table::requests::{AddColumnRequest, AlterKind, InsertRequest};
|
||||
use table::requests::{AddColumnRequest, AlterKind};
|
||||
use tempdir::TempDir;
|
||||
|
||||
use super::*;
|
||||
use crate::table::test_util;
|
||||
use crate::table::test_util::{MockRegion, TABLE_NAME};
|
||||
use crate::table::test_util::{new_insert_request, MockRegion, TABLE_NAME};
|
||||
|
||||
async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) {
|
||||
let table_name = "test_default_constraint";
|
||||
@@ -518,10 +518,7 @@ mod tests {
|
||||
columns_values.insert("name".to_string(), Arc::new(names.clone()));
|
||||
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
|
||||
|
||||
let insert_req = InsertRequest {
|
||||
table_name: table_name.to_string(),
|
||||
columns_values,
|
||||
};
|
||||
let insert_req = new_insert_request(table_name.to_string(), columns_values);
|
||||
assert_eq!(2, table.insert(insert_req).await.unwrap());
|
||||
|
||||
let stream = table.scan(&None, &[], None).await.unwrap();
|
||||
@@ -557,10 +554,7 @@ mod tests {
|
||||
columns_values.insert("n".to_string(), Arc::new(nums.clone()));
|
||||
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
|
||||
|
||||
let insert_req = InsertRequest {
|
||||
table_name: table_name.to_string(),
|
||||
columns_values,
|
||||
};
|
||||
let insert_req = new_insert_request(table_name.to_string(), columns_values);
|
||||
assert_eq!(2, table.insert(insert_req).await.unwrap());
|
||||
|
||||
let stream = table.scan(&None, &[], None).await.unwrap();
|
||||
@@ -601,10 +595,7 @@ mod tests {
|
||||
assert_eq!(TableType::Base, table.table_type());
|
||||
assert_eq!(schema, table.schema());
|
||||
|
||||
let insert_req = InsertRequest {
|
||||
table_name: "demo".to_string(),
|
||||
columns_values: HashMap::default(),
|
||||
};
|
||||
let insert_req = new_insert_request("demo".to_string(), HashMap::default());
|
||||
assert_eq!(0, table.insert(insert_req).await.unwrap());
|
||||
|
||||
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
|
||||
@@ -618,10 +609,7 @@ mod tests {
|
||||
columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
|
||||
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
|
||||
|
||||
let insert_req = InsertRequest {
|
||||
table_name: "demo".to_string(),
|
||||
columns_values,
|
||||
};
|
||||
let insert_req = new_insert_request("demo".to_string(), columns_values);
|
||||
assert_eq!(2, table.insert(insert_req).await.unwrap());
|
||||
|
||||
let stream = table.scan(&None, &[], None).await.unwrap();
|
||||
@@ -710,10 +698,7 @@ mod tests {
|
||||
columns_values.insert("memory".to_string(), Arc::new(memories));
|
||||
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
|
||||
|
||||
let insert_req = InsertRequest {
|
||||
table_name: "demo".to_string(),
|
||||
columns_values,
|
||||
};
|
||||
let insert_req = new_insert_request("demo".to_string(), columns_values);
|
||||
assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap());
|
||||
|
||||
let stream = table.scan(&None, &[], None).await.unwrap();
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
mod mock_engine;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use log_store::fs::noop::NoopLogStore;
|
||||
use object_store::{services::fs::Builder, ObjectStore};
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
@@ -13,6 +14,7 @@ use table::engine::EngineContext;
|
||||
use table::engine::TableEngine;
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::requests::CreateTableRequest;
|
||||
use table::requests::InsertRequest;
|
||||
use table::TableRef;
|
||||
use tempdir::TempDir;
|
||||
|
||||
@@ -24,6 +26,19 @@ pub use crate::table::test_util::mock_engine::MockRegion;
|
||||
|
||||
pub const TABLE_NAME: &str = "demo";
|
||||
|
||||
/// Create a InsertRequest with default catalog and schema.
|
||||
pub fn new_insert_request(
|
||||
table_name: String,
|
||||
columns_values: HashMap<String, VectorRef>,
|
||||
) -> InsertRequest {
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name,
|
||||
columns_values,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schema_for_test() -> Schema {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
|
||||
@@ -9,10 +9,17 @@ use crate::metadata::TableId;
|
||||
/// Insert request
|
||||
#[derive(Debug)]
|
||||
pub struct InsertRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub columns_values: HashMap<String, VectorRef>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateDatabaseRequest {
|
||||
pub db_name: String,
|
||||
}
|
||||
|
||||
/// Create table request
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateTableRequest {
|
||||
|
||||
Reference in New Issue
Block a user