feat: support to create external table (#1372)

* feat: support to create external table

* chore: apply suggestions from CR

* test: add create external table without ts type

* chore: apply suggestions from CR

* fix: fix import typo

* refactor: move consts to table crate

* chore: apply suggestions from CR

* refactor: rename create_table_schema
This commit is contained in:
Weny Xu
2023-04-24 15:43:12 +09:00
committed by GitHub
parent 17daf4cdff
commit f2167663b2
26 changed files with 527 additions and 72 deletions

5
Cargo.lock generated
View File

@@ -2469,6 +2469,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
"file-table-engine",
"futures",
"futures-util",
"humantime-serde",
@@ -3025,6 +3026,7 @@ dependencies = [
"datafusion-expr",
"datanode",
"datatypes",
"file-table-engine",
"futures",
"futures-util",
"itertools",
@@ -6475,6 +6477,7 @@ dependencies = [
"chrono",
"common-base",
"common-catalog",
"common-datasource",
"common-error",
"common-function",
"common-function-macro",
@@ -6496,11 +6499,13 @@ dependencies = [
"metrics",
"num",
"num-traits",
"object-store",
"once_cell",
"paste",
"promql",
"promql-parser",
"rand",
"regex",
"serde",
"serde_json",
"session",

View File

@@ -26,13 +26,14 @@ use std::sync::Arc;
use std::task::Poll;
use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema};
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::file_format::FileOpenFuture;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use self::csv::CsvFormat;
use self::json::JsonFormat;
@@ -73,7 +74,7 @@ impl TryFrom<&HashMap<String, String>> for Format {
#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema>;
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<ArrowSchema>;
}
pub trait ArrowDecoder: Send + 'static {
@@ -154,3 +155,15 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
Ok(stream.boxed())
}))
}
pub async fn infer_schemas(
store: &ObjectStore,
files: &[String],
file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
let mut schemas = Vec::with_capacity(files.len());
for file in files {
schemas.push(file_format.infer_schema(store, file.to_string()).await?)
}
ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}

View File

@@ -92,7 +92,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
}
}
pub fn create_table_schema(expr: &CreateTableExpr) -> Result<RawSchema> {
pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<RawSchema> {
let column_schemas = expr
.column_defs
.iter()
@@ -101,14 +101,17 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result<RawSchema> {
})
.collect::<Result<Vec<ColumnSchema>>>()?;
ensure!(
column_schemas
.iter()
.any(|column| column.name == expr.time_index),
MissingTimestampColumnSnafu {
msg: format!("CreateExpr: {expr:?}")
}
);
// allow external table schema without the time index
if require_time_index {
ensure!(
column_schemas
.iter()
.any(|column| column.name == expr.time_index),
MissingTimestampColumnSnafu {
msg: format!("CreateExpr: {expr:?}")
}
);
}
let column_schemas = column_schemas
.into_iter()
@@ -127,8 +130,9 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result<RawSchema> {
pub fn create_expr_to_request(
table_id: TableId,
expr: CreateTableExpr,
require_time_index: bool,
) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let schema = create_table_schema(&expr, require_time_index)?;
let primary_key_indices = expr
.primary_keys
.iter()

View File

@@ -30,6 +30,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../datatypes" }
file-table-engine = { path = "../file-table-engine" }
futures = "0.3"
futures-util.workspace = true
hyper = { version = "0.14", features = ["full"] }

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use common_error::prelude::*;
use common_procedure::ProcedureId;
use serde_json::error::Error as JsonError;
use snafu::Location;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
@@ -305,6 +306,18 @@ pub enum Error {
source: common_time::error::Error,
},
#[snafu(display("Failed to infer schema: {}", source))]
InferSchema {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to prepare immutable table: {}", source))]
PrepareImmutableTable {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to access catalog, source: {}", source))]
Catalog {
#[snafu(backtrace)]
@@ -314,6 +327,7 @@ pub enum Error {
#[snafu(display("Failed to find table {} from catalog, source: {}", table_name, source))]
FindTable {
table_name: String,
#[snafu(backtrace)]
source: catalog::error::Error,
},
@@ -424,6 +438,12 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to encode object into json, source: {}", source))]
EncodeJson {
location: Location,
source: JsonError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -464,6 +484,8 @@ impl ErrorExt for Error {
ConvertSchema { source, .. } | VectorComputation { source } => source.status_code(),
InferSchema { source, .. } => source.status_code(),
ColumnValuesNumberMismatch { .. }
| InvalidSql { .. }
| NotSupportSql { .. }
@@ -479,7 +501,10 @@ impl ErrorExt for Error {
| DatabaseNotFound { .. }
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
| ColumnNoneDefaultValue { .. }
| PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } => StatusCode::Unexpected,
// TODO(yingwen): Further categorize http error.
StartServer { .. }

View File

@@ -27,6 +27,7 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::info;
use file_table_engine::engine::immutable::ImmutableFileTableEngine;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use meta_client::client::{MetaClient, MetaClientBuilder};
@@ -109,7 +110,7 @@ impl Instance {
let object_store = new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.wal).await?);
let table_engine = Arc::new(DefaultEngine::new(
let mito_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::from(opts),
@@ -117,19 +118,30 @@ impl Instance {
object_store.clone(),
compaction_scheduler,
),
object_store,
object_store.clone(),
));
let mut engine_procedures = HashMap::with_capacity(2);
engine_procedures.insert(
table_engine.name().to_string(),
table_engine.clone() as TableEngineProcedureRef,
mito_engine.name().to_string(),
mito_engine.clone() as TableEngineProcedureRef,
);
// TODO(yingwen): Insert the file table engine into `engine_procedures`
// once #1372 is ready.
let immutable_file_engine = Arc::new(ImmutableFileTableEngine::new(
file_table_engine::config::EngineConfig::default(),
object_store.clone(),
));
engine_procedures.insert(
immutable_file_engine.name().to_string(),
immutable_file_engine.clone() as TableEngineProcedureRef,
);
let engine_manager = Arc::new(
MemoryTableEngineManager::new(table_engine.clone())
.with_engine_procedures(engine_procedures),
MemoryTableEngineManager::with(vec![
mito_engine.clone(),
immutable_file_engine.clone(),
])
.with_engine_procedures(engine_procedures),
);
// create remote catalog manager
@@ -198,12 +210,13 @@ impl Instance {
// Register all procedures.
if let Some(procedure_manager) = &procedure_manager {
// Register procedures of the mito engine.
table_engine.register_procedure_loaders(&**procedure_manager);
mito_engine.register_procedure_loaders(&**procedure_manager);
immutable_file_engine.register_procedure_loaders(&**procedure_manager);
// Register procedures in table-procedure crate.
table_procedure::register_procedure_loaders(
catalog_manager.clone(),
table_engine.clone(),
table_engine.clone(),
mito_engine.clone(),
mito_engine.clone(),
&**procedure_manager,
);
// TODO(yingwen): Register procedures of the file table engine once #1372

View File

@@ -83,6 +83,27 @@ impl Instance {
.execute(SqlRequest::CreateTable(request), query_ctx)
.await
}
Statement::CreateExternalTable(create_external_table) => {
let table_id = self
.table_id_provider
.as_ref()
.context(TableIdProviderNotFoundSnafu)?
.next_table_id()
.await
.context(BumpTableIdSnafu)?;
let name = create_external_table.name.clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self
.sql_handler
.create_external_to_request(table_id, create_external_table, &table_ref)
.await?;
let table_id = request.id;
info!("Creating external table: {table_ref}, table id = {table_id}",);
self.sql_handler
.execute(SqlRequest::CreateTable(request), query_ctx)
.await
}
Statement::Alter(alter_table) => {
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::info;
@@ -58,7 +59,9 @@ impl Instance {
table_id
};
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu)?;
let require_time_index = expr.engine != IMMUTABLE_FILE_ENGINE;
let request = create_expr_to_request(table_id, expr, require_time_index)
.context(CreateExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::CreateTable(request), QueryContext::arc())
@@ -118,7 +121,7 @@ mod tests {
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let expr = testing_create_expr();
let request = create_expr_to_request(1024, expr).unwrap();
let request = create_expr_to_request(1024, expr, true).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
@@ -130,7 +133,7 @@ mod tests {
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = create_expr_to_request(1025, expr);
let result = create_expr_to_request(1025, expr, true);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"),
@@ -142,11 +145,11 @@ mod tests {
#[test]
fn test_create_table_schema() {
let mut expr = testing_create_expr();
let schema = create_table_schema(&expr).unwrap();
let schema = create_table_schema(&expr, true).unwrap();
assert_eq!(schema, expected_table_schema());
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr);
let result = create_table_schema(&expr, true);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),

View File

@@ -32,6 +32,7 @@ use crate::instance::sql::table_idents_to_full_name;
mod alter;
mod create;
mod create_external;
mod drop_table;
mod flush_table;
pub(crate) mod insert;

View File

@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use file_table_engine::table::immutable::ImmutableFileTableOptions;
use query::sql::prepare_immutable_file_table_files_and_schema;
use snafu::ResultExt;
use sql::statements::create::CreateExternalTable;
use table::engine::TableReference;
use table::metadata::TableId;
use table::requests::{CreateTableRequest, TableOptions, IMMUTABLE_TABLE_META_KEY};
use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
    /// Builds the [`CreateTableRequest`] for a `CREATE EXTERNAL TABLE` statement.
    ///
    /// The files backing the table and its schema are resolved by
    /// `prepare_immutable_file_table_files_and_schema` from the statement's
    /// options and column definitions. The resulting file list is serialized
    /// to JSON and stored in the table options under
    /// `IMMUTABLE_TABLE_META_KEY`.
    ///
    /// # Errors
    ///
    /// * `PrepareImmutableTable` when the files or schema cannot be resolved.
    /// * `EncodeJson` when the file list cannot be serialized.
    /// * `UnrecognizedTableOption` when the options cannot be converted into
    ///   [`TableOptions`].
    pub(crate) async fn create_external_to_request(
        &self,
        table_id: TableId,
        stmt: CreateExternalTable,
        table_ref: &TableReference<'_>,
    ) -> Result<CreateTableRequest> {
        let mut options = stmt.options;

        let (files, schema) =
            prepare_immutable_file_table_files_and_schema(&options, &stmt.columns)
                .await
                .context(error::PrepareImmutableTableSnafu)?;

        let meta = ImmutableFileTableOptions { files };
        options.insert(
            IMMUTABLE_TABLE_META_KEY.to_string(),
            serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?,
        );

        Ok(CreateTableRequest {
            id: table_id,
            catalog_name: table_ref.catalog.to_string(),
            schema_name: table_ref.schema.to_string(),
            table_name: table_ref.table.to_string(),
            desc: None,
            schema,
            region_numbers: vec![0],
            // The statement declares no primary key, so none is recorded.
            // Hard-coding index 0 would silently promote whichever column
            // happens to come first and would point past the end of an empty
            // schema. The frontend's `create_external_expr` likewise sends an
            // empty primary key list.
            primary_key_indices: vec![],
            create_if_not_exists: stmt.if_not_exists,
            table_options: TableOptions::try_from(&options)
                .context(error::UnrecognizedTableOptionSnafu)?,
            engine: stmt.engine,
        })
    }
}

View File

@@ -28,19 +28,14 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::error::{self as table_error, Result as TableResult};
use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType};
use table::Table;
use table::{requests, Table};
use super::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig};
use crate::error::{self, ConvertRawSnafu, Result};
use crate::manifest::immutable::{
read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION,
};
use crate::manifest::table_manifest_dir;
pub const IMMUTABLE_TABLE_META_KEY: &str = "IMMUTABLE_TABLE_META";
pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "LOCATION";
pub const IMMUTABLE_TABLE_PATTERN_KEY: &str = "PATTERN";
pub const IMMUTABLE_TABLE_FORMAT_KEY: &str = "FORMAT";
use crate::table::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig};
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
@@ -125,19 +120,18 @@ impl ImmutableFileTable {
let table_info = Arc::new(table_info);
let options = &table_info.meta.options.extra_options;
let url = options.get(IMMUTABLE_TABLE_LOCATION_KEY).context(
let url = options
.get(requests::IMMUTABLE_TABLE_LOCATION_KEY)
.context(error::MissingRequiredFieldSnafu {
name: requests::IMMUTABLE_TABLE_LOCATION_KEY,
})?;
let meta = options.get(requests::IMMUTABLE_TABLE_META_KEY).context(
error::MissingRequiredFieldSnafu {
name: IMMUTABLE_TABLE_LOCATION_KEY,
name: requests::IMMUTABLE_TABLE_META_KEY,
},
)?;
let meta =
options
.get(IMMUTABLE_TABLE_META_KEY)
.context(error::MissingRequiredFieldSnafu {
name: IMMUTABLE_TABLE_META_KEY,
})?;
let meta: ImmutableFileTableOptions =
serde_json::from_str(meta).context(error::DecodeJsonSnafu)?;
let format = Format::try_from(options).context(error::ParseFileFormatSnafu)?;

View File

@@ -22,13 +22,13 @@ use object_store::services::Fs;
use object_store::ObjectStore;
use table::engine::{table_dir, EngineContext, TableEngine};
use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{CreateTableRequest, TableOptions};
use table::requests::{self, CreateTableRequest, TableOptions};
use table::TableRef;
use crate::config::EngineConfig;
use crate::engine::immutable::ImmutableFileTableEngine;
use crate::manifest::immutable::ImmutableMetadata;
use crate::table::immutable::{self, ImmutableFileTableOptions};
use crate::table::immutable::ImmutableFileTableOptions;
pub const TEST_TABLE_NAME: &str = "demo";
@@ -98,15 +98,15 @@ pub struct TestEngineComponents {
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
let mut table_options = TableOptions::default();
table_options.extra_options.insert(
immutable::IMMUTABLE_TABLE_LOCATION_KEY.to_string(),
requests::IMMUTABLE_TABLE_LOCATION_KEY.to_string(),
"mock_path".to_string(),
);
table_options.extra_options.insert(
immutable::IMMUTABLE_TABLE_META_KEY.to_string(),
requests::IMMUTABLE_TABLE_META_KEY.to_string(),
serde_json::to_string(&ImmutableFileTableOptions::default()).unwrap(),
);
table_options.extra_options.insert(
immutable::IMMUTABLE_TABLE_FORMAT_KEY.to_string(),
requests::IMMUTABLE_TABLE_FORMAT_KEY.to_string(),
"csv".to_string(),
);

View File

@@ -32,6 +32,7 @@ datafusion-common.workspace = true
datafusion-expr.workspace = true
datanode = { path = "../datanode" }
datatypes = { path = "../datatypes" }
file-table-engine = { path = "../file-table-engine" }
futures = "0.3"
futures-util.workspace = true
itertools = "0.10"

View File

@@ -484,6 +484,18 @@ pub enum Error {
file_schema: String,
location: Location,
},
#[snafu(display("Failed to encode object into json, source: {}", source))]
EncodeJson {
source: serde_json::error::Error,
location: Location,
},
#[snafu(display("Failed to prepare immutable table: {}", source))]
PrepareImmutableTable {
#[snafu(backtrace)]
source: query::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -504,7 +516,8 @@ impl ErrorExt for Error {
| Error::MissingMetasrvOpts { .. }
| Error::ColumnNoneDefaultValue { .. }
| Error::BuildRegex { .. }
| Error::InvalidSchema { .. } => StatusCode::InvalidArguments,
| Error::InvalidSchema { .. }
| Error::PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
Error::NotSupported { .. } => StatusCode::Unsupported,
@@ -541,7 +554,8 @@ impl ErrorExt for Error {
Error::IllegalFrontendState { .. }
| Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. } => StatusCode::Unexpected,
| Error::ContextValueNotFound { .. }
| Error::EncodeJson { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,

View File

@@ -20,13 +20,15 @@ use api::v1::{Column, ColumnDataType, CreateTableExpr};
use common_error::prelude::BoxedError;
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::schema::ColumnSchema;
use file_table_engine::table::immutable::ImmutableFileTableOptions;
use query::sql::prepare_immutable_file_table_files_and_schema;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::statements::column_def_to_schema;
use sql::statements::create::{CreateTable, TIME_INDEX};
use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX};
use sql::util::to_lowercase_options_map;
use table::requests::TableOptions;
use table::requests::{TableOptions, IMMUTABLE_TABLE_META_KEY};
use crate::error::{
self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu,
@@ -76,6 +78,44 @@ impl CreateExprFactory for DefaultCreateExprFactory {
}
}
pub(crate) async fn create_external_expr(
create: CreateExternalTable,
query_ctx: QueryContextRef,
) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let mut options = create.options;
let (files, schema) = prepare_immutable_file_table_files_and_schema(&options, &create.columns)
.await
.context(error::PrepareImmutableTableSnafu)?;
let meta = ImmutableFileTableOptions { files };
options.insert(
IMMUTABLE_TABLE_META_KEY.to_string(),
serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?,
);
let expr = CreateTableExpr {
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
column_defs: column_schemas_to_defs(schema.column_schemas)?,
time_index: "".to_string(),
primary_keys: vec![],
create_if_not_exists: create.if_not_exists,
table_options: options,
table_id: None,
region_ids: vec![],
engine: create.engine.to_string(),
};
Ok(expr)
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
pub(crate) fn create_to_expr(
create: &CreateTable,

View File

@@ -334,6 +334,11 @@ impl DistInstance {
let _ = self.create_table(create_expr, stmt.partitions).await?;
Ok(Output::AffectedRows(0))
}
Statement::CreateExternalTable(stmt) => {
let create_expr = &mut expr_factory::create_external_expr(stmt, query_ctx).await?;
self.create_table(create_expr, None).await?;
Ok(Output::AffectedRows(0))
}
Statement::Alter(alter_table) => {
let expr = grpc::to_alter_expr(alter_table, query_ctx)?;
self.handle_alter_table(expr).await
@@ -673,7 +678,7 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
schema: raw_schema,
primary_key_indices,
value_indices: vec![],
engine: "mito".to_string(),
engine: create_table.engine.clone(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),

View File

@@ -77,6 +77,10 @@ impl StatementExecutor {
let mut rows_inserted = 0;
for entry in entries.iter() {
let path = entry.path();
// skips directories.
if entry.path().ends_with('/') {
continue;
}
let reader = object_store
.reader(path)
.await

View File

@@ -479,6 +479,39 @@ async fn test_execute_create(instance: Arc<dyn MockInstance>) {
assert!(matches!(output, Output::AffectedRows(0)));
}
#[apply(both_instances_cases)]
async fn test_execute_external_create(instance: Arc<dyn MockInstance>) {
    // External table with an explicit column list that includes a timestamp column.
    let sql = r#"create external table test_table(
host string,
ts timestamp,
cpu double default 0,
memory double
) with (location='/tmp/', format='csv');"#;

    let frontend = instance.frontend();
    let result = execute_sql(&frontend, sql).await;

    assert!(matches!(result, Output::AffectedRows(0)));
}
#[apply(both_instances_cases)]
async fn test_execute_external_create_without_ts_type(instance: Arc<dyn MockInstance>) {
    // Same statement as the test with a `ts` column, minus the timestamp
    // column: creation is still expected to succeed.
    let sql = r#"create external table test_table(
host string,
cpu double default 0,
memory double
) with (location='/tmp/', format='csv');"#;

    let frontend = instance.frontend();
    let result = execute_sql(&frontend, sql).await;

    assert!(matches!(result, Output::AffectedRows(0)));
}
#[apply(standalone_instance_case)]
async fn test_rename_table(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

View File

@@ -12,6 +12,7 @@ catalog = { path = "../catalog" }
chrono.workspace = true
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-datasource = { path = "../common/datasource" }
common-error = { path = "../common/error" }
common-function = { path = "../common/function" }
common-query = { path = "../common/query" }
@@ -29,9 +30,11 @@ futures = "0.3"
futures-util.workspace = true
humantime = "2.1"
metrics.workspace = true
object-store = { path = "../object-store" }
once_cell = "1.10"
promql = { path = "../promql" }
promql-parser = "0.1.0"
regex = "1.6"
serde.workspace = true
serde_json = "1.0"
session = { path = "../session" }

View File

@@ -126,6 +126,54 @@ pub enum Error {
#[snafu(backtrace)]
source: sql::error::Error,
},
#[snafu(display("Failed to parse SQL, source: {}", source))]
ParseSql {
#[snafu(backtrace)]
source: sql::error::Error,
},
#[snafu(display("Missing required field: {}", name))]
MissingRequiredField { name: String, location: Location },
#[snafu(display("Failed to regex, source: {}", source))]
BuildRegex {
location: Location,
source: regex::Error,
},
#[snafu(display("Failed to build data source backend, source: {}", source))]
BuildBackend {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
#[snafu(display("Failed to list objects, source: {}", source))]
ListObjects {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
#[snafu(display("Unsupported file format: {}", format))]
UnsupportedFileFormat { format: String, location: Location },
#[snafu(display("Failed to parse file format: {}", source))]
ParseFileFormat {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
#[snafu(display("Failed to infer schema: {}", source))]
InferSchema {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
#[snafu(display("Failed to convert datafusion schema, source: {}", source))]
ConvertSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
impl ErrorExt for Error {
@@ -139,12 +187,22 @@ impl ErrorExt for Error {
| SchemaNotFound { .. }
| TableNotFound { .. }
| ParseTimestamp { .. }
| ParseFloat { .. } => StatusCode::InvalidArguments,
| ParseFloat { .. }
| MissingRequiredField { .. }
| BuildRegex { .. }
| UnsupportedFileFormat { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(),
QueryAccessDenied { .. } => StatusCode::AccessDenied,
Catalog { source } => source.status_code(),
VectorComputation { source } | ConvertDatafusionSchema { source } => {
source.status_code()
}
ParseSql { source } => source.status_code(),
CreateRecordBatch { source } => source.status_code(),
QueryExecution { source } | QueryPlan { source } => source.status_code(),
DataFusion { .. } | MissingTimestampColumn { .. } => StatusCode::Internal,

View File

@@ -14,20 +14,35 @@
mod show;
use std::collections::HashMap;
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::parquet::ParquetFormat;
use common_datasource::file_format::{infer_schemas, FileFormat};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::vectors::{Helper, StringVector};
use object_store::ObjectStore;
use once_cell::sync::Lazy;
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::ColumnDef;
use sql::statements::column_def_to_schema;
use sql::statements::create::Partitions;
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use table::requests::{
IMMUTABLE_TABLE_FORMAT_KEY, IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY,
};
use table::TableRef;
use crate::error::{self, Result};
@@ -250,6 +265,105 @@ fn describe_column_semantic_types(
))
}
/// Lists the files backing an immutable file table and resolves its schema.
///
/// When `columns` is non-empty the schema is built from those column
/// definitions. Otherwise it is inferred from the listed files using the file
/// format named in `options`.
///
/// Returns the listed file paths together with the resolved schema.
///
/// # Errors
///
/// Propagates any failure from listing the files, a `ParseSql` error when a
/// column definition cannot be converted, and any failure from parsing the
/// file format or inferring the schema.
pub async fn prepare_immutable_file_table_files_and_schema(
    options: &HashMap<String, String>,
    columns: &[ColumnDef],
) -> Result<(Vec<String>, RawSchema)> {
    let (object_store, files) = prepare_immutable_file_table(options).await?;

    let schema = if columns.is_empty() {
        // No explicit columns: derive the schema from the files themselves.
        let format = parse_immutable_file_table_format(options)?;
        infer_immutable_file_table_schema(&object_store, &*format, &files).await?
    } else {
        let column_schemas = columns
            .iter()
            .map(|column| column_def_to_schema(column, false).context(error::ParseSqlSnafu))
            .collect::<Result<Vec<_>>>()?;
        RawSchema::new(column_schemas)
    };

    Ok((files, schema))
}
// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode.
async fn prepare_immutable_file_table(
options: &HashMap<String, String>,
) -> Result<(ObjectStore, Vec<String>)> {
let url =
options
.get(IMMUTABLE_TABLE_LOCATION_KEY)
.context(error::MissingRequiredFieldSnafu {
name: IMMUTABLE_TABLE_LOCATION_KEY,
})?;
let (dir, filename) = find_dir_and_filename(url);
let source = if let Some(filename) = filename {
Source::Filename(filename)
} else {
Source::Dir
};
let regex = options
.get(IMMUTABLE_TABLE_PATTERN_KEY)
.map(|x| Regex::new(x))
.transpose()
.context(error::BuildRegexSnafu)?;
let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
let lister = Lister::new(object_store.clone(), source, dir, regex);
// If we scan files in a directory every time the database restarts,
// then it might lead to a potential undefined behavior:
// If a user adds a file with an incompatible schema to that directory,
// it will make the external table unavailable.
let files = lister
.list()
.await
.context(error::ListObjectsSnafu)?
.into_iter()
.filter_map(|entry| {
if entry.path().ends_with('/') {
None
} else {
Some(entry.path().to_string())
}
})
.collect::<Vec<_>>();
Ok((object_store, files))
}
fn parse_immutable_file_table_format(
options: &HashMap<String, String>,
) -> Result<Box<dyn FileFormat>> {
let format = options
.get(IMMUTABLE_TABLE_FORMAT_KEY)
.cloned()
.unwrap_or_default()
.to_uppercase();
match format.as_str() {
"CSV" => {
let file_format = CsvFormat::try_from(options).context(error::ParseFileFormatSnafu)?;
Ok(Box::new(file_format))
}
"JSON" => {
let file_format = JsonFormat::try_from(options).context(error::ParseFileFormatSnafu)?;
Ok(Box::new(file_format))
}
"PARQUET" => Ok(Box::new(ParquetFormat {})),
format => error::UnsupportedFileFormatSnafu { format }.fail(),
}
}
async fn infer_immutable_file_table_schema(
object_store: &ObjectStore,
file_format: &dyn FileFormat,
files: &[String],
) -> Result<RawSchema> {
let merged = infer_schemas(object_store, files, file_format)
.await
.context(error::InferSchemaSnafu)?;
Ok(RawSchema::from(
&Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
))
}
#[cfg(test)]
mod test {
use std::sync::Arc;

View File

@@ -209,6 +209,8 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result<Column> {
Ok(ColumnType::MYSQL_TYPE_VARCHAR)
}
ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
ConcreteDataType::DateTime(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
_ => error::InternalSnafu {
err_msg: format!(
"not implemented for column datatype {:?}",

View File

@@ -15,7 +15,6 @@
use std::cmp::Ordering;
use itertools::Itertools;
use mito::engine;
use once_cell::sync::Lazy;
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Value};
@@ -66,7 +65,9 @@ impl<'a> ParserContext<'a> {
self.parser
.expect_keyword(Keyword::TABLE)
.context(error::SyntaxSnafu { sql: self.sql })?;
let if_not_exists =
self.parser
.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
let table_name = self
.parser
.parse_object_name()
@@ -75,9 +76,8 @@ impl<'a> ParserContext<'a> {
expected: "a table name",
actual: self.peek_token_as_string(),
})?;
let (columns, constraints) = self.parse_columns()?;
let engine = self.parse_table_engine(common_catalog::consts::IMMUTABLE_FILE_ENGINE)?;
let options = self
.parser
.parse_options(Keyword::WITH)
@@ -97,6 +97,8 @@ impl<'a> ParserContext<'a> {
columns,
constraints,
options,
if_not_exists,
engine,
}))
}
@@ -141,7 +143,7 @@ impl<'a> ParserContext<'a> {
let partitions = self.parse_partitions()?;
let engine = self.parse_table_engine()?;
let engine = self.parse_table_engine(common_catalog::consts::MITO_ENGINE)?;
let options = self
.parser
.parse_options(Keyword::WITH)
@@ -552,9 +554,9 @@ impl<'a> ParserContext<'a> {
}
/// Parses the optional `ENGINE` clause, returning `default` when the clause is absent.
fn parse_table_engine(&mut self) -> Result<String> {
fn parse_table_engine(&mut self, default: &str) -> Result<String> {
if !self.consume_token(ENGINE) {
return Ok(engine::MITO_ENGINE.to_string());
return Ok(default.to_string());
}
self.parser
@@ -780,6 +782,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use sqlparser::ast::ColumnOption::NotNull;
use sqlparser::dialect::GenericDialect;
@@ -791,16 +794,32 @@ mod tests {
sql: &'a str,
expected_table_name: &'a str,
expected_options: HashMap<String, String>,
expected_engine: &'a str,
expected_if_not_exist: bool,
}
let tests = [Test {
sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
expected_table_name: "city",
expected_options: HashMap::from([
("LOCATION".to_string(), "/var/data/city.csv".to_string()),
("FORMAT".to_string(), "csv".to_string()),
]),
}];
let tests = [
Test {
sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
expected_table_name: "city",
expected_options: HashMap::from([
("LOCATION".to_string(), "/var/data/city.csv".to_string()),
("FORMAT".to_string(), "csv".to_string()),
]),
expected_engine: IMMUTABLE_FILE_ENGINE,
expected_if_not_exist: false,
},
Test {
sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
expected_table_name: "city",
expected_options: HashMap::from([
("LOCATION".to_string(), "/var/data/city.csv".to_string()),
("FORMAT".to_string(), "csv".to_string()),
]),
expected_engine: "foo",
expected_if_not_exist: true,
},
];
for test in tests {
let stmts = ParserContext::create_with_dialect(test.sql, &GenericDialect {}).unwrap();
@@ -809,6 +828,8 @@ mod tests {
Statement::CreateExternalTable(c) => {
assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
assert_eq!(c.options, test.expected_options);
assert_eq!(c.if_not_exists, test.expected_if_not_exist);
assert_eq!(c.engine, test.expected_engine);
}
_ => unreachable!(),
}

View File

@@ -195,6 +195,8 @@ pub struct CreateExternalTable {
/// All keys are uppercase.
/// TODO(weny): unify the key's case styling.
pub options: HashMap<String, String>,
pub if_not_exists: bool,
pub engine: String,
}
#[cfg(test)]

View File

@@ -72,6 +72,18 @@ impl MemoryTableEngineManager {
self.engine_procedures = RwLock::new(engine_procedures);
self
}
/// Creates a manager holding all of `engines`, each keyed by the name it
/// reports. No engine procedures are registered.
///
/// If two engines report the same name, the one later in `engines` wins.
pub fn with(engines: Vec<TableEngineRef>) -> Self {
    let mut by_name = HashMap::with_capacity(engines.len());
    for engine in engines {
        let name = engine.name().to_string();
        by_name.insert(name, engine);
    }

    MemoryTableEngineManager {
        engines: RwLock::new(by_name),
        engine_procedures: RwLock::new(HashMap::new()),
    }
}
}
#[async_trait]

View File

@@ -29,6 +29,11 @@ use crate::error;
use crate::error::ParseTableOptionSnafu;
use crate::metadata::TableId;
/// Table option key under which the JSON-encoded immutable file table
/// metadata (the list of files) is stored.
pub const IMMUTABLE_TABLE_META_KEY: &str = "IMMUTABLE_TABLE_META";
/// Table option key holding the location (URL) of the external table's data.
pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "LOCATION";
/// Table option key holding an optional regex pattern used when listing files.
pub const IMMUTABLE_TABLE_PATTERN_KEY: &str = "PATTERN";
/// Table option key naming the file format of the external table's files.
pub const IMMUTABLE_TABLE_FORMAT_KEY: &str = "FORMAT";
#[derive(Debug, Clone)]
pub struct CreateDatabaseRequest {
pub db_name: String,