diff --git a/Cargo.lock b/Cargo.lock
index be2e1b0926..d4e548bfd1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2099,6 +2099,7 @@ dependencies = [
  "common-catalog",
  "common-error",
  "common-grpc",
+ "common-insert",
  "common-query",
  "common-recordbatch",
  "common-runtime",
@@ -2110,6 +2111,7 @@ dependencies = [
  "datanode",
  "datatypes",
  "futures",
+ "futures-util",
  "itertools",
  "meta-client",
  "meta-srv",
diff --git a/README.md b/README.md
index 1c538ec762..51df854010 100644
--- a/README.md
+++ b/README.md
@@ -30,30 +30,35 @@ docker build --network host -f docker/Dockerfile -t greptimedb .
 
 ## Usage
 
-### Start Datanode
+### Start in standalone mode
 
 ```
-// Start datanode with default options.
-cargo run -- datanode start
+// Start datanode and frontend with default options.
+cargo run -- --log-level=debug standalone start
 
 OR
 
-// Start datanode with `http-addr` option.
-cargo run -- datanode start --http-addr=0.0.0.0:9999
+// Start with `http-addr` option.
+cargo run -- --log-level=debug standalone start --http-addr=0.0.0.0:9999
 
 OR
 
+// Start with `mysql-addr` option.
+cargo run -- --log-level=debug standalone start --mysql-addr=0.0.0.0:9999
+
+OR
 
 // Start datanode with `log-dir` and `log-level` options.
-cargo run -- --log-dir=logs --log-level=debug datanode start
-```
-
-Start datanode with config file:
+cargo run -- --log-dir=logs --log-level=debug standalone start --mysql-addr=0.0.0.0:4102
 ```
-cargo run -- --log-dir=logs --log-level=debug datanode start -c ./config/datanode.example.toml
+
+Start with config file:
+
+```
+cargo run -- --log-dir=logs --log-level=debug standalone start -c ./config/standalone.example.toml
 ```
 
-Start datanode by runing docker container:
+Start datanode by running docker container:
 
 ```
 docker run -p 3000:3000 \
@@ -62,46 +67,24 @@ docker run -p 3000:3000 \
 greptimedb
 ```
 
-### Start Frontend
-
-Frontend should connect to Datanode, so **Datanode must have been started** at first!
-
-```
-// Connects to local Datanode at its default GRPC port: 3001
-
-// Start Frontend with default options.
-cargo run -- frontend start
-
-OR
-
-// Start Frontend with `mysql-addr` option.
-cargo run -- frontend start --mysql-addr=0.0.0.0:9999
-
-OR
-
-// Start datanode with `log-dir` and `log-level` options.
-cargo run -- --log-dir=logs --log-level=debug frontend start
-```
-
-Start datanode with config file:
-
-```
-cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/frontend.example.toml
-```
-
 ### SQL Operations
 
 1. Connecting DB by [mysql client](https://dev.mysql.com/downloads/mysql/):
 
 ```
-   # The datanode listen on port 3306 by default.
-   mysql -h 127.0.0.1 -P 3306
+   # The standalone instance listens on port 4002 by default.
+   mysql -h 127.0.0.1 -P 4002
 ```
 
+2. Create a database:
+```SQL
+CREATE DATABASE hello_greptime;
+```
+
 2. Create table:
 
 ```SQL
-   CREATE TABLE monitor (
+   CREATE TABLE hello_greptime.monitor (
      host STRING,
      ts TIMESTAMP,
      cpu DOUBLE DEFAULT 0,
@@ -113,22 +96,22 @@ cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/frontend.example.toml
 3. Insert data:
 
```SQL
-   INSERT INTO monitor(host, cpu, memory, ts) VALUES ('host1', 66.6, 1024, 1660897955000);
-   INSERT INTO monitor(host, cpu, memory, ts) VALUES ('host2', 77.7, 2048, 1660897956000);
-   INSERT INTO monitor(host, cpu, memory, ts) VALUES ('host3', 88.8, 4096, 1660897957000);
+   INSERT INTO hello_greptime.monitor(host, cpu, memory, ts) VALUES ('host1', 66.6, 1024, 1660897955000);
+   INSERT INTO hello_greptime.monitor(host, cpu, memory, ts) VALUES ('host2', 77.7, 2048, 1660897956000);
+   INSERT INTO hello_greptime.monitor(host, cpu, memory, ts) VALUES ('host3', 88.8, 4096, 1660897957000);
```
 
 4. Query data:
 
```SQL
-   mysql> SELECT * FROM monitor;
-   +-------+---------------+------+--------+
-   | host  | ts            | cpu  | memory |
-   +-------+---------------+------+--------+
-   | host1 | 1660897955000 | 66.6 | 1024   |
-   | host2 | 1660897956000 | 77.7 | 2048   |
-   | host3 | 1660897957000 | 88.8 | 4096   |
-   +-------+---------------+------+--------+
+   mysql> SELECT * FROM hello_greptime.monitor;
+   +-------+---------------------+------+--------+
+   | host  | ts                  | cpu  | memory |
+   +-------+---------------------+------+--------+
+   | host1 | 2022-08-19 08:32:35 | 66.6 | 1024   |
+   | host2 | 2022-08-19 08:32:36 | 77.7 | 2048   |
+   | host3 | 2022-08-19 08:32:37 | 88.8 | 4096   |
+   +-------+---------------------+------+--------+
    3 rows in set (0.01 sec)
```
 
 You can delete your data by removing `/tmp/greptimedb`.
diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs
index 02b5a5a8e1..bde0941837 100644
--- a/benchmarks/src/bin/nyc-taxi.rs
+++ b/benchmarks/src/bin/nyc-taxi.rs
@@ -97,11 +97,13 @@ async fn write_data(
         let row_count = record_batch.num_rows();
         let insert_batch = convert_record_batch(record_batch).into();
         let insert_expr = InsertExpr {
+            schema_name: "public".to_string(),
             table_name: TABLE_NAME.to_string(),
             expr: Some(insert_expr::Expr::Values(insert_expr::Values {
                 values: vec![insert_batch],
             })),
             options: HashMap::default(),
+            region_number: 0,
         };
         let now = Instant::now();
         db.insert(insert_expr).await.unwrap();
@@ -342,6 +344,8 @@ fn create_table_expr() -> CreateExpr {
         primary_keys: vec!["VendorID".to_string()],
         create_if_not_exists: false,
         table_options: Default::default(),
+        region_ids: vec![0],
+        table_id: Some(0),
     }
 }
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
new file mode 100644
index 0000000000..eec26b1f59
--- /dev/null
+++ b/config/standalone.example.toml
@@ -0,0 +1,14 @@
+mode = 'standalone'
+datanode_rpc_addr = '127.0.0.1:3001'
+http_addr = '0.0.0.0:4000'
+
+[grpc_options]
+addr = '0.0.0.0:4001'
+runtime_size = 4
+
+[mysql_options]
+addr = '0.0.0.0:4003'
+runtime_size = 4
+
+[influxdb_options]
+enable = true
diff --git a/src/api/greptime/v1/admin.proto b/src/api/greptime/v1/admin.proto
index 5dcd021d2f..03e058523e 100644
--- a/src/api/greptime/v1/admin.proto
+++ b/src/api/greptime/v1/admin.proto
@@ -41,6 +41,8 @@ message CreateExpr {
   repeated string primary_keys = 7;
   bool create_if_not_exists = 8;
   map<string, string> table_options = 9;
+  optional uint32 table_id = 10;
+  repeated uint32 region_ids = 11;
 }
 
 message AlterExpr {
@@ -48,12 +50,17 @@ message AlterExpr {
   optional string schema_name = 2;
   string table_name = 3;
   oneof kind {
-    AddColumn add_column = 4;
+    AddColumns add_columns = 4;
   }
 }
 
+message AddColumns {
+  repeated AddColumn add_columns = 1;
+}
+
 message AddColumn {
   ColumnDef column_def = 1;
+  bool is_key = 2;
 }
 
 message CreateDatabaseExpr {
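The switch from a single `AddColumn` to an `AddColumns` wrapper means one alter request can now carry a batch of new columns. A sketch of building such a request from the generated Rust types; the table name, column names, and the numeric datatype values here are illustrative only:

```rust
use api::v1::{alter_expr::Kind, AddColumn, AddColumns, AlterExpr, ColumnDef};

/// Sketch: batch two new columns into a single AlterExpr, as the new
/// `AddColumns` message allows. Datatype numbers are hypothetical values.
fn make_alter_expr() -> AlterExpr {
    let column = |name: &str, datatype: i32, is_key: bool| AddColumn {
        column_def: Some(ColumnDef {
            name: name.to_string(),
            datatype,
            is_nullable: true,
            default_constraint: None,
        }),
        is_key,
    };
    AlterExpr {
        catalog_name: None,
        schema_name: None,
        table_name: "monitor".to_string(),
        kind: Some(Kind::AddColumns(AddColumns {
            add_columns: vec![column("region", 12, true), column("disk", 10, false)],
        })),
    }
}
```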
diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto
index 117571b333..cbefdf19da 100644
--- a/src/api/greptime/v1/database.proto
+++ b/src/api/greptime/v1/database.proto
@@ -38,14 +38,15 @@ message PhysicalPlan {
 }
 
 message InsertExpr {
-  string table_name = 1;
+  string schema_name = 1;
+  string table_name = 2;
 
   message Values {
-    repeated bytes values = 1;
+    repeated bytes values = 3;
   }
 
   oneof expr {
-    Values values = 2;
+    Values values = 4;
 
     // TODO(LFC): Remove field "sql" in InsertExpr.
     // When Frontend instance received an insertion SQL (`insert into ...`), it's anticipated to parse the SQL and
@@ -54,10 +55,12 @@ message InsertExpr {
     // Then why the "sql" field exists here? It's because the Frontend needs table schema to create the values to insert,
     // which is currently not able to find anywhere. (Maybe the table schema is supposed to be fetched from Meta?)
     // The "sql" field is meant to be removed in the future.
-    string sql = 3;
+    string sql = 5;
   }
 
-  map<string, string> options = 4;
+  /// The region number of current insert request.
+  uint32 region_number = 6;
+  map<string, string> options = 7;
 }
 
 // TODO(jiachun)
diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs
index d5e3d6188e..ae2c2912fb 100644
--- a/src/api/src/lib.rs
+++ b/src/api/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod error;
 pub mod helper;
 pub mod prometheus;
+pub mod result;
 pub mod serde;
 pub mod v1;
diff --git a/src/datanode/src/server/grpc/handler.rs b/src/api/src/result.rs
similarity index 90%
rename from src/datanode/src/server/grpc/handler.rs
rename to src/api/src/result.rs
index c90c5f8b10..34b65470d7 100644
--- a/src/datanode/src/server/grpc/handler.rs
+++ b/src/api/src/result.rs
@@ -1,8 +1,9 @@
-use api::v1::{
+use common_error::prelude::ErrorExt;
+
+use crate::v1::{
     admin_result, codec::SelectResult, object_result, AdminResult, MutateResult, ObjectResult,
     ResultHeader, SelectResult as SelectResultRaw,
 };
-use common_error::prelude::ErrorExt;
 
 pub const PROTOCOL_VERSION: u32 = 1;
 
@@ -10,14 +11,14 @@ pub type Success = u32;
 pub type Failure = u32;
 
 #[derive(Default)]
-pub(crate) struct ObjectResultBuilder {
+pub struct ObjectResultBuilder {
     version: u32,
     code: u32,
     err_msg: Option<String>,
     result: Option<Body>,
 }
 
-pub(crate) enum Body {
+pub enum Body {
     Mutate((Success, Failure)),
     Select(SelectResult),
 }
@@ -80,7 +81,7 @@ impl ObjectResultBuilder {
     }
 }
 
-pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
+pub fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
     ObjectResultBuilder::new()
         .status_code(err.status_code() as u32)
         .err_msg(err.to_string())
@@ -88,7 +89,7 @@ pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
 }
 
 #[derive(Debug)]
-pub(crate) struct AdminResultBuilder {
+pub struct AdminResultBuilder {
     version: u32,
     code: u32,
     err_msg: Option<String>,
@@ -144,11 +145,11 @@ impl Default for AdminResultBuilder {
 
 #[cfg(test)]
 mod tests {
-    use api::v1::{object_result, MutateResult};
     use common_error::status_code::StatusCode;
 
     use super::*;
-    use crate::error::UnsupportedExprSnafu;
+    use crate::error::UnknownColumnDataTypeSnafu;
+    use crate::v1::{object_result, MutateResult};
 
     #[test]
     fn test_object_result_builder() {
@@ -175,14 +176,13 @@ mod tests {
 
     #[test]
     fn test_build_err_result() {
-        let err = UnsupportedExprSnafu { name: "select" }.build();
+        let err = UnknownColumnDataTypeSnafu { datatype: 1 }.build();
         let err_result = build_err_result(&err);
         let header = err_result.header.unwrap();
         let result = err_result.result;
         assert_eq!(PROTOCOL_VERSION, header.version);
-        assert_eq!(StatusCode::Internal as u32, header.code);
-        assert_eq!("Unsupported expr type: select", header.err_msg);
+        assert_eq!(StatusCode::InvalidArguments as u32, header.code);
         assert!(result.is_none());
     }
 }
diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index 072a59f41d..ed2a8018a2 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -171,12 +171,6 @@ pub enum Error {
         source: meta_client::error::Error,
     },
 
-    #[snafu(display("Failed to bump table id"))]
-    BumpTableId { msg: String, backtrace: Backtrace },
-
-    #[snafu(display("Failed to parse table id from metasrv, data: {:?}", data))]
-    ParseTableId { data: String, backtrace: Backtrace },
-
     #[snafu(display("Failed to deserialize partition rule from string: {:?}", data))]
     DeserializePartitionRule {
         data: String,
@@ -232,9 +226,6 @@ impl ErrorExt for Error {
             Error::SystemCatalogTableScan { source } => source.status_code(),
             Error::SystemCatalogTableScanExec { source } => source.status_code(),
             Error::InvalidTableSchema { source, .. } => source.status_code(),
-            Error::BumpTableId { .. } | Error::ParseTableId { .. } => {
-                StatusCode::StorageUnavailable
-            }
             Error::DeserializePartitionRule { .. } => StatusCode::Unexpected,
             Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
             Error::Internal { source, .. } => source.status_code(),
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index 57b68a9182..466a4d4533 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -69,9 +69,6 @@ pub trait CatalogManager: CatalogList {
     /// Starts a catalog manager.
     async fn start(&self) -> Result<()>;
 
-    /// Returns next available table id.
-    async fn next_table_id(&self) -> Result<TableId>;
-
     /// Registers a table given catalog/schema to catalog manager,
     /// returns table registered.
     async fn register_table(&self, request: RegisterTableRequest) -> Result<usize>;
diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs
index 8b057ee495..7d9887bedb 100644
--- a/src/catalog/src/local/manager.rs
+++ b/src/catalog/src/local/manager.rs
@@ -16,6 +16,7 @@ use table::engine::{EngineContext, TableEngineRef};
 use table::metadata::TableId;
 use table::requests::OpenTableRequest;
 use table::table::numbers::NumbersTable;
+use table::table::TableIdProvider;
 use table::TableRef;
 
 use crate::error::{
@@ -278,6 +279,13 @@ impl CatalogList for LocalCatalogManager {
     }
 }
 
+#[async_trait::async_trait]
+impl TableIdProvider for LocalCatalogManager {
+    async fn next_table_id(&self) -> table::Result<TableId> {
+        Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed))
+    }
+}
+
 #[async_trait::async_trait]
 impl CatalogManager for LocalCatalogManager {
     /// Start [LocalCatalogManager] to load all information from system catalog table.
@@ -286,11 +294,6 @@ impl CatalogManager for LocalCatalogManager {
         self.init().await
     }
 
-    #[inline]
-    async fn next_table_id(&self) -> Result<TableId> {
-        Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed))
-    }
-
     async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
         let started = self.init_lock.lock().await;
 
diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs
index cfa6434cf3..7e515a9147 100644
--- a/src/catalog/src/local/memory.rs
+++ b/src/catalog/src/local/memory.rs
@@ -8,6 +8,7 @@ use std::sync::RwLock;
 use common_catalog::consts::MIN_USER_TABLE_ID;
 use snafu::OptionExt;
 use table::metadata::TableId;
+use table::table::TableIdProvider;
 use table::TableRef;
 
 use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
@@ -41,6 +42,13 @@ impl Default for MemoryCatalogManager {
     }
 }
 
+#[async_trait::async_trait]
+impl TableIdProvider for MemoryCatalogManager {
+    async fn next_table_id(&self) -> table::error::Result<TableId> {
+        Ok(self.table_id.fetch_add(1, Ordering::Relaxed))
+    }
+}
+
 #[async_trait::async_trait]
 impl CatalogManager for MemoryCatalogManager {
     async fn start(&self) -> Result<()> {
@@ -48,10 +56,6 @@ impl CatalogManager for MemoryCatalogManager {
         Ok(())
     }
 
-    async fn next_table_id(&self) -> Result<TableId> {
-        Ok(self.table_id.fetch_add(1, Ordering::Relaxed))
-    }
-
     async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
         let catalogs = self.catalogs.write().unwrap();
         let catalog = catalogs
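Both local catalog managers now implement the new `TableIdProvider` trait from the `table` crate instead of exposing `next_table_id` on `CatalogManager` itself, so any id source can stand behind a `TableIdProviderRef`. A minimal standalone sketch, assuming the trait shape used throughout this patch (it mirrors the `LocalTableIdProvider` added to `src/datanode/src/mock.rs` further down):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

use common_catalog::consts::MIN_USER_TABLE_ID;
use table::metadata::TableId;
use table::table::TableIdProvider;

/// Sketch: an in-memory id source; ids are only unique within one process.
struct AtomicTableIdProvider {
    next: AtomicU32,
}

impl Default for AtomicTableIdProvider {
    fn default() -> Self {
        Self {
            next: AtomicU32::new(MIN_USER_TABLE_ID),
        }
    }
}

#[async_trait::async_trait]
impl TableIdProvider for AtomicTableIdProvider {
    async fn next_table_id(&self) -> table::Result<TableId> {
        // fetch_add returns the previous value, so ids start at MIN_USER_TABLE_ID.
        Ok(self.next.fetch_add(1, Ordering::Relaxed))
    }
}
```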
diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs
index 41f5993921..5537d0f66e 100644
--- a/src/catalog/src/remote/manager.rs
+++ b/src/catalog/src/remote/manager.rs
@@ -2,18 +2,15 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::Duration;
 
 use arc_swap::ArcSwap;
 use async_stream::stream;
-use backoff::exponential::ExponentialBackoffBuilder;
-use backoff::ExponentialBackoff;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_catalog::{
     build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey,
     CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey,
     TableRegionalValue,
 };
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, info};
 use futures::Stream;
 use futures_util::StreamExt;
 use snafu::{OptionExt, ResultExt};
@@ -25,8 +22,8 @@ use table::TableRef;
 use tokio::sync::Mutex;
 
 use crate::error::{
-    BumpTableIdSnafu, CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu,
-    OpenTableSnafu, ParseTableIdSnafu, SchemaNotFoundSnafu, TableExistsSnafu,
+    CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
+    SchemaNotFoundSnafu, TableExistsSnafu,
 };
 use crate::error::{InvalidTableSchemaSnafu, Result};
 use crate::remote::{Kv, KvBackendRef};
@@ -378,63 +375,6 @@ impl CatalogManager for RemoteCatalogManager {
         Ok(())
     }
 
-    /// Bump table id in a CAS manner with backoff.
-    async fn next_table_id(&self) -> Result<TableId> {
-        let key = common_catalog::consts::TABLE_ID_KEY_PREFIX.as_bytes();
-        let op = || async {
-            // TODO(hl): optimize this get
-            let (prev, prev_bytes) = match self.backend.get(key).await? {
-                None => (MIN_USER_TABLE_ID, vec![]),
-                Some(kv) => (parse_table_id(&kv.1)?, kv.1),
-            };
-
-            match self
-                .backend
-                .compare_and_set(key, &prev_bytes, &(prev + 1).to_le_bytes())
-                .await
-            {
-                Ok(cas_res) => match cas_res {
-                    Ok(_) => Ok(prev),
-                    Err(e) => {
-                        info!("Table id {:?} already occupied", e);
-                        Err(backoff::Error::transient(
-                            BumpTableIdSnafu {
-                                msg: "Table id occupied",
-                            }
-                            .build(),
-                        ))
-                    }
-                },
-                Err(e) => {
-                    error!(e; "Failed to CAS table id");
-                    Err(backoff::Error::permanent(
-                        BumpTableIdSnafu {
-                            msg: format!("Failed to perform CAS operation: {:?}", e),
-                        }
-                        .build(),
-                    ))
-                }
-            }
-        };
-
-        let retry_policy: ExponentialBackoff = ExponentialBackoffBuilder::new()
-            .with_initial_interval(Duration::from_millis(4))
-            .with_multiplier(2.0)
-            .with_max_interval(Duration::from_millis(1000))
-            .with_max_elapsed_time(Some(Duration::from_millis(3000)))
-            .build();
-
-        backoff::future::retry(retry_policy, op).await.map_err(|e| {
-            BumpTableIdSnafu {
-                msg: format!(
-                    "Bump table id exceeds max fail times, last error msg: {:?}",
-                    e
-                ),
-            }
-            .build()
-        })
-    }
-
     async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
         let catalog_name = request.catalog;
         let schema_name = request.schema;
@@ -614,16 +554,6 @@ impl CatalogProvider for RemoteCatalogProvider {
     }
 }
 
-/// Parse u8 slice to `TableId`
-fn parse_table_id(val: &[u8]) -> Result<TableId> {
-    Ok(TableId::from_le_bytes(val.try_into().map_err(|_| {
-        ParseTableIdSnafu {
-            data: format!("{:?}", val),
-        }
-        .build()
-    })?))
-}
-
 pub struct RemoteSchemaProvider {
     catalog_name: String,
     schema_name: String,
@@ -745,17 +675,3 @@ impl SchemaProvider for RemoteSchemaProvider {
         Ok(self.tables.load().contains_key(name))
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_parse_table_id() {
-        assert_eq!(12, parse_table_id(&12_i32.to_le_bytes()).unwrap());
-        let mut data = vec![];
-        data.extend_from_slice(&12_i32.to_le_bytes());
-        data.push(0);
-        assert!(parse_table_id(&data).is_err());
-    }
-}
diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs
index f29740ef28..5cbc5b37b1 100644
--- a/src/catalog/tests/remote_catalog_tests.rs
+++ b/src/catalog/tests/remote_catalog_tests.rs
@@ -11,8 +11,8 @@ mod tests {
     use catalog::remote::{
         KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
     };
-    use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
+    use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
+    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
     use datatypes::schema::Schema;
     use futures_util::StreamExt;
@@ -61,7 +61,9 @@ mod tests {
         );
     }
 
-    async fn prepare_components(node_id: u64) -> (KvBackendRef, TableEngineRef, CatalogManagerRef) {
+    async fn prepare_components(
+        node_id: u64,
+    ) -> (KvBackendRef, TableEngineRef, Arc<RemoteCatalogManager>) {
         let backend = Arc::new(MockKvBackend::default()) as KvBackendRef;
         let table_engine = Arc::new(MockTableEngine::default());
         let catalog_manager =
@@ -277,19 +279,4 @@ mod tests {
             new_catalog.schema_names().unwrap().into_iter().collect()
         )
     }
-
-    #[tokio::test]
-    async fn test_next_table_id() {
-        let node_id = 42;
-        let (_, _, catalog_manager) = prepare_components(node_id).await;
-        assert_eq!(
-            MIN_USER_TABLE_ID,
-            catalog_manager.next_table_id().await.unwrap()
-        );
-
-        assert_eq!(
-            MIN_USER_TABLE_ID + 1,
-            catalog_manager.next_table_id().await.unwrap()
-        );
-    }
 }
diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs
index 43f625adfb..13850ebc11 100644
--- a/src/client/examples/insert.rs
+++ b/src/client/examples/insert.rs
@@ -15,11 +15,13 @@ async fn run() {
     let db = Database::new("greptime", client);
 
     let expr = InsertExpr {
+        schema_name: "public".to_string(),
         table_name: "demo".to_string(),
         expr: Some(insert_expr::Expr::Values(insert_expr::Values {
             values: insert_batches(),
         })),
         options: HashMap::default(),
+        region_number: 0,
    };
     db.insert(expr).await.unwrap();
 }
diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs
index 44c967c08f..843cb0f8a8 100644
--- a/src/client/examples/logical.rs
+++ b/src/client/examples/logical.rs
@@ -49,6 +49,8 @@ async fn run() {
         primary_keys: vec!["key".to_string()],
         create_if_not_exists: false,
         table_options: Default::default(),
+        table_id: Some(1024),
+        region_ids: vec![0],
     };
 
     let admin = Admin::new("create table", client.clone());
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index bff78ebda5..17e0908a48 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -124,8 +124,6 @@ impl Database {
         obj_result.try_into()
     }
 
-    // TODO(jiachun) update/delete
-
     pub async fn object(&self, expr: ObjectExpr) -> Result<ObjectResult> {
         let res = self.objects(vec![expr]).await?.pop().unwrap();
         Ok(res)
diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs
index 75243a645f..8615bcd6e4 100644
--- a/src/cmd/src/bin/greptime.rs
+++ b/src/cmd/src/bin/greptime.rs
@@ -1,10 +1,10 @@
 use std::fmt;
 
 use clap::Parser;
-use cmd::datanode;
 use cmd::error::Result;
 use cmd::frontend;
 use cmd::metasrv;
+use cmd::{datanode, standalone};
 use common_telemetry::logging::error;
 use common_telemetry::logging::info;
 
@@ -33,6 +33,8 @@ enum SubCommand {
     Frontend(frontend::Command),
     #[clap(name = "metasrv")]
     Metasrv(metasrv::Command),
+    #[clap(name = "standalone")]
+    Standalone(standalone::Command),
 }
 
 impl SubCommand {
@@ -41,6 +43,7 @@ impl SubCommand {
             SubCommand::Datanode(cmd) => cmd.run().await,
             SubCommand::Frontend(cmd) => cmd.run().await,
             SubCommand::Metasrv(cmd) => cmd.run().await,
+            SubCommand::Standalone(cmd) => cmd.run().await,
         }
     }
 }
@@ -51,6 +54,7 @@ impl fmt::Display for SubCommand {
             SubCommand::Datanode(..) => write!(f, "greptime-datanode"),
             SubCommand::Frontend(..) => write!(f, "greptime-frontend"),
             SubCommand::Metasrv(..) => write!(f, "greptime-metasrv"),
+            SubCommand::Standalone(..) => write!(f, "greptime-standalone"),
         }
     }
 }
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 9d65a565a9..866783a8f5 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -11,6 +11,12 @@ pub enum Error {
         source: datanode::error::Error,
     },
 
+    #[snafu(display("Failed to build frontend, source: {}", source))]
+    BuildFrontend {
+        #[snafu(backtrace)]
+        source: frontend::error::Error,
+    },
+
     #[snafu(display("Failed to start frontend, source: {}", source))]
     StartFrontend {
         #[snafu(backtrace)]
@@ -38,6 +44,9 @@ pub enum Error {
 
     #[snafu(display("Missing config, msg: {}", msg))]
     MissingConfig { msg: String, backtrace: Backtrace },
+
+    #[snafu(display("Illegal config: {}", msg))]
+    IllegalConfig { msg: String, backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -51,6 +60,8 @@ impl ErrorExt for Error {
             Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => {
                 StatusCode::InvalidArguments
             }
+            Error::IllegalConfig { .. } => StatusCode::InvalidArguments,
+            Error::BuildFrontend { source, .. } => source.status_code(),
         }
     }
diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs
index 09ac5bbd35..56e81f9cdd 100644
--- a/src/cmd/src/lib.rs
+++ b/src/cmd/src/lib.rs
@@ -2,4 +2,5 @@ pub mod datanode;
 pub mod error;
 pub mod frontend;
 pub mod metasrv;
+pub mod standalone;
 mod toml_loader;
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
new file mode 100644
index 0000000000..02823dfade
--- /dev/null
+++ b/src/cmd/src/standalone.rs
@@ -0,0 +1,192 @@
+use clap::Parser;
+use common_telemetry::info;
+use datanode::datanode::{Datanode, DatanodeOptions};
+use datanode::instance::InstanceRef;
+use frontend::frontend::{Frontend, FrontendOptions, Mode};
+use frontend::grpc::GrpcOptions;
+use frontend::influxdb::InfluxdbOptions;
+use frontend::instance::Instance as FeInstance;
+use frontend::mysql::MysqlOptions;
+use frontend::opentsdb::OpentsdbOptions;
+use frontend::postgres::PostgresOptions;
+use snafu::ResultExt;
+use tokio::try_join;
+
+use crate::error::{
+    BuildFrontendSnafu, Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu,
+};
+use crate::toml_loader;
+
+#[derive(Parser)]
+pub struct Command {
+    #[clap(subcommand)]
+    subcmd: SubCommand,
+}
+
+impl Command {
+    pub async fn run(self) -> Result<()> {
+        self.subcmd.run().await
+    }
+}
+
+#[derive(Parser)]
+enum SubCommand {
+    Start(StartCommand),
+}
+
+impl SubCommand {
+    async fn run(self) -> Result<()> {
+        match self {
+            SubCommand::Start(cmd) => cmd.run().await,
+        }
+    }
+}
+
+#[derive(Debug, Parser)]
+struct StartCommand {
+    #[clap(long)]
+    http_addr: Option<String>,
+    #[clap(long)]
+    rpc_addr: Option<String>,
+    #[clap(long)]
+    mysql_addr: Option<String>,
+    #[clap(long)]
+    postgres_addr: Option<String>,
+    #[clap(long)]
+    opentsdb_addr: Option<String>,
+    #[clap(short, long)]
+    influxdb_enable: bool,
+    #[clap(short, long)]
+    config_file: Option<String>,
+}
+
+impl StartCommand {
+    async fn run(self) -> Result<()> {
+        let fe_opts = FrontendOptions::try_from(self)?;
+        let dn_opts = DatanodeOptions::default();
+
+        let mut datanode = Datanode::new(dn_opts.clone())
+            .await
+            .context(StartDatanodeSnafu)?;
+        let mut frontend = build_frontend(fe_opts, &dn_opts, datanode.get_instance()).await?;
+
+        try_join!(
+            async { datanode.start().await.context(StartDatanodeSnafu) },
+            async { frontend.start().await.context(StartFrontendSnafu) }
+        )?;
+
+        Ok(())
+    }
+}
+
+/// Build frontend instance in standalone mode
+async fn build_frontend(
+    fe_opts: FrontendOptions,
+    dn_opts: &DatanodeOptions,
+    datanode_instance: InstanceRef,
+) -> Result<Frontend<FeInstance>> {
+    let grpc_server_addr = &dn_opts.rpc_addr;
+    info!(
+        "Build frontend with datanode gRPC addr: {}",
+        grpc_server_addr
+    );
+    let mut frontend_instance = FeInstance::try_new(&fe_opts)
+        .await
+        .context(BuildFrontendSnafu)?;
+    frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
+    Ok(Frontend::new(fe_opts, frontend_instance))
+}
+
+impl TryFrom<StartCommand> for FrontendOptions {
+    type Error = Error;
+
+    fn try_from(cmd: StartCommand) -> std::result::Result<Self, Self::Error> {
+        let mut opts: FrontendOptions = if let Some(path) = cmd.config_file {
+            toml_loader::from_file!(&path)?
+        } else {
+            FrontendOptions::default()
+        };
+
+        opts.mode = Mode::Standalone;
+
+        if let Some(addr) = cmd.http_addr {
+            opts.http_addr = Some(addr);
+        }
+        if let Some(addr) = cmd.rpc_addr {
+            // frontend grpc addr conflict with datanode default grpc addr
+            let datanode_grpc_addr = DatanodeOptions::default().rpc_addr;
+            if addr == datanode_grpc_addr {
+                return IllegalConfigSnafu {
+                    msg: format!(
+                        "gRPC listen address conflicts with datanode reserved gRPC addr: {}",
+                        datanode_grpc_addr
+                    ),
+                }
+                .fail();
+            }
+            opts.grpc_options = Some(GrpcOptions {
+                addr,
+                ..Default::default()
+            });
+        }
+
+        if let Some(addr) = cmd.mysql_addr {
+            opts.mysql_options = Some(MysqlOptions {
+                addr,
+                ..Default::default()
+            })
+        }
+        if let Some(addr) = cmd.postgres_addr {
+            opts.postgres_options = Some(PostgresOptions {
+                addr,
+                ..Default::default()
+            })
+        }
+
+        if let Some(addr) = cmd.opentsdb_addr {
+            opts.opentsdb_options = Some(OpentsdbOptions {
+                addr,
+                ..Default::default()
+            });
+        }
+
+        if cmd.influxdb_enable {
+            opts.influxdb_options = Some(InfluxdbOptions { enable: true });
+        }
+
+        Ok(opts)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_read_config_file() {
+        let cmd = StartCommand {
+            http_addr: None,
+            rpc_addr: None,
+            mysql_addr: None,
+            postgres_addr: None,
+            opentsdb_addr: None,
+            config_file: Some(format!(
+                "{}/../../config/standalone.example.toml",
+                std::env::current_dir().unwrap().as_path().to_str().unwrap()
+            )),
+            influxdb_enable: false,
+        };
+
+        let fe_opts = FrontendOptions::try_from(cmd).unwrap();
+        assert_eq!(Mode::Standalone, fe_opts.mode);
+        assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
+        assert_eq!(Some("0.0.0.0:4000".to_string()), fe_opts.http_addr);
+        assert_eq!(
+            "0.0.0.0:4001".to_string(),
+            fe_opts.grpc_options.unwrap().addr
+        );
+        assert_eq!("0.0.0.0:4003", fe_opts.mysql_options.as_ref().unwrap().addr);
+        assert_eq!(4, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
+        assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
+    }
+}
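The conversion above layers CLI flags over the optional config file: `toml_loader::from_file!` seeds a `FrontendOptions`, each present flag then overwrites its field, and a gRPC address equal to the datanode's reserved one is rejected with `IllegalConfig`. A hedged test sketch of that rejection path, written as if it lived in the same `tests` module (it assumes `DatanodeOptions::default().rpc_addr` is the reserved address, exactly as the guard above does):

```rust
#[test]
fn test_rpc_addr_conflicts_with_datanode() {
    // Sketch: claiming the datanode's default gRPC address for the frontend
    // should fail the conversion with an IllegalConfig error.
    let cmd = StartCommand {
        http_addr: None,
        rpc_addr: Some(DatanodeOptions::default().rpc_addr),
        mysql_addr: None,
        postgres_addr: None,
        opentsdb_addr: None,
        influxdb_enable: false,
        config_file: None,
    };
    assert!(FrontendOptions::try_from(cmd).is_err());
}
```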
diff --git a/src/common/insert/src/insert.rs b/src/common/insert/src/insert.rs
index 132d95a000..18d0520f2a 100644
--- a/src/common/insert/src/insert.rs
+++ b/src/common/insert/src/insert.rs
@@ -5,49 +5,46 @@ use std::{
     sync::Arc,
 };
 
-use api::{
-    helper::ColumnDataTypeWrapper,
-    v1::{
-        codec::InsertBatch,
-        column::{SemanticType, Values},
-        Column,
-    },
+use api::v1::{
+    codec::InsertBatch,
+    column::{SemanticType, Values},
+    AddColumns, Column,
 };
+use api::v1::{AddColumn, ColumnDef, CreateExpr};
 use common_base::BitVec;
 use common_time::timestamp::Timestamp;
-use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
+use datatypes::schema::SchemaRef;
 use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
 use snafu::{ensure, OptionExt, ResultExt};
 use table::metadata::TableId;
 use table::{
-    requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest},
+    requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest},
     Table,
 };
 
 use crate::error::{
-    ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateSchemaSnafu, DecodeInsertSnafu,
-    DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, MissingTimestampColumnSnafu, Result,
+    ColumnNotFoundSnafu, DecodeInsertSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu,
+    MissingTimestampColumnSnafu, Result,
 };
 
 const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
 const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
 
 #[inline]
-fn build_column_schema(column_name: &str, datatype: i32, nullable: bool) -> Result<ColumnSchema> {
-    let datatype_wrapper = ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?;
-
-    Ok(ColumnSchema::new(
-        column_name,
-        datatype_wrapper.into(),
-        nullable,
-    ))
+fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnDef {
+    ColumnDef {
+        name: column_name.to_string(),
+        datatype,
+        is_nullable: nullable,
+        default_constraint: None,
+    }
 }
 
 pub fn find_new_columns(
     schema: &SchemaRef,
     insert_batches: &[InsertBatch],
-) -> Result<Option<Vec<AddColumnRequest>>> {
-    let mut requests = Vec::default();
+) -> Result<Option<AddColumns>> {
+    let mut columns_to_add = Vec::default();
     let mut new_columns: HashSet<String> = HashSet::default();
 
     for InsertBatch { columns, row_count } in insert_batches {
@@ -65,10 +62,9 @@ pub fn find_new_columns(
             if schema.column_schema_by_name(column_name).is_none()
                 && !new_columns.contains(column_name)
             {
-                let column_schema = build_column_schema(column_name, *datatype, true)?;
-
-                requests.push(AddColumnRequest {
-                    column_schema,
+                let column_def = Some(build_column_def(column_name, *datatype, true));
+                columns_to_add.push(AddColumn {
+                    column_def,
                     is_key: *semantic_type == TAG_SEMANTIC_TYPE,
                 });
                 new_columns.insert(column_name.to_string());
@@ -76,10 +72,12 @@ pub fn find_new_columns(
         }
     }
 
-    if requests.is_empty() {
+    if columns_to_add.is_empty() {
         Ok(None)
     } else {
-        Ok(Some(requests))
+        Ok(Some(AddColumns {
+            add_columns: columns_to_add,
+        }))
     }
 }
 
@@ -98,15 +96,15 @@ pub fn build_alter_table_request(
 }
 
 /// Try to build create table request from insert data.
-pub fn build_create_table_request(
+pub fn build_create_expr_from_insertion(
     catalog_name: &str,
     schema_name: &str,
-    table_id: TableId,
+    table_id: Option<TableId>,
     table_name: &str,
     insert_batches: &[InsertBatch],
-) -> Result<CreateTableRequest> {
+) -> Result<CreateExpr> {
     let mut new_columns: HashSet<String> = HashSet::default();
-    let mut column_schemas = Vec::default();
+    let mut column_defs = Vec::default();
     let mut primary_key_indices = Vec::default();
     let mut timestamp_index = usize::MAX;
 
@@ -124,9 +122,8 @@ pub fn build_create_expr_from_insertion(
     {
         if !new_columns.contains(column_name) {
             let mut is_nullable = true;
-            let mut is_time_index = false;
             match *semantic_type {
-                TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()),
+                TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()),
                 TIMESTAMP_SEMANTIC_TYPE => {
                     ensure!(
                         timestamp_index == usize::MAX,
                         DuplicatedTimestampColumnSnafu {
                             duplicated: column_name,
                         }
                     );
-                    timestamp_index = column_schemas.len();
-                    is_time_index = true;
+                    timestamp_index = column_defs.len();
                     // Timestamp column must not be null.
                     is_nullable = false;
                 }
                 _ => {}
             }
 
-            let column_schema = build_column_schema(column_name, *datatype, is_nullable)?
-                .with_time_index(is_time_index);
-            column_schemas.push(column_schema);
+            let column_def = build_column_def(column_name, *datatype, is_nullable);
+            column_defs.push(column_def);
             new_columns.insert(column_name.to_string());
         }
     }
 
     ensure!(timestamp_index != usize::MAX, MissingTimestampColumnSnafu);
+    let timestamp_field_name = columns[timestamp_index].column_name.clone();
 
-    let schema = Arc::new(
-        SchemaBuilder::try_from(column_schemas)
-            .unwrap()
-            .build()
-            .context(CreateSchemaSnafu)?,
-    );
+    let primary_keys = primary_key_indices
+        .iter()
+        .map(|idx| columns[*idx].column_name.clone())
+        .collect::<Vec<_>>();
 
-    return Ok(CreateTableRequest {
-        id: table_id,
-        catalog_name: catalog_name.to_string(),
-        schema_name: schema_name.to_string(),
+    let expr = CreateExpr {
+        catalog_name: Some(catalog_name.to_string()),
+        schema_name: Some(schema_name.to_string()),
         table_name: table_name.to_string(),
-        desc: None,
-        schema,
+        desc: Some("Created on insertion".to_string()),
+        column_defs,
+        time_index: timestamp_field_name,
+        primary_keys,
         create_if_not_exists: true,
-        primary_key_indices,
-        table_options: HashMap::new(),
-        region_numbers: vec![0],
-    });
+        table_options: Default::default(),
+        table_id,
+        region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
+    };
+
+    return Ok(expr);
     }
 
     IllegalInsertDataSnafu.fail()
@@ -233,7 +230,7 @@ pub fn insertion_expr_to_request(
 }
 
 #[inline]
-pub fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
+pub fn insert_batches(bytes_vec: &[Vec<u8>]) -> Result<Vec<InsertBatch>> {
     bytes_vec
         .iter()
         .map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu))
@@ -365,6 +362,7 @@ mod tests {
     use std::any::Any;
     use std::sync::Arc;
 
+    use api::helper::ColumnDataTypeWrapper;
     use api::v1::{
         codec::InsertBatch,
         column::{self, SemanticType, Values},
@@ -379,50 +377,114 @@ mod tests {
         schema::{ColumnSchema, SchemaBuilder, SchemaRef},
         value::Value,
     };
+    use snafu::ResultExt;
     use table::error::Result as TableResult;
     use table::metadata::TableInfoRef;
     use table::Table;
 
     use super::{
-        build_column_schema, build_create_table_request, convert_values, find_new_columns,
-        insert_batches, insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE,
-        TIMESTAMP_SEMANTIC_TYPE,
+        build_create_expr_from_insertion, convert_values, find_new_columns, insert_batches,
+        insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE,
     };
+    use crate::error;
+    use crate::error::ColumnDataTypeSnafu;
+
+    #[inline]
+    fn build_column_schema(
+        column_name: &str,
+        datatype: i32,
+        nullable: bool,
+    ) -> error::Result<ColumnSchema> {
+        let datatype_wrapper =
+            ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?;
+
+        Ok(ColumnSchema::new(
+            column_name,
+            datatype_wrapper.into(),
+            nullable,
+        ))
+    }
 
     #[test]
     fn test_build_create_table_request() {
-        let table_id = 10;
+        let table_id = Some(10);
         let table_name = "test_metric";
 
-        assert!(build_create_table_request("", "", table_id, table_name, &[]).is_err());
+        assert!(build_create_expr_from_insertion("", "", table_id, table_name, &[]).is_err());
 
-        let insert_batches = insert_batches(mock_insert_batches()).unwrap();
+        let mock_batch_bytes = mock_insert_batches();
+        let insert_batches = insert_batches(&mock_batch_bytes).unwrap();
 
-        let req =
-            build_create_table_request("", "", table_id, table_name, &insert_batches).unwrap();
-        assert_eq!(table_id, req.id);
-        assert_eq!(table_name, req.table_name);
-        assert!(req.desc.is_none());
-        assert_eq!(vec![0], req.primary_key_indices);
+        let create_expr =
+            build_create_expr_from_insertion("", "", table_id, table_name, &insert_batches)
+                .unwrap();
+
+        assert_eq!(table_id, create_expr.table_id);
+        assert_eq!(table_name, create_expr.table_name);
+        assert_eq!(Some("Created on insertion".to_string()), create_expr.desc);
+        assert_eq!(
+            vec![create_expr.column_defs[0].name.clone()],
+            create_expr.primary_keys
+        );
+
+        let column_defs = create_expr.column_defs;
+        assert_eq!(column_defs[3].name, create_expr.time_index);
+        assert_eq!(4, column_defs.len());
 
-        let schema = req.schema;
-        assert_eq!(Some(3), schema.timestamp_index());
-        assert_eq!(4, schema.num_columns());
         assert_eq!(
             ConcreteDataType::string_datatype(),
-            schema.column_schema_by_name("host").unwrap().data_type
+            ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(
+                    column_defs
+                        .iter()
+                        .find(|c| c.name == "host")
+                        .unwrap()
+                        .datatype
+                )
+                .unwrap()
+            )
         );
+
         assert_eq!(
             ConcreteDataType::float64_datatype(),
-            schema.column_schema_by_name("cpu").unwrap().data_type
+            ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(
+                    column_defs
+                        .iter()
+                        .find(|c| c.name == "cpu")
+                        .unwrap()
+                        .datatype
+                )
+                .unwrap()
+            )
         );
+
         assert_eq!(
             ConcreteDataType::float64_datatype(),
-            schema.column_schema_by_name("memory").unwrap().data_type
+            ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(
+                    column_defs
+                        .iter()
+                        .find(|c| c.name == "memory")
+                        .unwrap()
+                        .datatype
+                )
+                .unwrap()
+            )
        );
+
         assert_eq!(
             ConcreteDataType::timestamp_millis_datatype(),
-            schema.column_schema_by_name("ts").unwrap().data_type
+            ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(
+                    column_defs
+                        .iter()
+                        .find(|c| c.name == "ts")
+                        .unwrap()
+                        .datatype
+                )
+                .unwrap()
+            )
         );
     }
 
@@ -440,22 +502,32 @@ mod tests {
 
         assert!(find_new_columns(&schema, &[]).unwrap().is_none());
 
-        let insert_batches = insert_batches(mock_insert_batches()).unwrap();
-        let new_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap();
+        let mock_insert_bytes = mock_insert_batches();
+        let insert_batches = insert_batches(&mock_insert_bytes).unwrap();
+        let add_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap();
 
-        assert_eq!(2, new_columns.len());
-        let host_column = &new_columns[0];
+        assert_eq!(2, add_columns.add_columns.len());
+        let host_column = &add_columns.add_columns[0];
         assert!(host_column.is_key);
+
         assert_eq!(
             ConcreteDataType::string_datatype(),
-            host_column.column_schema.data_type
+            ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(host_column.column_def.as_ref().unwrap().datatype)
+                    .unwrap()
+            )
         );
-        let memory_column = &new_columns[1];
+
+        let memory_column = &add_columns.add_columns[1];
         assert!(!memory_column.is_key);
+
         assert_eq!(
             ConcreteDataType::float64_datatype(),
-            memory_column.column_schema.data_type
-        )
+            ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(memory_column.column_def.as_ref().unwrap().datatype)
+                    .unwrap()
+            )
+        );
     }
 
     #[test]
@@ -465,7 +537,7 @@ mod tests {
         let values = insert_expr::Values {
             values: mock_insert_batches(),
         };
-        let insert_batches = insert_batches(values.values).unwrap();
+        let insert_batches = insert_batches(&values.values).unwrap();
 
         let insert_req =
             insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap();
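Together, `insert_batches`, `find_new_columns`, and the new `build_create_expr_from_insertion` let a caller derive DDL from raw insert payloads: decode the batches, then produce either a `CreateExpr` (table missing) or an `AddColumns` delta (schema incomplete). A rough sketch of that decision, with the catalog lookup left abstract and the catalog/schema/table names purely illustrative:

```rust
use api::v1::codec::InsertBatch;
use common_insert::{build_create_expr_from_insertion, find_new_columns, insert_batches};
use datatypes::schema::SchemaRef;

/// Sketch: derive the DDL needed before an insert can succeed.
/// `existing_schema` is None when the target table does not exist yet.
fn plan_ddl_for_insert(
    existing_schema: Option<SchemaRef>,
    values: &[Vec<u8>],
) -> common_insert::error::Result<()> {
    let batches: Vec<InsertBatch> = insert_batches(values)?;
    match existing_schema {
        None => {
            // No table yet: build a CreateExpr; a `table_id` of None leaves
            // the id allocation to the datanode's TableIdProvider.
            let create_expr =
                build_create_expr_from_insertion("greptime", "public", None, "demo", &batches)?;
            let _ = create_expr; // hand off to the Admin/gRPC layer
        }
        Some(schema) => {
            // Table exists: compute only the missing columns, if any.
            if let Some(add_columns) = find_new_columns(&schema, &batches)? {
                let _ = add_columns; // becomes an AlterExpr with Kind::AddColumns
            }
        }
    }
    Ok(())
}
```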
diff --git a/src/common/insert/src/lib.rs b/src/common/insert/src/lib.rs
index 2d36dfa33c..7ad7542e54 100644
--- a/src/common/insert/src/lib.rs
+++ b/src/common/insert/src/lib.rs
@@ -1,6 +1,6 @@
 pub mod error;
 mod insert;
 pub use insert::{
-    build_alter_table_request, build_create_table_request, find_new_columns, insert_batches,
+    build_alter_table_request, build_create_expr_from_insertion, find_new_columns, insert_batches,
     insertion_expr_to_request,
 };
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 466a0102f1..83f4fbcbd2 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -82,4 +82,8 @@ impl Datanode {
         self.services.start(&self.opts).await?;
         Ok(())
     }
+
+    pub fn get_instance(&self) -> InstanceRef {
+        self.instance.clone()
+    }
 }
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 2f7e7c40a9..97f5727720 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -284,16 +284,15 @@ pub enum Error {
     #[snafu(display("Insert batch is empty"))]
     EmptyInsertBatch,
 
-    #[snafu(display("Failed to build frontend instance, source: {}", source))]
-    BuildFrontend {
-        #[snafu(backtrace)]
-        source: frontend::error::Error,
-    },
+    #[snafu(display(
+        "Table id provider not found, cannot execute SQL directly on datanode in distributed mode"
+    ))]
+    TableIdProviderNotFound { backtrace: Backtrace },
 
-    #[snafu(display("Failed to start frontend instance, source: {}", source))]
-    StartFrontend {
+    #[snafu(display("Failed to bump table id, source: {}", source))]
+    BumpTableId {
         #[snafu(backtrace)]
-        source: frontend::error::Error,
+        source: table::error::Error,
     },
 }
 
@@ -363,9 +362,8 @@ impl ErrorExt for Error {
             Error::MetaClientInit { source, .. } => source.status_code(),
             Error::InsertData { source, .. } => source.status_code(),
             Error::EmptyInsertBatch => StatusCode::InvalidArguments,
-            Error::BuildFrontend { source, .. } | Error::StartFrontend { source, .. } => {
-                source.status_code()
-            }
+            Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
+            Error::BumpTableId { source, .. } => source.status_code(),
         }
     }
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 5cbe41404d..bc5fda55f6 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -13,6 +13,7 @@ use object_store::{services::fs::Builder, util, ObjectStore};
 use query::query_engine::{QueryEngineFactory, QueryEngineRef};
 use snafu::prelude::*;
 use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
+use table::table::TableIdProviderRef;
 use table_engine::config::EngineConfig as TableEngineConfig;
 use table_engine::engine::MitoEngine;
 
@@ -35,6 +36,7 @@ pub struct Instance {
     pub(crate) catalog_manager: CatalogManagerRef,
     pub(crate) physical_planner: PhysicalPlanner,
     pub(crate) script_executor: ScriptExecutor,
+    pub(crate) table_id_provider: Option<TableIdProviderRef>,
     #[allow(unused)]
     pub(crate) meta_client: Option<Arc<MetaClient>>,
     pub(crate) heartbeat_task: Option<HeartbeatTask>,
@@ -66,7 +68,7 @@ impl Instance {
         ));
 
         // create remote catalog manager
-        let (catalog_manager, factory) = match opts.mode {
+        let (catalog_manager, factory, table_id_provider) = match opts.mode {
             Mode::Standalone => {
                 let catalog = Arc::new(
                     catalog::local::LocalCatalogManager::try_new(table_engine.clone())
@@ -74,7 +76,11 @@ impl Instance {
                         .context(CatalogSnafu)?,
                 );
                 let factory = QueryEngineFactory::new(catalog.clone());
-                (catalog as CatalogManagerRef, factory)
+                (
+                    catalog.clone() as CatalogManagerRef,
+                    factory,
+                    Some(catalog as TableIdProviderRef),
+                )
             }
 
             Mode::Distributed => {
@@ -86,7 +92,7 @@ impl Instance {
                     }),
                 ));
                 let factory = QueryEngineFactory::new(catalog.clone());
-                (catalog as CatalogManagerRef, factory)
+                (catalog as CatalogManagerRef, factory, None)
             }
         };
 
@@ -110,6 +116,7 @@ impl Instance {
             script_executor,
             meta_client,
             heartbeat_task,
+            table_id_provider,
         })
     }
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index e44345aa9e..1dee0a0e27 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -1,9 +1,7 @@
-use std::ops::Deref;
-
-use api::v1::codec::RegionNumber;
+use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
 use api::v1::{
-    admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
-    CreateDatabaseExpr, ObjectExpr, ObjectResult, SelectExpr,
+    admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, CreateDatabaseExpr,
+    ObjectExpr, ObjectResult, SelectExpr,
 };
 use async_trait::async_trait;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -11,12 +9,11 @@ use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_insert::insertion_expr_to_request;
 use common_query::Output;
-use common_telemetry::logging::{debug, info};
 use query::plan::LogicalPlan;
 use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
 use snafu::prelude::*;
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
-use table::requests::{AddColumnRequest, CreateDatabaseRequest};
+use table::requests::CreateDatabaseRequest;
 
 use crate::error::{
     CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu,
@@ -24,78 +21,10 @@ use crate::error::{
     UnsupportedExprSnafu,
 };
 use crate::instance::Instance;
-use crate::server::grpc::handler::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
 use crate::server::grpc::plan::PhysicalPlanner;
 use crate::server::grpc::select::to_object_result;
-use crate::sql::SqlRequest;
 
 impl Instance {
-    async fn add_new_columns_to_table(
-        &self,
-        table_name: &str,
-        add_columns: Vec<AddColumnRequest>,
-    ) -> Result<()> {
-        let column_names = add_columns
-            .iter()
-            .map(|req| req.column_schema.name.clone())
-            .collect::<Vec<_>>();
-
-        let alter_request = common_insert::build_alter_table_request(table_name, add_columns);
-
-        debug!(
-            "Adding new columns: {:?} to table: {}",
-            column_names, table_name
-        );
-
-        let _result = self
-            .sql_handler()
-            .execute(SqlRequest::Alter(alter_request))
-            .await?;
-
-        info!(
-            "Added new columns: {:?} to table: {}",
-            column_names, table_name
-        );
-        Ok(())
-    }
-
-    async fn create_table_by_insert_batches(
-        &self,
-        catalog_name: &str,
-        schema_name: &str,
-        table_name: &str,
-        insert_batches: &[InsertBatch],
-    ) -> Result<()> {
-        // Create table automatically, build schema from data.
-        let table_id = self
-            .catalog_manager
-            .next_table_id()
-            .await
-            .context(CatalogSnafu)?;
-        let create_table_request = common_insert::build_create_table_request(
-            catalog_name,
-            schema_name,
-            table_id,
-            table_name,
-            insert_batches,
-        )
-        .context(InsertDataSnafu)?;
-
-        info!(
-            "Try to create table: {} automatically with request: {:?}",
-            table_name, create_table_request,
-        );
-
-        let _result = self
-            .sql_handler()
-            .execute(SqlRequest::CreateTable(create_table_request))
-            .await?;
-
-        info!("Success to create table: {} automatically", table_name);
-
-        Ok(())
-    }
-
     pub async fn execute_grpc_insert(
         &self,
         catalog_name: &str,
@@ -113,34 +42,14 @@ impl Instance {
             .context(SchemaNotFoundSnafu { name: schema_name })?;
 
         let insert_batches =
-            common_insert::insert_batches(values.values).context(InsertDataSnafu)?;
+            common_insert::insert_batches(&values.values).context(InsertDataSnafu)?;
         ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);
 
-        let table = if let Some(table) = schema_provider.table(table_name).context(CatalogSnafu)? {
-            let schema = table.schema();
-            if let Some(add_columns) = common_insert::find_new_columns(&schema, &insert_batches)
-                .context(InsertDataSnafu)?
-            {
-                self.add_new_columns_to_table(table_name, add_columns)
-                    .await?;
-            }
-
-            table
-        } else {
-            self.create_table_by_insert_batches(
-                catalog_name,
-                schema_name,
-                table_name,
-                &insert_batches,
-            )
-            .await?;
-
-            schema_provider
-                .table(table_name)
-                .context(CatalogSnafu)?
-                .context(TableNotFoundSnafu { table_name })?
-        };
+        let table = schema_provider
+            .table(table_name)
+            .context(CatalogSnafu)?
+            .context(TableNotFoundSnafu { table_name })?;
 
         let insert = insertion_expr_to_request(
             catalog_name,
@@ -253,16 +162,8 @@ impl GrpcQueryHandler for Instance {
                 reason: "missing `expr` in `InsertExpr`",
             })?;
 
-            // TODO(fys): _region_id is for later use.
-            let _region_id: Option<RegionNumber> = insert_expr
-                .options
-                .get("region_id")
-                .map(|id| {
-                    id.deref()
-                        .try_into()
-                        .context(servers::error::DecodeRegionNumberSnafu)
-                })
-                .transpose()?;
+            // TODO(fys): _region_number is for later use.
+            let _region_number: u32 = insert_expr.region_number;
 
             match expr {
                 insert_expr::Expr::Values(values) => {
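With the auto-create and auto-alter paths stripped out, `execute_grpc_insert` now fails with `TableNotFound` for a missing table; table creation is expected to happen before the insert, driven by the frontend. A sketch of that create-then-insert ordering from a client's point of view; `admin.create(...)` is a hypothetical method name here (only `Admin::new` and `Database::insert` actually appear in this patch):

```rust
use std::collections::HashMap;

use api::v1::{insert_expr, CreateExpr, InsertExpr};
use client::admin::Admin;
use client::{Client, Database};

/// Sketch: create the table first, then insert; the datanode no longer
/// creates tables implicitly on the insert path.
async fn create_then_insert(client: Client, create: CreateExpr, values: Vec<Vec<u8>>) {
    let admin = Admin::new("create table", client.clone());
    admin.create(create).await.unwrap(); // hypothetical method name

    let db = Database::new("greptime", client);
    db.insert(InsertExpr {
        schema_name: "public".to_string(),
        table_name: "demo".to_string(),
        expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
        options: HashMap::default(),
        region_number: 0,
    })
    .await
    .unwrap();
}
```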
diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs
index 0e770bf371..470a8134bc 100644
--- a/src/datanode/src/instance/sql.rs
+++ b/src/datanode/src/instance/sql.rs
@@ -11,7 +11,8 @@ use sql::statements::statement::Statement;
 use table::requests::CreateDatabaseRequest;
 
 use crate::error::{
-    CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu,
+    BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result,
+    SchemaNotFoundSnafu, TableIdProviderNotFoundSnafu,
 };
 use crate::instance::Instance;
 use crate::metric;
@@ -67,10 +68,12 @@ impl Instance {
 
             Statement::CreateTable(c) => {
                 let table_id = self
-                    .catalog_manager
+                    .table_id_provider
+                    .as_ref()
+                    .context(TableIdProviderNotFoundSnafu)?
                     .next_table_id()
                     .await
-                    .context(CatalogSnafu)?;
+                    .context(BumpTableIdSnafu)?;
                 let _engine_name = c.engine.clone();
                 // TODO(hl): Select table engine by engine_name
diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs
index 7b2eb6ee36..edef85601a 100644
--- a/src/datanode/src/mock.rs
+++ b/src/datanode/src/mock.rs
@@ -1,11 +1,15 @@
+use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
 use catalog::remote::MetaKvBackend;
+use common_catalog::consts::MIN_USER_TABLE_ID;
 use meta_client::client::{MetaClient, MetaClientBuilder};
 use meta_srv::mocks::MockInfo;
 use query::QueryEngineFactory;
 use storage::config::EngineConfig as StorageEngineConfig;
 use storage::EngineImpl;
+use table::metadata::TableId;
+use table::table::{TableIdProvider, TableIdProviderRef};
 use table_engine::config::EngineConfig as TableEngineConfig;
 
 use crate::datanode::DatanodeOptions;
@@ -53,6 +57,8 @@ impl Instance {
             "127.0.0.1:3302".to_string(),
             meta_client.as_ref().unwrap().clone(),
         ));
+
+        let table_id_provider = Some(catalog_manager.clone() as TableIdProviderRef);
         Ok(Self {
             query_engine,
             sql_handler,
@@ -61,6 +67,7 @@ impl Instance {
             script_executor,
             meta_client,
             heartbeat_task,
+            table_id_provider,
         })
     }
 
@@ -105,12 +112,32 @@ impl Instance {
             catalog_manager,
             physical_planner: PhysicalPlanner::new(query_engine),
             script_executor,
+            table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
             meta_client: Some(meta_client),
             heartbeat_task: Some(heartbeat_task),
         })
     }
 }
 
+struct LocalTableIdProvider {
+    inner: Arc<AtomicU32>,
+}
+
+impl Default for LocalTableIdProvider {
+    fn default() -> Self {
+        Self {
+            inner: Arc::new(AtomicU32::new(MIN_USER_TABLE_ID)),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl TableIdProvider for LocalTableIdProvider {
+    async fn next_table_id(&self) -> table::Result<TableId> {
+        Ok(self.inner.fetch_add(1, Ordering::Relaxed))
+    }
+}
+
 async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient {
     let MockInfo {
         server_addr,
diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs
index 4ab6a7ff32..e15811c728 100644
--- a/src/datanode/src/server.rs
+++ b/src/datanode/src/server.rs
@@ -1,28 +1,21 @@
-pub mod grpc;
-
+use std::default::Default;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
-use common_error::prelude::BoxedError;
 use common_runtime::Builder as RuntimeBuilder;
-use common_telemetry::info;
-use frontend::frontend::{Frontend, FrontendOptions, Mode};
-use frontend::instance::Instance as FrontendInstanceImpl;
 use servers::grpc::GrpcServer;
 use servers::server::Server;
 use snafu::ResultExt;
-use tokio::try_join;
 
 use crate::datanode::DatanodeOptions;
-use crate::error::{
-    BuildFrontendSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu,
-};
+use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu};
 use crate::instance::InstanceRef;
 
+pub mod grpc;
+
 /// All rpc services.
 pub struct Services {
     grpc_server: GrpcServer,
-    frontend: Option<Frontend<FrontendInstanceImpl>>,
 }
 
 impl Services {
@@ -35,54 +28,19 @@ impl Services {
                 .context(RuntimeResourceSnafu)?,
         );
 
-        let frontend = match opts.mode {
-            Mode::Standalone => Some(Self::build_frontend(opts).await?),
-            Mode::Distributed => {
-                info!("Starting datanode in distributed mode, only gRPC server will be started.");
-                None
-            }
-        };
         Ok(Self {
-            grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
-            frontend,
+            grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime),
         })
     }
 
-    /// Build frontend instance in standalone mode
-    async fn build_frontend(opts: &DatanodeOptions) -> Result<Frontend<FrontendInstanceImpl>> {
-        let grpc_server_addr = &opts.rpc_addr;
-        info!(
-            "Build frontend with datanode gRPC addr: {}",
-            grpc_server_addr
-        );
-        let options = FrontendOptions {
-            mode: Mode::Standalone,
-            datanode_rpc_addr: grpc_server_addr.clone(),
-            ..Default::default()
-        };
-        let frontend_instance = FrontendInstanceImpl::try_new(&options)
-            .await
-            .context(BuildFrontendSnafu)?;
-        Ok(Frontend::new(options, frontend_instance))
-    }
-
     pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
         let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
             addr: &opts.rpc_addr,
         })?;
-
-        try_join!(self.grpc_server.start(grpc_addr), async {
-            if let Some(ref mut frontend_instance) = self.frontend {
-                info!("Starting frontend instance");
-                frontend_instance
-                    .start()
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(servers::error::StartFrontendSnafu)?;
-            }
-            Ok(())
-        })
-        .context(StartServerSnafu)?;
+        self.grpc_server
+            .start(grpc_addr)
+            .await
+            .context(StartServerSnafu)?;
         Ok(())
     }
 }
diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs
index 7805b0093f..13988d2c7d 100644
--- a/src/datanode/src/server/grpc.rs
+++ b/src/datanode/src/server/grpc.rs
@@ -1,4 +1,3 @@
 mod ddl;
-pub(crate) mod handler;
 pub(crate) mod plan;
 pub mod select;
diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs
index b675424989..735874651e 100644
--- a/src/datanode/src/server/grpc/ddl.rs
+++ b/src/datanode/src/server/grpc/ddl.rs
@@ -1,24 +1,70 @@
 use std::sync::Arc;
 
 use api::helper::ColumnDataTypeWrapper;
+use api::result::AdminResultBuilder;
 use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::{ErrorExt, StatusCode};
 use common_query::Output;
+use common_telemetry::{error, info};
 use datatypes::schema::ColumnDefaultConstraint;
 use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
 use futures::TryFutureExt;
 use snafu::prelude::*;
+use table::metadata::TableId;
 use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
 
-use crate::error::{self, CatalogSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result};
+use crate::error::{
+    self, BumpTableIdSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result,
+};
 use crate::instance::Instance;
-use crate::server::grpc::handler::AdminResultBuilder;
 use crate::sql::SqlRequest;
 
 impl Instance {
+    /// Handle gRPC create table requests.
     pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
-        let request = self.create_expr_to_request(expr).await;
+        // Respect CreateExpr's table id and region ids if present, or allocate table id
+        // from local table id provider and set region id to 0.
+        let table_id = if let Some(table_id) = expr.table_id {
+            info!(
+                "Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
+                expr.catalog_name, expr.schema_name, expr.table_name, table_id
+            );
+            table_id
+        } else {
+            match self.table_id_provider.as_ref() {
+                None => {
+                    return AdminResultBuilder::default()
+                        .status_code(StatusCode::Internal as u32)
+                        .err_msg("Table id provider absent in standalone mode".to_string())
+                        .build();
+                }
+                Some(table_id_provider) => {
+                    match table_id_provider
+                        .next_table_id()
+                        .await
+                        .context(BumpTableIdSnafu)
+                    {
+                        Ok(table_id) => {
+                            info!(
+                                "Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}",
+                                &expr.catalog_name, &expr.schema_name, expr.table_name, table_id
+                            );
+                            table_id
+                        }
+                        Err(e) => {
+                            error!(e; "Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name);
+                            return AdminResultBuilder::default()
+                                .status_code(e.status_code() as u32)
+                                .err_msg(e.to_string())
+                                .build();
+                        }
+                    }
+                }
+            }
+        };
+
+        let request = create_expr_to_request(table_id, expr).await;
         let result = futures::future::ready(request)
             .and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request)))
             .await;
@@ -37,7 +83,7 @@ impl Instance {
     }
 
     pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
-        let request = match self.alter_expr_to_request(expr).transpose() {
+        let request = match alter_expr_to_request(expr).transpose() {
             Some(req) => req,
             None => {
                 return AdminResultBuilder::default()
@@ -62,77 +108,76 @@ impl Instance {
                 .build(),
         }
     }
+}
 
-    async fn create_expr_to_request(&self, expr: CreateExpr) -> Result<CreateTableRequest> {
-        let schema = create_table_schema(&expr)?;
-
-        let primary_key_indices = expr
-            .primary_keys
-            .iter()
-            .map(|key| {
-                schema
-                    .column_index_by_name(key)
-                    .context(error::KeyColumnNotFoundSnafu { name: key })
-            })
-            .collect::<Result<Vec<usize>>>()?;
-
-        let catalog_name = expr
-            .catalog_name
-            .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
-        let schema_name = expr
-            .schema_name
-            .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
-
-        let table_id = self
-            .catalog_manager()
-            .next_table_id()
-            .await
-            .context(CatalogSnafu)?;
-
-        let region_id = expr
-            .table_options
-            .get(&"region_id".to_string())
-            .unwrap()
-            .parse::<u32>()
-            .unwrap();
-
-        Ok(CreateTableRequest {
-            id: table_id,
-            catalog_name,
-            schema_name,
-            table_name: expr.table_name,
-            desc: expr.desc,
-            schema,
-            region_numbers: vec![region_id],
-            primary_key_indices,
-            create_if_not_exists: expr.create_if_not_exists,
-            table_options: expr.table_options,
-        })
-    }
-
-    fn alter_expr_to_request(&self, expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
-        match expr.kind {
-            Some(Kind::AddColumn(add_column)) => {
-                let column_def = add_column.column_def.context(MissingFieldSnafu {
-                    field: "column_def",
-                })?;
-                let alter_kind = AlterKind::AddColumns {
-                    columns: vec![AddColumnRequest {
-                        column_schema: create_column_schema(&column_def)?,
-                        // FIXME(dennis): supports adding key column
-                        is_key: false,
-                    }],
-                };
-                let request = AlterTableRequest {
-                    catalog_name: expr.catalog_name,
-                    schema_name: expr.schema_name,
-                    table_name: expr.table_name,
-                    alter_kind,
-                };
-                Ok(Some(request))
-            }
-            None => Ok(None),
-        }
-    }
+async fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
+    let schema = create_table_schema(&expr)?;
+    let primary_key_indices = expr
+        .primary_keys
+        .iter()
+        .map(|key| {
+            schema
+                .column_index_by_name(key)
+                .context(error::KeyColumnNotFoundSnafu { name: key })
+        })
+        .collect::<Result<Vec<usize>>>()?;
region_numbers: region_ids, + primary_key_indices, + create_if_not_exists: expr.create_if_not_exists, + table_options: expr.table_options, + }) +} + +fn alter_expr_to_request(expr: AlterExpr) -> Result> { + match expr.kind { + Some(Kind::AddColumns(add_columns)) => { + let mut add_column_requests = vec![]; + for add_column_expr in add_columns.add_columns { + let column_def = add_column_expr.column_def.context(MissingFieldSnafu { field: "column_def", })?; - let alter_kind = AlterKind::AddColumns { - columns: vec![AddColumnRequest { - column_schema: create_column_schema(&column_def)?, - // FIXME(dennis): supports adding key column - is_key: false, - }], - }; - let request = AlterTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - alter_kind, - }; - Ok(Some(request)) + + let schema = create_column_schema(&column_def)?; + add_column_requests.push(AddColumnRequest { + column_schema: schema, + is_key: add_column_expr.is_key, + }) } - None => Ok(None), + + let alter_kind = AlterKind::AddColumns { + columns: add_column_requests, + }; + + let request = AlterTableRequest { + catalog_name: expr.catalog_name, + schema_name: expr.schema_name, + table_name: expr.table_name, + alter_kind, + }; + Ok(Some(request)) } + None => Ok(None), } } @@ -191,8 +236,7 @@ fn create_column_schema(column_def: &ColumnDef) -> Result { #[cfg(test)] mod tests { - use std::collections::HashMap; - + use common_catalog::consts::MIN_USER_TABLE_ID; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; @@ -207,7 +251,7 @@ mod tests { instance.start().await.unwrap(); let expr = testing_create_expr(); - let request = instance.create_expr_to_request(expr).await.unwrap(); + let request = create_expr_to_request(1024, expr).await.unwrap(); assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID); assert_eq!(request.catalog_name, "greptime".to_string()); assert_eq!(request.schema_name, "public".to_string()); @@ -219,7 +263,7 @@ mod tests { let mut expr = testing_create_expr(); expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = instance.create_expr_to_request(expr).await; + let result = create_expr_to_request(1025, expr).await; assert!(result.is_err()); assert!(result .unwrap_err() @@ -312,9 +356,6 @@ mod tests { default_constraint: None, }, ]; - let table_options = [("region_id".to_string(), "0".to_string())] - .into_iter() - .collect::>(); CreateExpr { catalog_name: None, schema_name: None, @@ -324,7 +365,9 @@ mod tests { time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, - table_options, + table_options: Default::default(), + table_id: Some(MIN_USER_TABLE_ID), + region_ids: vec![0], } } diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 5edf646ade..5aa5f412e2 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; +use api::result::{build_err_result, ObjectResultBuilder}; use api::v1::{codec::SelectResult, column::SemanticType, column::Values, Column, ObjectResult}; use arrow::array::{Array, BooleanArray, PrimitiveArray}; use common_base::BitVec; @@ -12,7 +13,6 @@ use datatypes::schema::SchemaRef; use snafu::{OptionExt, ResultExt}; use crate::error::{self, ConversionSnafu, Result}; -use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; pub async fn 
to_object_result(output: Result) -> ObjectResult { let result = match output { diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 99cb619514..98a1bf1621 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -5,6 +5,7 @@ use catalog::{RegisterSchemaRequest, RegisterTableRequest}; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::Output; use common_telemetry::tracing::info; +use common_telemetry::tracing::log::error; use datatypes::schema::SchemaBuilder; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::TableConstraint; @@ -16,9 +17,9 @@ use table::metadata::TableId; use table::requests::*; use crate::error::{ - self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu, - InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu, - Result, + self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateSchemaSnafu, + CreateTableSnafu, InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, + RegisterSchemaSnafu, Result, SchemaNotFoundSnafu, }; use crate::sql::SqlHandler; @@ -40,10 +41,36 @@ impl SqlHandler { pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result { let ctx = EngineContext {}; + // first check if catalog and schema exist + let catalog = self + .catalog_manager + .catalog(&req.catalog_name) + .context(CatalogSnafu)? + .with_context(|| { + error!( + "Failed to create table {}.{}.{}, catalog not found", + &req.catalog_name, &req.schema_name, &req.table_name + ); + CatalogNotFoundSnafu { + name: &req.catalog_name, + } + })?; + catalog + .schema(&req.schema_name) + .context(CatalogSnafu)? + .with_context(|| { + error!( + "Failed to create table {}.{}.{}, schema not found", + &req.catalog_name, &req.schema_name, &req.table_name + ); + SchemaNotFoundSnafu { + name: &req.schema_name, + } + })?; + // determine catalog and schema from the very beginning let table_name = req.table_name.clone(); let table_id = req.id; - let table = self .table_engine .create_table(&ctx, req) diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 3f54f78235..14e05050bd 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -4,30 +4,37 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use api::v1::ColumnDataType; use api::v1::{ admin_result, alter_expr::Kind, codec::InsertBatch, column, column::SemanticType, insert_expr, AddColumn, AlterExpr, Column, ColumnDef, CreateExpr, InsertExpr, MutateResult, }; +use api::v1::{AddColumns, ColumnDataType}; use client::admin::Admin; use client::{Client, Database, ObjectResult}; +use common_catalog::consts::MIN_USER_TABLE_ID; use common_runtime::Builder as RuntimeBuilder; +use frontend::frontend::FrontendOptions; +use frontend::frontend::Mode::Standalone; +use frontend::grpc::GrpcOptions; use servers::grpc::GrpcServer; use servers::server::Server; use crate::instance::Instance; use crate::tests::test_util::{self, TestGuard}; -async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc) { +async fn setup_grpc_server( + name: &str, + port: usize, +) -> (String, TestGuard, Arc, Arc) { common_telemetry::init_default_ut_logging(); let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name); - let addr = format!("127.0.0.1:{}", port); - opts.rpc_addr = addr.clone(); + let datanode_grpc_addr = format!("127.0.0.1:{}", port); + opts.rpc_addr = 
datanode_grpc_addr.clone(); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); - let addr_cloned = addr.clone(); + let datanode_grpc_addr = datanode_grpc_addr.clone(); let runtime = Arc::new( RuntimeBuilder::default() .worker_threads(2) @@ -36,30 +43,65 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc().unwrap(); + let addr = fe_grpc_addr_clone.parse::().unwrap(); grpc_server_clone.start(addr).await.unwrap() }); + let dn_grpc_addr_clone = datanode_grpc_addr.clone(); + let dn_grpc_server_clone = datanode_grpc_server.clone(); + tokio::spawn(async move { + let addr = dn_grpc_addr_clone.parse::().unwrap(); + dn_grpc_server_clone.start(addr).await.unwrap() + }); + // wait for GRPC server to start tokio::time::sleep(Duration::from_secs(1)).await; - (addr, guard, grpc_server) + (fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server) } #[tokio::test(flavor = "multi_thread")] async fn test_auto_create_table() { - let (addr, _guard, grpc_server) = setup_grpc_server("auto_create_table", 3991).await; + let (addr, _guard, fe_grpc_server, dn_grpc_server) = + setup_grpc_server("auto_create_table", 3991).await; let grpc_client = Client::with_urls(vec![addr]); let db = Database::new("greptime", grpc_client); - insert_and_assert(&db).await; - - grpc_server.shutdown().await.unwrap(); + let _ = fe_grpc_server.shutdown().await; + let _ = dn_grpc_server.shutdown().await; } fn expect_data() -> (Column, Column, Column, Column) { @@ -120,7 +162,8 @@ fn expect_data() -> (Column, Column, Column, Column) { #[ignore] async fn test_insert_and_select() { common_telemetry::init_default_ut_logging(); - let (addr, _guard, grpc_server) = setup_grpc_server("insert_and_select", 3990).await; + let (addr, _guard, fe_grpc_server, dn_grpc_server) = + setup_grpc_server("insert_and_select", 3990).await; let grpc_client = Client::with_urls(vec![addr]); @@ -145,8 +188,11 @@ async fn test_insert_and_select() { is_nullable: true, default_constraint: None, }; - let kind = Kind::AddColumn(AddColumn { - column_def: Some(add_column), + let kind = Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(add_column), + is_key: false, + }], }); let expr = AlterExpr { table_name: "test_table".to_string(), @@ -160,7 +206,8 @@ async fn test_insert_and_select() { // insert insert_and_assert(&db).await; - grpc_server.shutdown().await.unwrap(); + let _ = fe_grpc_server.shutdown().await; + let _ = dn_grpc_server.shutdown().await; } async fn insert_and_assert(db: &Database) { @@ -178,12 +225,14 @@ async fn insert_and_assert(db: &Database) { } .into()]; let expr = InsertExpr { + schema_name: "public".to_string(), table_name: "demo".to_string(), expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })), options: HashMap::default(), + region_number: 0, }; let result = db.insert(expr).await; - assert!(result.is_ok()); + result.unwrap(); // select let result = db @@ -249,6 +298,8 @@ fn testing_create_expr() -> CreateExpr { time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, - table_options: HashMap::from([("region_id".to_string(), "0".to_string())]), + table_options: Default::default(), + table_id: Some(MIN_USER_TABLE_ID), + region_ids: vec![0], } } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 268a686de4..934d1fe027 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -4,6 
+4,7 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use frontend::frontend::Mode; use snafu::ResultExt; use table::engine::EngineContext; use table::engine::TableEngineRef; @@ -32,6 +33,7 @@ pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGua storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, + mode: Mode::Standalone, ..Default::default() }; ( diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 57eaf38cbc..998e9adea6 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -18,12 +18,14 @@ common-catalog = { path = "../common/catalog" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +common-insert = { path = "../common/insert" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } futures = "0.3" +futures-util = "0.3" itertools = "0.10" meta-client = { path = "../meta-client" } moka = { version = "0.9", features = ["future"] } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 4a8874428d..0a22365b95 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -7,7 +7,8 @@ use catalog::error::{ }; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ - CatalogList, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, + CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, + RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, }; use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue}; use futures::StreamExt; @@ -41,6 +42,45 @@ impl FrontendCatalogManager { } } +// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting +// as soon as it's stable: https://github.com/rust-lang/rust/issues/65991 +#[async_trait::async_trait] +impl CatalogManager for FrontendCatalogManager { + async fn start(&self) -> catalog::error::Result<()> { + Ok(()) + } + + async fn register_table( + &self, + _request: RegisterTableRequest, + ) -> catalog::error::Result { + unimplemented!() + } + + async fn register_schema( + &self, + _request: RegisterSchemaRequest, + ) -> catalog::error::Result { + unimplemented!() + } + + async fn register_system_table( + &self, + _request: RegisterSystemTableRequest, + ) -> catalog::error::Result<()> { + unimplemented!() + } + + fn table( + &self, + _catalog: &str, + _schema: &str, + _table_name: &str, + ) -> catalog::error::Result> { + unimplemented!() + } +} + impl CatalogList for FrontendCatalogManager { fn as_any(&self) -> &dyn Any { self diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 63b9a2055f..e0faeb4806 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -121,7 +121,7 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Invaild InsertRequest, reason: {}", reason))] + #[snafu(display("Invalid InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, backtrace: Backtrace, @@ 
-194,6 +194,66 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Failed to bump table id when creating table, source: {}", source))] + BumpTableId { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Failed to create table, source: {}", source))] + CreateTable { + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Failed to alter table, source: {}", source))] + AlterTable { + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Failed to insert values to table, source: {}", source))] + Insert { + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Failed to select from table, source: {}", source))] + Select { + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Failed to create table on insertion, source: {}", source))] + CreateTableOnInsertion { + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Failed to alter table on insertion, source: {}", source))] + AlterTableOnInsertion { + #[snafu(backtrace)] + source: client::Error, + }, + + #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] + BuildCreateExprOnInsertion { + #[snafu(backtrace)] + source: common_insert::error::Error, + }, + + #[snafu(display("Failed to find new columns on insertion: {}", source))] + FindNewColumnsOnInsertion { + #[snafu(backtrace)] + source: common_insert::error::Error, + }, + + #[snafu(display("Failed to deserialize insert batching: {}", source))] + DeserializeInsertBatch { + #[snafu(backtrace)] + source: common_insert::error::Error, + }, + #[snafu(display("Failed to find catalog by name: {}", catalog_name))] CatalogNotFound { catalog_name: String, @@ -266,13 +326,23 @@ impl ErrorExt for Error { Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, - Error::JoinTask { .. } - | Error::SchemaNotFound { .. } - | Error::CatalogNotFound { .. } => StatusCode::Unexpected, + Error::JoinTask { .. } => StatusCode::Unexpected, Error::Catalog { source, .. } => source.status_code(), Error::ParseCatalogEntry { source, .. } => source.status_code(), Error::RequestMeta { source } => source.status_code(), + Error::BumpTableId { source, .. } => source.status_code(), + Error::SchemaNotFound { .. } => StatusCode::InvalidArguments, + Error::CatalogNotFound { .. } => StatusCode::InvalidArguments, + Error::CreateTable { source, .. } => source.status_code(), + Error::AlterTable { source, .. } => source.status_code(), + Error::Insert { source, .. } => source.status_code(), + Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(), + Error::CreateTableOnInsertion { source, .. } => source.status_code(), + Error::AlterTableOnInsertion { source, .. } => source.status_code(), + Error::Select { source, .. } => source.status_code(), + Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(), + Error::DeserializeInsertBatch { source, .. 
} => source.status_code(), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index d69b69f2c2..8b66af9072 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -7,18 +7,24 @@ use std::sync::Arc; use std::time::Duration; use api::helper::ColumnDataTypeWrapper; +use api::result::ObjectResultBuilder; +use api::v1::alter_expr::Kind; +use api::v1::codec::InsertBatch; +use api::v1::object_expr::Expr; use api::v1::{ - insert_expr, AdminExpr, AdminResult, AlterExpr, ColumnDataType, ColumnDef as GrpcColumnDef, - CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, + insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, ColumnDataType, + ColumnDef as GrpcColumnDef, CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, + ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; -use catalog::{CatalogList, CatalogProviderRef, SchemaProviderRef}; +use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef}; use client::admin::{admin_result_to_output, Admin}; use client::{Client, Database, Select}; -use common_error::prelude::BoxedError; +use common_error::prelude::{BoxedError, StatusCode}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_query::Output; +use common_telemetry::{debug, error, info}; use datatypes::schema::ColumnSchema; use meta_client::client::MetaClientBuilder; use meta_client::MetaClientOpts; @@ -34,10 +40,16 @@ use sql::statements::insert::Insert; use sql::statements::statement::Statement; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use sql::{dialect::GenericDialect, parser::ParserContext}; +use table::table::TableIdProviderRef; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; -use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; +use crate::error::{ + self, AlterTableOnInsertionSnafu, AlterTableSnafu, BuildCreateExprOnInsertionSnafu, + BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ConvertColumnDefaultConstraintSnafu, + CreateTableOnInsertionSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu, + FindNewColumnsOnInsertionSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, SelectSnafu, +}; use crate::frontend::{FrontendOptions, Mode}; use crate::sql::insert_to_request; use crate::table::route::TableRoutes; @@ -65,7 +77,10 @@ pub struct Instance { // But in distributed mode, the frontend should fetch datanodes' addresses from metasrv. client: Client, /// The catalog manager is None in standalone mode; the datanode keeps its own. - catalog_manager: Option<FrontendCatalogManager>, + catalog_manager: Option<CatalogManagerRef>, /// Table id provider. In standalone mode it is left as None; in distributed mode, /// table ids should be generated by metasrv. + table_id_provider: Option<TableIdProviderRef>, // TODO(fys): it should be a trait that corresponds to two implementations: // Standalone and Distributed, then the code behind it doesn't need to use so // many match statements.
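The `TODO(fys)` note above suggests replacing the recurring `match self.mode` blocks with trait dispatch. A rough sketch of that shape, with entirely hypothetical names (the `Distributed` impl is omitted; `InsertExpr`, `CreateExpr`, `Output`, and `Result` are the types already imported in this file):

```rust
use async_trait::async_trait;

// Hypothetical refactor sketched from the TODO(fys) comment: hide the
// standalone/distributed split behind one trait so call sites stop
// matching on `self.mode`.
#[async_trait]
trait ModeOps: Send + Sync {
    async fn insert(&self, expr: InsertExpr) -> Result<Output>;
    async fn create_table(&self, expr: CreateExpr) -> Result<Output>;
}

struct Standalone { /* Database/Admin clients to the single datanode */ }
struct Distributed { /* catalog manager, table routes, datanode clients */ }

#[async_trait]
impl ModeOps for Standalone {
    async fn insert(&self, expr: InsertExpr) -> Result<Output> {
        todo!("forward the InsertExpr to the datanode over gRPC")
    }
    async fn create_table(&self, expr: CreateExpr) -> Result<Output> {
        todo!("send the CreateExpr through the Admin client")
    }
}
```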
@@ -77,6 +92,7 @@ impl Default for Instance { Self { client: Client::default(), catalog_manager: None, + table_id_provider: None, mode: Mode::Standalone, } } @@ -116,7 +132,7 @@ impl Instance { let datanode_clients = Arc::new(DatanodeClients::new()); let catalog_manager = FrontendCatalogManager::new(meta_backend, table_routes, datanode_clients); - Some(catalog_manager) + Some(Arc::new(catalog_manager)) } else { None }; @@ -133,6 +149,276 @@ impl Instance { Admin::new("greptime", self.client.clone()) } + pub fn set_catalog_manager(&mut self, catalog_manager: CatalogManagerRef) { + self.catalog_manager = Some(catalog_manager); + } + + pub async fn handle_select(&self, expr: Select) -> Result { + self.database() + .select(expr) + .await + .and_then(Output::try_from) + .context(SelectSnafu) + } + + /// Convert `CreateTable` statement to `CreateExpr` gRPC request. + async fn create_to_expr(&self, create: CreateTable) -> Result { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?; + + let table_id = match &self.table_id_provider { + Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?), + None => None, + }; + // FIXME(hl): Region id should be generated from metasrv + let region_ids = vec![0]; + + let time_index = find_time_index(&create.constraints)?; + let expr = CreateExpr { + catalog_name: Some(catalog_name), + schema_name: Some(schema_name), + table_name, + desc: None, + column_defs: columns_to_expr(&create.columns, &time_index)?, + time_index, + primary_keys: find_primary_keys(&create.constraints)?, + create_if_not_exists: create.if_not_exists, + // TODO(LFC): Fill in other table options. + table_options: HashMap::from([("engine".to_string(), create.engine)]), + table_id, + region_ids, + }; + Ok(expr) + } + + /// Handle create expr. + pub async fn handle_create_table(&self, expr: CreateExpr) -> Result { + let result = self.admin().create(expr.clone()).await; + if let Err(e) = &result { + error!(e; "Failed to create table by expr: {:?}", expr); + } + result + .and_then(admin_result_to_output) + .context(CreateTableSnafu) + } + + /// Handle create database expr. + pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result { + self.admin() + .create_database(expr) + .await + .and_then(admin_result_to_output) + .context(CreateTableSnafu) + } + + /// Handle alter expr + pub async fn handle_alter(&self, expr: AlterExpr) -> Result { + self.admin() + .alter(expr) + .await + .and_then(admin_result_to_output) + .context(AlterTableSnafu) + } + + /// Handle batch inserts + pub async fn handle_inserts(&self, insert_expr: &[InsertExpr]) -> Result { + let mut success = 0; + for expr in insert_expr { + match self.handle_insert(expr).await? { + Output::AffectedRows(rows) => success += rows, + _ => unreachable!("Insert should not yield output other than AffectedRows"), + } + } + Ok(Output::AffectedRows(success)) + } + + /// Handle insert. for 'values' insertion, create/alter the destination table on demand. + pub async fn handle_insert(&self, insert_expr: &InsertExpr) -> Result { + let table_name = &insert_expr.table_name; + let catalog_name = "greptime"; + let schema_name = "public"; + + if let Some(expr) = &insert_expr.expr { + match expr { + api::v1::insert_expr::Expr::Values(values) => { + // TODO(hl): gRPC should also support partitioning. 
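Stepping back to `create_to_expr` above: for orientation, converting the README's `monitor` table would yield roughly the following expression. Values are illustrative only; `column_defs` is elided, the primary key assumes the statement declared `PRIMARY KEY(host)`, and `engine` is whatever the parsed statement carried:

```rust
CreateExpr {
    catalog_name: Some("greptime".to_string()),
    schema_name: Some("hello_greptime".to_string()),
    table_name: "monitor".to_string(),
    desc: None,
    column_defs, // host, ts, cpu, memory converted by columns_to_expr
    time_index: "ts".to_string(),
    primary_keys: vec!["host".to_string()], // assuming PRIMARY KEY(host)
    create_if_not_exists: false,
    table_options: HashMap::from([("engine".to_string(), engine)]),
    table_id: None,      // standalone frontend: let the datanode allocate
    region_ids: vec![0], // per the FIXME above, metasrv should assign these later
}
```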
+ let region_number = 0; + self.handle_insert_values( + catalog_name, + schema_name, + table_name, + region_number, + values, + ) + .await + } + api::v1::insert_expr::Expr::Sql(_) => { + // Frontend does not comprehend insert request that is raw SQL string + self.database() + .insert(insert_expr.clone()) + .await + .and_then(Output::try_from) + .context(InsertSnafu) + } + } + } else { + // expr is empty + Ok(Output::AffectedRows(0)) + } + } + + /// Handle insert requests in frontend + /// If insert is SQL string flavor, just forward to datanode + /// If insert is parsed InsertExpr, frontend should comprehend the schema and create/alter table on demand. + pub async fn handle_insert_values( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + region_number: u32, + values: &insert_expr::Values, + ) -> Result { + let insert_batches = + common_insert::insert_batches(&values.values).context(DeserializeInsertBatchSnafu)?; + self.create_or_alter_table_on_demand( + catalog_name, + schema_name, + table_name, + &insert_batches, + ) + .await?; + self.database() + .insert(InsertExpr { + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + region_number, + options: Default::default(), + expr: Some(insert_expr::Expr::Values(values.clone())), + }) + .await + .and_then(Output::try_from) + .context(InsertSnafu) + } + + // check if table already exist: + // - if table does not exist, create table by inferred CreateExpr + // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` + async fn create_or_alter_table_on_demand( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + insert_batches: &[InsertBatch], + ) -> Result<()> { + match self + .catalog_manager + .as_ref() + .expect("catalog manager cannot be None") + .catalog(catalog_name) + .context(CatalogSnafu)? + .context(CatalogNotFoundSnafu { catalog_name })? + .schema(schema_name) + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { + schema_info: schema_name, + })? + .table(table_name) + .context(CatalogSnafu)? + { + None => { + info!( + "Table {}.{}.{} does not exist, try create table", + catalog_name, schema_name, table_name, + ); + self.create_table_by_insert_batches( + catalog_name, + schema_name, + table_name, + insert_batches, + ) + .await?; + info!( + "Successfully created table on insertion: {}.{}.{}", + catalog_name, schema_name, table_name + ); + } + Some(table) => { + let schema = table.schema(); + if let Some(add_columns) = common_insert::find_new_columns(&schema, insert_batches) + .context(FindNewColumnsOnInsertionSnafu)? + { + info!( + "Find new columns {:?} on insertion, try to alter table: {}.{}.{}", + add_columns, catalog_name, schema_name, table_name + ); + self.add_new_columns_to_table(table_name, add_columns) + .await?; + info!( + "Successfully altered table on insertion: {}.{}.{}", + catalog_name, schema_name, table_name + ); + } + } + }; + Ok(()) + } + + /// Infer create table expr from inserting data + async fn create_table_by_insert_batches( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + insert_batches: &[InsertBatch], + ) -> Result { + // Create table automatically, build schema from data. 
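Condensed, the decision `create_or_alter_table_on_demand` above makes is the following (method receivers, catalog/schema lookups, error contexts, and logging all elided; `batches` are the decoded `InsertBatch`es):

```rust
// Sketch of the on-demand DDL flow implemented above.
match schema_provider.table(table_name)? {
    // Table missing: infer a CreateExpr from the insert data and create it.
    None => create_table_by_insert_batches(batches).await?,
    // Table present: alter it only if the batches carry unknown columns.
    Some(table) => {
        if let Some(add_columns) =
            common_insert::find_new_columns(&table.schema(), batches)?
        {
            add_new_columns_to_table(table_name, add_columns).await?;
        }
    }
}
```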
+ let table_id = match &self.table_id_provider { + Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?), + None => None, + }; + + let create_expr = common_insert::build_create_expr_from_insertion( + catalog_name, + schema_name, + table_id, + table_name, + insert_batches, + ) + .context(BuildCreateExprOnInsertionSnafu)?; + + info!( + "Try to create table: {} automatically with request: {:?}", + table_name, create_expr, + ); + self.admin() + .create(create_expr) + .await + .and_then(admin_result_to_output) + .context(CreateTableOnInsertionSnafu) + } + + async fn add_new_columns_to_table( + &self, + table_name: &str, + add_columns: AddColumns, + ) -> Result { + debug!( + "Adding new columns: {:?} to table: {}", + add_columns, table_name + ); + let expr = AlterExpr { + table_name: table_name.to_string(), + schema_name: None, + catalog_name: None, + kind: Some(Kind::AddColumns(add_columns)), + }; + self.admin() + .alter(expr) + .await + .and_then(admin_result_to_output) + .context(AlterTableOnInsertionSnafu) + } + fn get_catalog(&self, catalog_name: &str) -> Result { self.catalog_manager .as_ref() @@ -159,6 +445,11 @@ impl Instance { let insert_request = insert_to_request(&schema_provider, *insert)?; + let batch = crate::table::insert::insert_request_to_insert_batch(&insert_request)?; + + self.create_or_alter_table_on_demand(&catalog, &schema, &table, &[batch]) + .await?; + let table = schema_provider .table(&table) .context(error::CatalogSnafu)? @@ -181,10 +472,11 @@ impl FrontendInstance for Instance { #[cfg(test)] impl Instance { - pub fn with_client(client: Client) -> Self { + pub fn with_client_and_catalog_manager(client: Client, catalog: CatalogManagerRef) -> Self { Self { client, - catalog_manager: None, + catalog_manager: Some(catalog), + table_id_provider: None, mode: Mode::Standalone, } } @@ -208,65 +500,58 @@ impl SqlQueryHandler for Instance { match stmt { Statement::Query(_) => self - .database() - .select(Select::Sql(query.to_string())) + .handle_select(Select::Sql(query.to_string())) .await - .and_then(|object_result| object_result.try_into()) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }), - Statement::Insert(insert) => { - match self.mode { - Mode::Standalone => { - // TODO(dennis): respect schema_name when inserting data - let (_catalog_name, _schema_name, table_name) = insert - .full_table_name() - .context(error::ParseSqlSnafu) - .map_err(BoxedError::new) - .context(server_error::ExecuteInsertSnafu { - msg: "Failed to get table name", - })?; + Statement::Insert(insert) => match self.mode { + Mode::Standalone => { + let (_, schema_name, table_name) = insert + .full_table_name() + .context(error::ParseSqlSnafu) + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "Failed to get table name", + })?; - let expr = InsertExpr { - table_name, - expr: Some(insert_expr::Expr::Sql(query.to_string())), - options: HashMap::default(), - }; - self.database() - .insert(expr) - .await - .and_then(|object_result| object_result.try_into()) - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query }) - } - Mode::Distributed => { - let affected = self - .sql_dist_insert(insert) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteInsertSnafu { - msg: "execute insert failed", - })?; - Ok(Output::AffectedRows(affected)) - } + let expr = InsertExpr { + schema_name, + table_name, + expr: Some(insert_expr::Expr::Sql(query.to_string())), + region_number: 0, + options: 
HashMap::default(), + }; + self.handle_insert(&expr) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) } - } + Mode::Distributed => { + let affected = self + .sql_dist_insert(insert) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "execute insert failed", + })?; + Ok(Output::AffectedRows(affected)) + } + }, Statement::CreateTable(create) => { - let expr = create_to_expr(create) + let expr = self + .create_to_expr(create) + .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query })?; - self.admin() - .create(expr) + self.handle_create_table(expr) .await - .and_then(admin_result_to_output) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }) } Statement::ShowDatabases(_) | Statement::ShowTables(_) => self - .database() - .select(Select::Sql(query.to_string())) + .handle_select(Select::Sql(query.to_string())) .await - .and_then(|object_result| object_result.try_into()) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }), @@ -274,28 +559,26 @@ impl SqlQueryHandler for Instance { let expr = CreateDatabaseExpr { database_name: c.name.to_string(), }; - self.admin() - .create_database(expr) + self.handle_create_database(expr) .await - .and_then(admin_result_to_output) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }) } Statement::Alter(alter_stmt) => self - .admin() - .alter( + .handle_alter( AlterExpr::try_from(alter_stmt) .map_err(BoxedError::new) .context(server_error::ExecuteAlterSnafu { query })?, ) .await - .and_then(admin_result_to_output) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }), Statement::ShowCreateTable(_) => { return server_error::NotSupportedSnafu { feat: query }.fail() } } + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) } async fn insert_script(&self, _name: &str, _script: &str) -> server_error::Result<()> { @@ -313,29 +596,6 @@ impl SqlQueryHandler for Instance { } } -fn create_to_expr(create: CreateTable) -> Result { - let (catalog_name, schema_name, table_name) = - table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?; - - let time_index = find_time_index(&create.constraints)?; - let expr = CreateExpr { - catalog_name: Some(catalog_name), - schema_name: Some(schema_name), - table_name, - column_defs: columns_to_expr(&create.columns, &time_index)?, - time_index, - primary_keys: find_primary_keys(&create.constraints)?, - create_if_not_exists: create.if_not_exists, - // TODO(LFC): Fill in other table options. 
- table_options: HashMap::from([ - ("engine".to_string(), create.engine), - ("region_id".to_string(), "0".to_string()), - ]), - ..Default::default() - }; - Ok(expr) -} - fn find_primary_keys(constraints: &[TableConstraint]) -> Result> { let primary_keys = constraints .iter() @@ -421,13 +681,44 @@ fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result server_error::Result { - self.database() - .object(query.clone()) - .await - .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{:?}", query), - }) + if let Some(expr) = &query.expr { + match expr { + Expr::Insert(insert) => { + let result = self.handle_insert(insert).await; + result + .map(|o| match o { + Output::AffectedRows(rows) => ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0u32) + .build(), + _ => { + unreachable!() + } + }) + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", query), + }) + } + _ => self + .database() + .object(query.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", query), + }), + } + } else { + // why? + self.database() + .object(query.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", query), + }) + } } } @@ -617,9 +908,11 @@ mod tests { } .into()]; let insert_expr = InsertExpr { + schema_name: "public".to_string(), table_name: "demo".to_string(), expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })), options: HashMap::default(), + region_number: 0, }; let object_expr = ObjectExpr { header: Some(ExprHeader::default()), @@ -708,16 +1001,18 @@ mod tests { default_constraint: None, }, ]; - let mut table_options = HashMap::with_capacity(1); - table_options.insert("region_id".to_string(), "0".to_string()); CreateExpr { + catalog_name: None, + schema_name: None, table_name: "demo".to_string(), + desc: None, column_defs, time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, - table_options, - ..Default::default() + table_options: Default::default(), + table_id: None, + region_ids: vec![0], } } } diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index da6de4e715..98bf71d699 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -17,8 +17,7 @@ impl InfluxdbLineProtocolHandler for Instance { match self.mode { Mode::Standalone => { let exprs: Vec = request.try_into()?; - self.database() - .batch_insert(exprs) + self.handle_inserts(&exprs) .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { @@ -45,7 +44,14 @@ impl Instance { for insert in inserts { let self_clone = self.clone(); - + let insert_batch = crate::table::insert::insert_request_to_insert_batch(&insert)?; + self.create_or_alter_table_on_demand( + &insert.catalog_name, + &insert.schema_name, + &insert.table_name, + &[insert_batch], + ) + .await?; // TODO(fys): need a separate runtime here let join = tokio::spawn(async move { let catalog = self_clone.get_catalog(&insert.catalog_name)?; diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 1645bc9778..f4e5543e4c 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -1,12 +1,11 @@ use async_trait::async_trait; -use client::ObjectResult; 
use common_error::prelude::BoxedError; use servers::error as server_error; use servers::opentsdb::codec::DataPoint; use servers::query_handler::OpentsdbProtocolHandler; use snafu::prelude::*; -use crate::error::{self, Result}; +use crate::error::Result; use crate::frontend::Mode; use crate::instance::Instance; @@ -41,27 +40,7 @@ impl OpentsdbProtocolHandler for Instance { impl Instance { async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { let expr = data_point.as_grpc_insert(); - - let result = self.database().insert(expr.clone()).await; - - let object_result = match result { - Ok(result) => result, - Err(_) => { - return Err(result.context(error::RequestDatanodeSnafu).unwrap_err()); - } - }; - - match object_result { - ObjectResult::Mutate(mutate) => { - if mutate.success != 1 || mutate.failure != 0 { - return error::ExecOpentsdbPutSnafu { - reason: format!("illegal result: {:?}", mutate), - } - .fail(); - } - } - ObjectResult::Select(_) => unreachable!(), - } + self.handle_insert(&expr).await?; Ok(()) } } diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index f48c9e463f..2f02387b06 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -94,11 +94,15 @@ impl PrometheusProtocolHandler for Instance { match self.mode { Mode::Standalone => { let exprs = prometheus::write_request_to_insert_exprs(request)?; - - self.database() - .batch_insert(exprs) + let futures = exprs + .iter() + .map(|e| self.handle_insert(e)) + .collect::>(); + let res = futures_util::future::join_all(futures) .await - .map_err(BoxedError::new) + .into_iter() + .collect::, crate::error::Error>>(); + res.map_err(BoxedError::new) .context(error::ExecuteInsertSnafu { msg: "failed to write prometheus remote request", })?; @@ -167,6 +171,7 @@ mod tests { #[tokio::test] async fn test_prometheus_remote_write_and_read() { + common_telemetry::init_default_ut_logging(); let instance = tests::create_frontend_instance().await; let write_request = WriteRequest { @@ -174,7 +179,7 @@ mod tests { ..Default::default() }; - assert!(instance.write(write_request).await.is_ok()); + instance.write(write_request).await.unwrap(); let read_request = ReadRequest { queries: vec![ diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 29c382bc15..0efd828b8d 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -145,9 +145,8 @@ async fn start_server( server_and_addr: Option<(Box, SocketAddr)>, ) -> servers::error::Result> { if let Some((server, addr)) = server_and_addr { - let res = server.start(addr).await.map(Some)?; info!("Starting server at {}", addr); - Ok(res) + server.start(addr).await.map(Some) } else { Ok(None) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 65a2a83b84..6c8a8d253d 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -1,4 +1,3 @@ -mod insert; pub(crate) mod route; use std::any::Any; @@ -30,6 +29,7 @@ use crate::mock::{DatanodeInstance, TableScanPlan}; use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef}; use crate::spliter::WriteSpliter; use crate::table::route::TableRoutes; +pub mod insert; #[derive(Clone)] pub struct DistTable { diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 4bde389066..795237331c 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use 
api::v1::codec; +use api::v1::codec::InsertBatch; use api::v1::insert_expr; use api::v1::insert_expr::Expr; use api::v1::Column; @@ -73,12 +74,12 @@ impl DistTable { } } -fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result { +pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result { let mut row_count = None; let columns = insert .columns_values - .into_iter() + .iter() .map(|(column_name, vector)| { match row_count { Some(rows) => ensure!( @@ -97,12 +98,12 @@ fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result>>()?; @@ -111,32 +112,28 @@ fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result Result { + let table_name = insert.table_name.clone(); + let insert_batch = insert_request_to_insert_batch(&insert)?; Ok(InsertExpr { - table_name: insert.table_name, - options, + schema_name: insert.schema_name, + table_name, expr: Some(Expr::Values(insert_expr::Values { values: vec![insert_batch.into()], })), + region_number, + options: Default::default(), }) } #[cfg(test)] mod tests { - use std::{collections::HashMap, ops::Deref}; + use std::collections::HashMap; - use api::v1::{ - codec::{self, InsertBatch}, - insert_expr::Expr, - ColumnDataType, InsertExpr, - }; + use api::v1::{codec::InsertBatch, insert_expr::Expr, ColumnDataType, InsertExpr}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::{prelude::ConcreteDataType, types::StringType, vectors::VectorBuilder}; use table::requests::InsertRequest; @@ -205,8 +202,7 @@ mod tests { } } - let bytes = insert_expr.options.get("region_id").unwrap(); - let region_id: codec::RegionNumber = bytes.deref().try_into().unwrap(); - assert_eq!(12, region_id.id); + let region_number = insert_expr.region_number; + assert_eq!(12, region_number); } } diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 76a823fb22..5ea03cbb6a 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -19,9 +19,13 @@ async fn create_datanode_instance() -> Arc { } pub(crate) async fn create_frontend_instance() -> Arc { - let datanode_instance = create_datanode_instance().await; + let datanode_instance: Arc = create_datanode_instance().await; + let dn_catalog_manager = datanode_instance.catalog_manager().clone(); let (_, client) = create_datanode_client(datanode_instance).await; - Arc::new(Instance::with_client(client)) + Arc::new(Instance::with_client_and_catalog_manager( + client, + dn_catalog_manager, + )) } pub(crate) async fn create_datanode_client( diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 784a4bc72e..47ced11122 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -4,6 +4,7 @@ use api::v1::{ insert_expr::{self, Expr}, InsertExpr, }; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; @@ -80,6 +81,9 @@ impl TryFrom<&InfluxdbRequest> for Vec { type Error = Error; fn try_from(value: &InfluxdbRequest) -> Result { + // InfluxDB uses default catalog name and schema name + let schema_name = DEFAULT_SCHEMA_NAME.to_string(); + let mut writers: HashMap = HashMap::new(); let lines = parse_lines(&value.lines) .collect::>>() @@ -150,11 +154,13 @@ impl TryFrom<&InfluxdbRequest> for Vec { Ok(writers .into_iter() .map(|(table_name, writer)| InsertExpr { + schema_name: schema_name.clone(), table_name, expr: Some(Expr::Values(insert_expr::Values { values: 
vec![writer.finish().into()], })), options: HashMap::default(), + region_number: 0, }) .collect()) } diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 7edbf63e60..aa743324c6 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use api::v1::codec::InsertBatch; use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr}; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision; use table::requests::InsertRequest; @@ -133,6 +134,7 @@ impl DataPoint { // TODO(fys): will remove in the future. pub fn as_grpc_insert(&self) -> InsertExpr { + let schema_name = DEFAULT_SCHEMA_NAME.to_string(); let mut columns = Vec::with_capacity(2 + self.tags.len()); let ts_column = Column { @@ -177,11 +179,13 @@ impl DataPoint { row_count: 1, }; InsertExpr { + schema_name, table_name: self.metric.clone(), expr: Some(insert_expr::Expr::Values(insert_expr::Values { values: vec![batch.into()], })), options: HashMap::default(), + region_number: 0, } } diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 8a908a962c..1070aba65e 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -11,6 +11,7 @@ use api::v1::{ codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr, }; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision::MILLISECOND; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; @@ -339,6 +340,8 @@ pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result Result { + let schema_name = DEFAULT_SCHEMA_NAME.to_string(); + // TODO(dennis): save exemplars into a column let labels = std::mem::take(&mut timeseries.labels); let samples = std::mem::take(&mut timeseries.samples); @@ -399,6 +402,7 @@ fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result { row_count: row_count as u32, }; Ok(InsertExpr { + schema_name, table_name: table_name.context(error::InvalidPromRemoteRequestSnafu { msg: "missing '__name__' label in timeseries", })?, @@ -407,6 +411,7 @@ fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result { values: vec![batch.into()], })), options: HashMap::default(), + region_number: 0, }) } diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs index ad2a124780..8861fc0298 100644 --- a/src/sql/src/statements/alter.rs +++ b/src/sql/src/statements/alter.rs @@ -1,4 +1,4 @@ -use api::v1::{alter_expr, AlterExpr}; +use api::v1::{alter_expr, AddColumn, AlterExpr}; use sqlparser::ast::{ColumnDef, ObjectName, TableConstraint}; use crate::error::UnsupportedAlterTableStatementSnafu; @@ -51,8 +51,11 @@ impl TryFrom for AlterExpr { .fail(); } AlterTableOperation::AddColumn { column_def } => { - alter_expr::Kind::AddColumn(api::v1::AddColumn { - column_def: Some(sql_column_def_to_grpc_column_def(column_def)?), + alter_expr::Kind::AddColumns(api::v1::AddColumns { + add_columns: vec![AddColumn { + column_def: Some(sql_column_def_to_grpc_column_def(column_def)?), + is_key: false, + }], }) } }; diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 7f3fbe854a..55e74051eb 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -11,7 +11,7 @@ use common_query::physical_plan::PhysicalPlanRef; use datatypes::schema::SchemaRef; use crate::error::Result; -use 
crate::metadata::{FilterPushDownType, TableInfoRef, TableType}; +use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; use crate::requests::{AlterTableRequest, InsertRequest}; /// Table abstraction. @@ -61,3 +61,10 @@ pub trait Table: Send + Sync { } pub type TableRef = Arc<dyn Table>; + +#[async_trait::async_trait] +pub trait TableIdProvider { + async fn next_table_id(&self) -> Result<TableId>; +} + +pub type TableIdProviderRef = Arc<dyn TableIdProvider + Send + Sync>;
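To make the new trait concrete, a minimal in-memory implementation, of the kind a standalone setup or a test might plug in, could look like this (illustrative only, not part of the patch; paths are relative to the `table` crate, where `TableId` is a `u32`):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

use crate::error::Result;
use crate::metadata::TableId;
use crate::table::TableIdProvider;

// Illustrative only: hands out monotonically increasing ids from memory.
pub struct MemoryTableIdProvider {
    next: AtomicU32,
}

impl MemoryTableIdProvider {
    pub fn new(start: TableId) -> Self {
        Self { next: AtomicU32::new(start) }
    }
}

#[async_trait::async_trait]
impl TableIdProvider for MemoryTableIdProvider {
    async fn next_table_id(&self) -> Result<TableId> {
        Ok(self.next.fetch_add(1, Ordering::Relaxed))
    }
}
```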