diff --git a/Cargo.lock b/Cargo.lock index e95d78654b..6211ae3442 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2469,6 +2469,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", + "file-table-engine", "futures", "futures-util", "humantime-serde", @@ -3025,6 +3026,7 @@ dependencies = [ "datafusion-expr", "datanode", "datatypes", + "file-table-engine", "futures", "futures-util", "itertools", @@ -6475,6 +6477,7 @@ dependencies = [ "chrono", "common-base", "common-catalog", + "common-datasource", "common-error", "common-function", "common-function-macro", @@ -6496,11 +6499,13 @@ dependencies = [ "metrics", "num", "num-traits", + "object-store", "once_cell", "paste", "promql", "promql-parser", "rand", + "regex", "serde", "serde_json", "session", diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 9f53f1c210..d210ca4410 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -26,13 +26,14 @@ use std::sync::Arc; use std::task::Poll; use arrow::record_batch::RecordBatch; -use arrow_schema::{ArrowError, Schema}; +use arrow_schema::{ArrowError, Schema as ArrowSchema}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::file_format::FileOpenFuture; use futures::StreamExt; use object_store::ObjectStore; +use snafu::ResultExt; use self::csv::CsvFormat; use self::json::JsonFormat; @@ -73,7 +74,7 @@ impl TryFrom<&HashMap> for Format { #[async_trait] pub trait FileFormat: Send + Sync + std::fmt::Debug { - async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result; + async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result; } pub trait ArrowDecoder: Send + 'static { @@ -154,3 +155,15 @@ pub fn open_with_decoder DataFusionResult>( Ok(stream.boxed()) })) } + +pub async fn infer_schemas( + store: &ObjectStore, + files: &[String], + file_format: &dyn FileFormat, +) -> Result { + let mut schemas = Vec::with_capacity(files.len()); + for file in files { + schemas.push(file_format.infer_schema(store, file.to_string()).await?) + } + ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu) +} diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index c20f47dcfc..ba570e10d9 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -92,7 +92,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { } } -pub fn create_table_schema(expr: &CreateTableExpr) -> Result { +pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result { let column_schemas = expr .column_defs .iter() @@ -101,14 +101,17 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result { }) .collect::>>()?; - ensure!( - column_schemas - .iter() - .any(|column| column.name == expr.time_index), - MissingTimestampColumnSnafu { - msg: format!("CreateExpr: {expr:?}") - } - ); + // allow external table schema without the time index + if require_time_index { + ensure!( + column_schemas + .iter() + .any(|column| column.name == expr.time_index), + MissingTimestampColumnSnafu { + msg: format!("CreateExpr: {expr:?}") + } + ); + } let column_schemas = column_schemas .into_iter() @@ -127,8 +130,9 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result { pub fn create_expr_to_request( table_id: TableId, expr: CreateTableExpr, + require_time_index: bool, ) -> Result { - let schema = create_table_schema(&expr)?; + let schema = create_table_schema(&expr, require_time_index)?; let primary_key_indices = expr .primary_keys .iter() diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b979222e5d..bfdc6b679a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -30,6 +30,7 @@ datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datatypes = { path = "../datatypes" } +file-table-engine = { path = "../file-table-engine" } futures = "0.3" futures-util.workspace = true hyper = { version = "0.14", features = ["full"] } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9cd8de1e76..7e75cfe904 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -16,6 +16,7 @@ use std::any::Any; use common_error::prelude::*; use common_procedure::ProcedureId; +use serde_json::error::Error as JsonError; use snafu::Location; use storage::error::Error as StorageError; use table::error::Error as TableError; @@ -305,6 +306,18 @@ pub enum Error { source: common_time::error::Error, }, + #[snafu(display("Failed to infer schema: {}", source))] + InferSchema { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to prepare immutable table: {}", source))] + PrepareImmutableTable { + #[snafu(backtrace)] + source: query::error::Error, + }, + #[snafu(display("Failed to access catalog, source: {}", source))] Catalog { #[snafu(backtrace)] @@ -314,6 +327,7 @@ pub enum Error { #[snafu(display("Failed to find table {} from catalog, source: {}", table_name, source))] FindTable { table_name: String, + #[snafu(backtrace)] source: catalog::error::Error, }, @@ -424,6 +438,12 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Failed to encode object into json, source: {}", source))] + EncodeJson { + location: Location, + source: JsonError, + }, } pub type Result = std::result::Result; @@ -464,6 +484,8 @@ impl ErrorExt for Error { ConvertSchema { source, .. } | VectorComputation { source } => source.status_code(), + InferSchema { source, .. } => source.status_code(), + ColumnValuesNumberMismatch { .. } | InvalidSql { .. } | NotSupportSql { .. } @@ -479,7 +501,10 @@ impl ErrorExt for Error { | DatabaseNotFound { .. } | MissingNodeId { .. } | MissingMetasrvOpts { .. } - | ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments, + | ColumnNoneDefaultValue { .. } + | PrepareImmutableTable { .. } => StatusCode::InvalidArguments, + + EncodeJson { .. } => StatusCode::Unexpected, // TODO(yingwen): Further categorize http error. StartServer { .. } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 34af29472d..93fcebf8e8 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -27,6 +27,7 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::info; +use file_table_engine::engine::immutable::ImmutableFileTableEngine; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use meta_client::client::{MetaClient, MetaClientBuilder}; @@ -109,7 +110,7 @@ impl Instance { let object_store = new_object_store(&opts.storage.store).await?; let log_store = Arc::new(create_log_store(&opts.wal).await?); - let table_engine = Arc::new(DefaultEngine::new( + let mito_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( StorageEngineConfig::from(opts), @@ -117,19 +118,30 @@ impl Instance { object_store.clone(), compaction_scheduler, ), - object_store, + object_store.clone(), )); let mut engine_procedures = HashMap::with_capacity(2); engine_procedures.insert( - table_engine.name().to_string(), - table_engine.clone() as TableEngineProcedureRef, + mito_engine.name().to_string(), + mito_engine.clone() as TableEngineProcedureRef, ); - // TODO(yingwen): Insert the file table engine into `engine_procedures` - // once #1372 is ready. + + let immutable_file_engine = Arc::new(ImmutableFileTableEngine::new( + file_table_engine::config::EngineConfig::default(), + object_store.clone(), + )); + engine_procedures.insert( + immutable_file_engine.name().to_string(), + immutable_file_engine.clone() as TableEngineProcedureRef, + ); + let engine_manager = Arc::new( - MemoryTableEngineManager::new(table_engine.clone()) - .with_engine_procedures(engine_procedures), + MemoryTableEngineManager::with(vec![ + mito_engine.clone(), + immutable_file_engine.clone(), + ]) + .with_engine_procedures(engine_procedures), ); // create remote catalog manager @@ -198,12 +210,13 @@ impl Instance { // Register all procedures. if let Some(procedure_manager) = &procedure_manager { // Register procedures of the mito engine. - table_engine.register_procedure_loaders(&**procedure_manager); + mito_engine.register_procedure_loaders(&**procedure_manager); + immutable_file_engine.register_procedure_loaders(&**procedure_manager); // Register procedures in table-procedure crate. table_procedure::register_procedure_loaders( catalog_manager.clone(), - table_engine.clone(), - table_engine.clone(), + mito_engine.clone(), + mito_engine.clone(), &**procedure_manager, ); // TODO(yingwen): Register procedures of the file table engine once #1372 diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index fdda96db29..fb85adb359 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -83,6 +83,27 @@ impl Instance { .execute(SqlRequest::CreateTable(request), query_ctx) .await } + Statement::CreateExternalTable(create_external_table) => { + let table_id = self + .table_id_provider + .as_ref() + .context(TableIdProviderNotFoundSnafu)? + .next_table_id() + .await + .context(BumpTableIdSnafu)?; + let name = create_external_table.name.clone(); + let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?; + let table_ref = TableReference::full(&catalog, &schema, &table); + let request = self + .sql_handler + .create_external_to_request(table_id, create_external_table, &table_ref) + .await?; + let table_id = request.id; + info!("Creating external table: {table_ref}, table id = {table_id}",); + self.sql_handler + .execute(SqlRequest::CreateTable(request), query_ctx) + .await + } Statement::Alter(alter_table) => { let name = alter_table.table_name().clone(); let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?; diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 85f46eac79..ab97e9183f 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -13,6 +13,7 @@ // limitations under the License. use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr}; +use common_catalog::consts::IMMUTABLE_FILE_ENGINE; use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; use common_query::Output; use common_telemetry::info; @@ -58,7 +59,9 @@ impl Instance { table_id }; - let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu)?; + let require_time_index = expr.engine != IMMUTABLE_FILE_ENGINE; + let request = create_expr_to_request(table_id, expr, require_time_index) + .context(CreateExprToRequestSnafu)?; self.sql_handler() .execute(SqlRequest::CreateTable(request), QueryContext::arc()) @@ -118,7 +121,7 @@ mod tests { async fn test_create_expr_to_request() { common_telemetry::init_default_ut_logging(); let expr = testing_create_expr(); - let request = create_expr_to_request(1024, expr).unwrap(); + let request = create_expr_to_request(1024, expr, true).unwrap(); assert_eq!(request.id, MIN_USER_TABLE_ID); assert_eq!(request.catalog_name, "greptime".to_string()); assert_eq!(request.schema_name, "public".to_string()); @@ -130,7 +133,7 @@ mod tests { let mut expr = testing_create_expr(); expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = create_expr_to_request(1025, expr); + let result = create_expr_to_request(1025, expr, true); let err_msg = result.unwrap_err().to_string(); assert!( err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), @@ -142,11 +145,11 @@ mod tests { #[test] fn test_create_table_schema() { let mut expr = testing_create_expr(); - let schema = create_table_schema(&expr).unwrap(); + let schema = create_table_schema(&expr, true).unwrap(); assert_eq!(schema, expected_table_schema()); expr.time_index = "not-exist-column".to_string(); - let result = create_table_schema(&expr); + let result = create_table_schema(&expr, true); let err_msg = result.unwrap_err().to_string(); assert!( err_msg.contains("Missing timestamp column"), diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 67aa46b4ba..454c1d6ddd 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -32,6 +32,7 @@ use crate::instance::sql::table_idents_to_full_name; mod alter; mod create; +mod create_external; mod drop_table; mod flush_table; pub(crate) mod insert; diff --git a/src/datanode/src/sql/create_external.rs b/src/datanode/src/sql/create_external.rs new file mode 100644 index 0000000000..275398dbca --- /dev/null +++ b/src/datanode/src/sql/create_external.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use file_table_engine::table::immutable::ImmutableFileTableOptions; +use query::sql::prepare_immutable_file_table_files_and_schema; +use snafu::ResultExt; +use sql::statements::create::CreateExternalTable; +use table::engine::TableReference; +use table::metadata::TableId; +use table::requests::{CreateTableRequest, TableOptions, IMMUTABLE_TABLE_META_KEY}; + +use crate::error::{self, Result}; +use crate::sql::SqlHandler; + +impl SqlHandler { + pub(crate) async fn create_external_to_request( + &self, + table_id: TableId, + stmt: CreateExternalTable, + table_ref: &TableReference<'_>, + ) -> Result { + let mut options = stmt.options; + + let (files, schema) = + prepare_immutable_file_table_files_and_schema(&options, &stmt.columns) + .await + .context(error::PrepareImmutableTableSnafu)?; + + let meta = ImmutableFileTableOptions { files }; + options.insert( + IMMUTABLE_TABLE_META_KEY.to_string(), + serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?, + ); + + Ok(CreateTableRequest { + id: table_id, + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + desc: None, + schema, + region_numbers: vec![0], + primary_key_indices: vec![0], + create_if_not_exists: stmt.if_not_exists, + table_options: TableOptions::try_from(&options) + .context(error::UnrecognizedTableOptionSnafu)?, + engine: stmt.engine, + }) + } +} diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index c034858bcc..b0bcf29d4a 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -28,19 +28,14 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::error::{self as table_error, Result as TableResult}; use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType}; -use table::Table; +use table::{requests, Table}; -use super::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig}; use crate::error::{self, ConvertRawSnafu, Result}; use crate::manifest::immutable::{ read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION, }; use crate::manifest::table_manifest_dir; - -pub const IMMUTABLE_TABLE_META_KEY: &str = "IMMUTABLE_TABLE_META"; -pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "LOCATION"; -pub const IMMUTABLE_TABLE_PATTERN_KEY: &str = "PATTERN"; -pub const IMMUTABLE_TABLE_FORMAT_KEY: &str = "FORMAT"; +use crate::table::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig}; #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] @@ -125,19 +120,18 @@ impl ImmutableFileTable { let table_info = Arc::new(table_info); let options = &table_info.meta.options.extra_options; - let url = options.get(IMMUTABLE_TABLE_LOCATION_KEY).context( + let url = options + .get(requests::IMMUTABLE_TABLE_LOCATION_KEY) + .context(error::MissingRequiredFieldSnafu { + name: requests::IMMUTABLE_TABLE_LOCATION_KEY, + })?; + + let meta = options.get(requests::IMMUTABLE_TABLE_META_KEY).context( error::MissingRequiredFieldSnafu { - name: IMMUTABLE_TABLE_LOCATION_KEY, + name: requests::IMMUTABLE_TABLE_META_KEY, }, )?; - let meta = - options - .get(IMMUTABLE_TABLE_META_KEY) - .context(error::MissingRequiredFieldSnafu { - name: IMMUTABLE_TABLE_META_KEY, - })?; - let meta: ImmutableFileTableOptions = serde_json::from_str(meta).context(error::DecodeJsonSnafu)?; let format = Format::try_from(options).context(error::ParseFileFormatSnafu)?; diff --git a/src/file-table-engine/src/test_util.rs b/src/file-table-engine/src/test_util.rs index 96923d78ae..8b908424bd 100644 --- a/src/file-table-engine/src/test_util.rs +++ b/src/file-table-engine/src/test_util.rs @@ -22,13 +22,13 @@ use object_store::services::Fs; use object_store::ObjectStore; use table::engine::{table_dir, EngineContext, TableEngine}; use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; -use table::requests::{CreateTableRequest, TableOptions}; +use table::requests::{self, CreateTableRequest, TableOptions}; use table::TableRef; use crate::config::EngineConfig; use crate::engine::immutable::ImmutableFileTableEngine; use crate::manifest::immutable::ImmutableMetadata; -use crate::table::immutable::{self, ImmutableFileTableOptions}; +use crate::table::immutable::ImmutableFileTableOptions; pub const TEST_TABLE_NAME: &str = "demo"; @@ -98,15 +98,15 @@ pub struct TestEngineComponents { pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { let mut table_options = TableOptions::default(); table_options.extra_options.insert( - immutable::IMMUTABLE_TABLE_LOCATION_KEY.to_string(), + requests::IMMUTABLE_TABLE_LOCATION_KEY.to_string(), "mock_path".to_string(), ); table_options.extra_options.insert( - immutable::IMMUTABLE_TABLE_META_KEY.to_string(), + requests::IMMUTABLE_TABLE_META_KEY.to_string(), serde_json::to_string(&ImmutableFileTableOptions::default()).unwrap(), ); table_options.extra_options.insert( - immutable::IMMUTABLE_TABLE_FORMAT_KEY.to_string(), + requests::IMMUTABLE_TABLE_FORMAT_KEY.to_string(), "csv".to_string(), ); diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index da3048ee02..869f514e22 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -32,6 +32,7 @@ datafusion-common.workspace = true datafusion-expr.workspace = true datanode = { path = "../datanode" } datatypes = { path = "../datatypes" } +file-table-engine = { path = "../file-table-engine" } futures = "0.3" futures-util.workspace = true itertools = "0.10" diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 6c4fa9ab40..154c353621 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -484,6 +484,18 @@ pub enum Error { file_schema: String, location: Location, }, + + #[snafu(display("Failed to encode object into json, source: {}", source))] + EncodeJson { + source: serde_json::error::Error, + location: Location, + }, + + #[snafu(display("Failed to prepare immutable table: {}", source))] + PrepareImmutableTable { + #[snafu(backtrace)] + source: query::error::Error, + }, } pub type Result = std::result::Result; @@ -504,7 +516,8 @@ impl ErrorExt for Error { | Error::MissingMetasrvOpts { .. } | Error::ColumnNoneDefaultValue { .. } | Error::BuildRegex { .. } - | Error::InvalidSchema { .. } => StatusCode::InvalidArguments, + | Error::InvalidSchema { .. } + | Error::PrepareImmutableTable { .. } => StatusCode::InvalidArguments, Error::NotSupported { .. } => StatusCode::Unsupported, @@ -541,7 +554,8 @@ impl ErrorExt for Error { Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } - | Error::ContextValueNotFound { .. } => StatusCode::Unexpected, + | Error::ContextValueNotFound { .. } + | Error::EncodeJson { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 5320d37058..e32cdcfbe2 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -20,13 +20,15 @@ use api::v1::{Column, ColumnDataType, CreateTableExpr}; use common_error::prelude::BoxedError; use datanode::instance::sql::table_idents_to_full_name; use datatypes::schema::ColumnSchema; +use file_table_engine::table::immutable::ImmutableFileTableOptions; +use query::sql::prepare_immutable_file_table_files_and_schema; use session::context::QueryContextRef; use snafu::{ensure, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::column_def_to_schema; -use sql::statements::create::{CreateTable, TIME_INDEX}; +use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX}; use sql::util::to_lowercase_options_map; -use table::requests::TableOptions; +use table::requests::{TableOptions, IMMUTABLE_TABLE_META_KEY}; use crate::error::{ self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, @@ -76,6 +78,44 @@ impl CreateExprFactory for DefaultCreateExprFactory { } } +pub(crate) async fn create_external_expr( + create: CreateExternalTable, + query_ctx: QueryContextRef, +) -> Result { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(&create.name, query_ctx) + .map_err(BoxedError::new) + .context(error::ExternalSnafu)?; + + let mut options = create.options; + + let (files, schema) = prepare_immutable_file_table_files_and_schema(&options, &create.columns) + .await + .context(error::PrepareImmutableTableSnafu)?; + + let meta = ImmutableFileTableOptions { files }; + options.insert( + IMMUTABLE_TABLE_META_KEY.to_string(), + serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?, + ); + + let expr = CreateTableExpr { + catalog_name, + schema_name, + table_name, + desc: "".to_string(), + column_defs: column_schemas_to_defs(schema.column_schemas)?, + time_index: "".to_string(), + primary_keys: vec![], + create_if_not_exists: create.if_not_exists, + table_options: options, + table_id: None, + region_ids: vec![], + engine: create.engine.to_string(), + }; + Ok(expr) +} + /// Convert `CreateTable` statement to `CreateExpr` gRPC request. pub(crate) fn create_to_expr( create: &CreateTable, diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index f2b28ac6e3..0588b1dbbb 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -334,6 +334,11 @@ impl DistInstance { let _ = self.create_table(create_expr, stmt.partitions).await?; Ok(Output::AffectedRows(0)) } + Statement::CreateExternalTable(stmt) => { + let create_expr = &mut expr_factory::create_external_expr(stmt, query_ctx).await?; + self.create_table(create_expr, None).await?; + Ok(Output::AffectedRows(0)) + } Statement::Alter(alter_table) => { let expr = grpc::to_alter_expr(alter_table, query_ctx)?; self.handle_alter_table(expr).await @@ -673,7 +678,7 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result { schema: raw_schema, primary_key_indices, value_indices: vec![], - engine: "mito".to_string(), + engine: create_table.engine.clone(), next_column_id: column_schemas.len() as u32, region_numbers: vec![], engine_options: HashMap::new(), diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index c19e4fd592..5fce5517ae 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -77,6 +77,10 @@ impl StatementExecutor { let mut rows_inserted = 0; for entry in entries.iter() { let path = entry.path(); + // skips directories. + if entry.path().ends_with('/') { + continue; + } let reader = object_store .reader(path) .await diff --git a/src/frontend/src/tests/instance_test.rs b/src/frontend/src/tests/instance_test.rs index ea8948abad..0924436de9 100644 --- a/src/frontend/src/tests/instance_test.rs +++ b/src/frontend/src/tests/instance_test.rs @@ -479,6 +479,39 @@ async fn test_execute_create(instance: Arc) { assert!(matches!(output, Output::AffectedRows(0))); } +#[apply(both_instances_cases)] +async fn test_execute_external_create(instance: Arc) { + let instance = instance.frontend(); + + let output = execute_sql( + &instance, + r#"create external table test_table( + host string, + ts timestamp, + cpu double default 0, + memory double + ) with (location='/tmp/', format='csv');"#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); +} + +#[apply(both_instances_cases)] +async fn test_execute_external_create_without_ts_type(instance: Arc) { + let instance = instance.frontend(); + + let output = execute_sql( + &instance, + r#"create external table test_table( + host string, + cpu double default 0, + memory double + ) with (location='/tmp/', format='csv');"#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); +} + #[apply(standalone_instance_case)] async fn test_rename_table(instance: Arc) { let instance = instance.frontend(); diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 78ff4c41a9..3d0fcb0849 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -12,6 +12,7 @@ catalog = { path = "../catalog" } chrono.workspace = true common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } +common-datasource = { path = "../common/datasource" } common-error = { path = "../common/error" } common-function = { path = "../common/function" } common-query = { path = "../common/query" } @@ -29,9 +30,11 @@ futures = "0.3" futures-util.workspace = true humantime = "2.1" metrics.workspace = true +object-store = { path = "../object-store" } once_cell = "1.10" promql = { path = "../promql" } promql-parser = "0.1.0" +regex = "1.6" serde.workspace = true serde_json = "1.0" session = { path = "../session" } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 2be90f0438..393728d9a9 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -126,6 +126,54 @@ pub enum Error { #[snafu(backtrace)] source: sql::error::Error, }, + + #[snafu(display("Failed to parse SQL, source: {}", source))] + ParseSql { + #[snafu(backtrace)] + source: sql::error::Error, + }, + + #[snafu(display("Missing required field: {}", name))] + MissingRequiredField { name: String, location: Location }, + + #[snafu(display("Failed to regex, source: {}", source))] + BuildRegex { + location: Location, + source: regex::Error, + }, + + #[snafu(display("Failed to build data source backend, source: {}", source))] + BuildBackend { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to list objects, source: {}", source))] + ListObjects { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Unsupported file format: {}", format))] + UnsupportedFileFormat { format: String, location: Location }, + + #[snafu(display("Failed to parse file format: {}", source))] + ParseFileFormat { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to infer schema: {}", source))] + InferSchema { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to convert datafusion schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } impl ErrorExt for Error { @@ -139,12 +187,22 @@ impl ErrorExt for Error { | SchemaNotFound { .. } | TableNotFound { .. } | ParseTimestamp { .. } - | ParseFloat { .. } => StatusCode::InvalidArguments, + | ParseFloat { .. } + | MissingRequiredField { .. } + | BuildRegex { .. } + | UnsupportedFileFormat { .. } + | ConvertSchema { .. } => StatusCode::InvalidArguments, + + BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, + + ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(), + QueryAccessDenied { .. } => StatusCode::AccessDenied, Catalog { source } => source.status_code(), VectorComputation { source } | ConvertDatafusionSchema { source } => { source.status_code() } + ParseSql { source } => source.status_code(), CreateRecordBatch { source } => source.status_code(), QueryExecution { source } | QueryPlan { source } => source.status_code(), DataFusion { .. } | MissingTimestampColumn { .. } => StatusCode::Internal, diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index e369747f26..7516baab45 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -14,20 +14,35 @@ mod show; +use std::collections::HashMap; use std::sync::Arc; use catalog::CatalogManagerRef; use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_datasource::file_format::csv::CsvFormat; +use common_datasource::file_format::json::JsonFormat; +use common_datasource::file_format::parquet::ParquetFormat; +use common_datasource::file_format::{infer_schemas, FileFormat}; +use common_datasource::lister::{Lister, Source}; +use common_datasource::object_store::build_backend; +use common_datasource::util::find_dir_and_filename; use common_query::Output; use common_recordbatch::RecordBatches; use datatypes::prelude::*; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, RawSchema, Schema}; use datatypes::vectors::{Helper, StringVector}; +use object_store::ObjectStore; use once_cell::sync::Lazy; +use regex::Regex; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; +use sql::ast::ColumnDef; +use sql::statements::column_def_to_schema; use sql::statements::create::Partitions; use sql::statements::show::{ShowDatabases, ShowKind, ShowTables}; +use table::requests::{ + IMMUTABLE_TABLE_FORMAT_KEY, IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY, +}; use table::TableRef; use crate::error::{self, Result}; @@ -250,6 +265,105 @@ fn describe_column_semantic_types( )) } +pub async fn prepare_immutable_file_table_files_and_schema( + options: &HashMap, + columns: &Vec, +) -> Result<(Vec, RawSchema)> { + let (object_store, files) = prepare_immutable_file_table(options).await?; + let schema = if !columns.is_empty() { + let columns_schemas: Vec<_> = columns + .iter() + .map(|column| column_def_to_schema(column, false).context(error::ParseSqlSnafu)) + .collect::>>()?; + RawSchema::new(columns_schemas) + } else { + let format = parse_immutable_file_table_format(options)?; + infer_immutable_file_table_schema(&object_store, &*format, &files).await? + }; + + Ok((files, schema)) +} + +// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode. +async fn prepare_immutable_file_table( + options: &HashMap, +) -> Result<(ObjectStore, Vec)> { + let url = + options + .get(IMMUTABLE_TABLE_LOCATION_KEY) + .context(error::MissingRequiredFieldSnafu { + name: IMMUTABLE_TABLE_LOCATION_KEY, + })?; + + let (dir, filename) = find_dir_and_filename(url); + let source = if let Some(filename) = filename { + Source::Filename(filename) + } else { + Source::Dir + }; + let regex = options + .get(IMMUTABLE_TABLE_PATTERN_KEY) + .map(|x| Regex::new(x)) + .transpose() + .context(error::BuildRegexSnafu)?; + let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?; + let lister = Lister::new(object_store.clone(), source, dir, regex); + // If we scan files in a directory every time the database restarts, + // then it might lead to a potential undefined behavior: + // If a user adds a file with an incompatible schema to that directory, + // it will make the external table unavailable. + let files = lister + .list() + .await + .context(error::ListObjectsSnafu)? + .into_iter() + .filter_map(|entry| { + if entry.path().ends_with('/') { + None + } else { + Some(entry.path().to_string()) + } + }) + .collect::>(); + Ok((object_store, files)) +} + +fn parse_immutable_file_table_format( + options: &HashMap, +) -> Result> { + let format = options + .get(IMMUTABLE_TABLE_FORMAT_KEY) + .cloned() + .unwrap_or_default() + .to_uppercase(); + + match format.as_str() { + "CSV" => { + let file_format = CsvFormat::try_from(options).context(error::ParseFileFormatSnafu)?; + Ok(Box::new(file_format)) + } + "JSON" => { + let file_format = JsonFormat::try_from(options).context(error::ParseFileFormatSnafu)?; + Ok(Box::new(file_format)) + } + "PARQUET" => Ok(Box::new(ParquetFormat {})), + format => error::UnsupportedFileFormatSnafu { format }.fail(), + } +} + +async fn infer_immutable_file_table_schema( + object_store: &ObjectStore, + file_format: &dyn FileFormat, + files: &[String], +) -> Result { + let merged = infer_schemas(object_store, files, file_format) + .await + .context(error::InferSchemaSnafu)?; + Ok(RawSchema::from( + &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?, + )) +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index fe5d7eb627..2093be65ab 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -209,6 +209,8 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result { Ok(ColumnType::MYSQL_TYPE_VARCHAR) } ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP), + ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE), + ConcreteDataType::DateTime(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME), _ => error::InternalSnafu { err_msg: format!( "not implemented for column datatype {:?}", diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index eddd23e1ca..7b761e09a6 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -15,7 +15,6 @@ use std::cmp::Ordering; use itertools::Itertools; -use mito::engine; use once_cell::sync::Lazy; use snafu::{ensure, OptionExt, ResultExt}; use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Value}; @@ -66,7 +65,9 @@ impl<'a> ParserContext<'a> { self.parser .expect_keyword(Keyword::TABLE) .context(error::SyntaxSnafu { sql: self.sql })?; - + let if_not_exists = + self.parser + .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); let table_name = self .parser .parse_object_name() @@ -75,9 +76,8 @@ impl<'a> ParserContext<'a> { expected: "a table name", actual: self.peek_token_as_string(), })?; - let (columns, constraints) = self.parse_columns()?; - + let engine = self.parse_table_engine(common_catalog::consts::IMMUTABLE_FILE_ENGINE)?; let options = self .parser .parse_options(Keyword::WITH) @@ -97,6 +97,8 @@ impl<'a> ParserContext<'a> { columns, constraints, options, + if_not_exists, + engine, })) } @@ -141,7 +143,7 @@ impl<'a> ParserContext<'a> { let partitions = self.parse_partitions()?; - let engine = self.parse_table_engine()?; + let engine = self.parse_table_engine(common_catalog::consts::MITO_ENGINE)?; let options = self .parser .parse_options(Keyword::WITH) @@ -552,9 +554,9 @@ impl<'a> ParserContext<'a> { } /// Parses the set of valid formats - fn parse_table_engine(&mut self) -> Result { + fn parse_table_engine(&mut self, default: &str) -> Result { if !self.consume_token(ENGINE) { - return Ok(engine::MITO_ENGINE.to_string()); + return Ok(default.to_string()); } self.parser @@ -780,6 +782,7 @@ mod tests { use std::assert_matches::assert_matches; use std::collections::HashMap; + use common_catalog::consts::IMMUTABLE_FILE_ENGINE; use sqlparser::ast::ColumnOption::NotNull; use sqlparser::dialect::GenericDialect; @@ -791,16 +794,32 @@ mod tests { sql: &'a str, expected_table_name: &'a str, expected_options: HashMap, + expected_engine: &'a str, + expected_if_not_exist: bool, } - let tests = [Test { - sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');", - expected_table_name: "city", - expected_options: HashMap::from([ - ("LOCATION".to_string(), "/var/data/city.csv".to_string()), - ("FORMAT".to_string(), "csv".to_string()), - ]), - }]; + let tests = [ + Test { + sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');", + expected_table_name: "city", + expected_options: HashMap::from([ + ("LOCATION".to_string(), "/var/data/city.csv".to_string()), + ("FORMAT".to_string(), "csv".to_string()), + ]), + expected_engine: IMMUTABLE_FILE_ENGINE, + expected_if_not_exist: false, + }, + Test { + sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');", + expected_table_name: "city", + expected_options: HashMap::from([ + ("LOCATION".to_string(), "/var/data/city.csv".to_string()), + ("FORMAT".to_string(), "csv".to_string()), + ]), + expected_engine: "foo", + expected_if_not_exist: true, + }, + ]; for test in tests { let stmts = ParserContext::create_with_dialect(test.sql, &GenericDialect {}).unwrap(); @@ -809,6 +828,8 @@ mod tests { Statement::CreateExternalTable(c) => { assert_eq!(c.name.to_string(), test.expected_table_name.to_string()); assert_eq!(c.options, test.expected_options); + assert_eq!(c.if_not_exists, test.expected_if_not_exist); + assert_eq!(c.engine, test.expected_engine); } _ => unreachable!(), } diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index f4202563c5..4a9eb00337 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -195,6 +195,8 @@ pub struct CreateExternalTable { /// All keys are uppercase. /// TODO(weny): unify the key's case styling. pub options: HashMap, + pub if_not_exists: bool, + pub engine: String, } #[cfg(test)] diff --git a/src/table/src/engine/manager.rs b/src/table/src/engine/manager.rs index f3d22271f5..aefe99cfbe 100644 --- a/src/table/src/engine/manager.rs +++ b/src/table/src/engine/manager.rs @@ -72,6 +72,18 @@ impl MemoryTableEngineManager { self.engine_procedures = RwLock::new(engine_procedures); self } + + pub fn with(engines: Vec) -> Self { + let engines = engines + .into_iter() + .map(|engine| (engine.name().to_string(), engine)) + .collect::>(); + let engines = RwLock::new(engines); + MemoryTableEngineManager { + engines, + engine_procedures: RwLock::new(HashMap::new()), + } + } } #[async_trait] diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index d29bd484e3..56ff0a513c 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -29,6 +29,11 @@ use crate::error; use crate::error::ParseTableOptionSnafu; use crate::metadata::TableId; +pub const IMMUTABLE_TABLE_META_KEY: &str = "IMMUTABLE_TABLE_META"; +pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "LOCATION"; +pub const IMMUTABLE_TABLE_PATTERN_KEY: &str = "PATTERN"; +pub const IMMUTABLE_TABLE_FORMAT_KEY: &str = "FORMAT"; + #[derive(Debug, Clone)] pub struct CreateDatabaseRequest { pub db_name: String,