feat: Move create table logic to frontend (#455)

* refactor: reverse the dependency, from frontend depending on datanode to datanode depending on frontend

* wip: start frontend in datanode

* wip: migrate create database to frontend

* wip: impl alter table

* fix: CR comments

* feat: add table id and region ids field to CreateExpr

* chore: rebase develop

* refactor: frontend catalog should be set from datanode

* feat: gRPC AddColumn request supports adding multiple columns

* wip: move create table and create-on-insertion to frontend

* wip: error handling

* fix: some unit tests

* fix: all unit tests

* chore: merge develop

* feat: add create/alter-on-insertion to dist_insert/sql_dist_insert

* fix: add region number/catalog/schema to InsertExpr

* feat: add handle_create_table/handle_create_database...

* fix: remove catalog from insert expr

* fix: CR comments

* fix: when running in standalone mode, MySQL and PostgreSQL options should be passed to the frontend so that the actually running service can change the port it listens on

* refactor: add a standalone subcommand, move frontend start stuff to cmd package

* chore: optimize create table failure logs

* docs: change readme

* docs: update readme
Lei, Huang
2022-11-14 10:54:35 +08:00
committed by GitHub
parent 488eabce4a
commit fae331d2ba
52 changed files with 1356 additions and 721 deletions

Cargo.lock

@@ -2099,6 +2099,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-insert",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -2110,6 +2111,7 @@ dependencies = [
"datanode",
"datatypes",
"futures",
"futures-util",
"itertools",
"meta-client",
"meta-srv",


@@ -30,30 +30,35 @@ docker build --network host -f docker/Dockerfile -t greptimedb .
## Usage
### Start Datanode
### Start in standalone mode
```
// Start datanode with default options.
cargo run -- datanode start
// Start datanode and frontend with default options.
cargo run -- --log-level=debug standalone start
OR
// Start datanode with `http-addr` option.
cargo run -- datanode start --http-addr=0.0.0.0:9999
// Start with `http-addr` option.
cargo run -- --log-level=debug standalone start --http-addr=0.0.0.0:9999
OR
// Start with `mysql-addr` option.
cargo run -- --log-level=debug standalone start --mysql-addr=0.0.0.0:9999
OR
// Start datanode with `log-dir` and `log-level` options.
cargo run -- --log-dir=logs --log-level=debug datanode start
```
Start datanode with config file:
cargo run -- --log-dir=logs --log-level=debug standalone start --mysql-addr=0.0.0.0:4102
```
cargo run -- --log-dir=logs --log-level=debug datanode start -c ./config/datanode.example.toml
Start with config file:
```
cargo run -- --log-dir=logs --log-level=debug standalone start -c ./config/standalone.example.toml
```
Start datanode by runing docker container:
Start datanode by running docker container:
```
docker run -p 3000:3000 \
@@ -62,46 +67,24 @@ docker run -p 3000:3000 \
greptimedb
```
### Start Frontend
Frontend should connect to Datanode, so **Datanode must be started** first!
```
// Connects to local Datanode at its default GRPC port: 3001
// Start Frontend with default options.
cargo run -- frontend start
OR
// Start Frontend with `mysql-addr` option.
cargo run -- frontend start --mysql-addr=0.0.0.0:9999
OR
// Start datanode with `log-dir` and `log-level` options.
cargo run -- --log-dir=logs --log-level=debug frontend start
```
Start datanode with config file:
```
cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/frontend.example.toml
```
### SQL Operations
1. Connect to the DB with the [mysql client](https://dev.mysql.com/downloads/mysql/):
```
# The datanode listens on port 3306 by default.
mysql -h 127.0.0.1 -P 3306
# The standalone instance listens on port 4002 by default.
mysql -h 127.0.0.1 -P 4002
```
2. Create a database:
```SQL
CREATE DATABASE hello_greptime;
```
2. Create table:
```SQL
CREATE TABLE monitor (
CREATE TABLE hello_greptime.monitor (
host STRING,
ts TIMESTAMP,
cpu DOUBLE DEFAULT 0,
@@ -113,22 +96,22 @@ cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/fronten
3. Insert data:
```SQL
INSERT INTO monitor(host, cpu, memory, ts) VALUES ('host1', 66.6, 1024, 1660897955000);
INSERT INTO monitor(host, cpu, memory, ts) VALUES ('host2', 77.7, 2048, 1660897956000);
INSERT INTO monitor(host, cpu, memory, ts) VALUES ('host3', 88.8, 4096, 1660897957000);
INSERT INTO hello_greptime.monitor(host, cpu, memory, ts) VALUES ('host1', 66.6, 1024, 1660897955000);
INSERT INTO hello_greptime.monitor(host, cpu, memory, ts) VALUES ('host2', 77.7, 2048, 1660897956000);
INSERT INTO hello_greptime.monitor(host, cpu, memory, ts) VALUES ('host3', 88.8, 4096, 1660897957000);
```
4. Query data:
```SQL
mysql> SELECT * FROM monitor;
+-------+---------------+------+--------+
| host | ts | cpu | memory |
+-------+---------------+------+--------+
| host1 | 1660897955000 | 66.6 | 1024 |
| host2 | 1660897956000 | 77.7 | 2048 |
| host3 | 1660897957000 | 88.8 | 4096 |
+-------+---------------+------+--------+
mysql> SELECT * FROM hello_greptime.monitor;
+-------+---------------------+------+--------+
| host | ts | cpu | memory |
+-------+---------------------+------+--------+
| host1 | 2022-08-19 08:32:35 | 66.6 | 1024 |
| host2 | 2022-08-19 08:32:36 | 77.7 | 2048 |
| host3 | 2022-08-19 08:32:37 | 88.8 | 4096 |
+-------+---------------------+------+--------+
3 rows in set (0.01 sec)
```
You can delete your data by removing `/tmp/greptimedb`.


@@ -97,11 +97,13 @@ async fn write_data(
let row_count = record_batch.num_rows();
let insert_batch = convert_record_batch(record_batch).into();
let insert_expr = InsertExpr {
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![insert_batch],
})),
options: HashMap::default(),
region_number: 0,
};
let now = Instant::now();
db.insert(insert_expr).await.unwrap();
@@ -342,6 +344,8 @@ fn create_table_expr() -> CreateExpr {
primary_keys: vec!["VendorID".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
region_ids: vec![0],
table_id: Some(0),
}
}


@@ -0,0 +1,14 @@
mode = 'standalone'
datanode_rpc_addr = '127.0.0.1:3001'
http_addr = '0.0.0.0:4000'

[grpc_options]
addr = '0.0.0.0:4001'
runtime_size = 4

[mysql_options]
addr = '0.0.0.0:4003'
runtime_size = 4

[influxdb_options]
enable = true


@@ -41,6 +41,8 @@ message CreateExpr {
repeated string primary_keys = 7;
bool create_if_not_exists = 8;
map<string, string> table_options = 9;
optional uint32 table_id = 10;
repeated uint32 region_ids = 11;
}
message AlterExpr {
@@ -48,12 +50,17 @@ message AlterExpr {
optional string schema_name = 2;
string table_name = 3;
oneof kind {
AddColumn add_column = 4;
AddColumns add_columns = 4;
}
}
message AddColumns {
repeated AddColumn add_columns = 1;
}
message AddColumn {
ColumnDef column_def = 1;
bool is_key = 2;
}
message CreateDatabaseExpr {
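For context, a minimal sketch of how a client might populate the two new `CreateExpr` fields, assuming the prost-generated Rust types mirror the proto above; the field values and numeric datatype codes are illustrative, not taken from the source:

```rust
use api::v1::{ColumnDef, CreateExpr};

// Sketch only: with this change the frontend allocates the table id and
// region ids and hands them to the datanode inside CreateExpr.
fn example_create_expr() -> CreateExpr {
    CreateExpr {
        catalog_name: Some("greptime".to_string()),
        schema_name: Some("public".to_string()),
        table_name: "monitor".to_string(),
        desc: None,
        column_defs: vec![ColumnDef {
            name: "ts".to_string(),
            datatype: 15, // assumed wire code for a timestamp type
            is_nullable: false,
            default_constraint: None,
        }],
        time_index: "ts".to_string(),
        primary_keys: vec![],
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: Some(1024), // new field 10
        region_ids: vec![0],  // new field 11
    }
}
```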


@@ -38,14 +38,15 @@ message PhysicalPlan {
}
message InsertExpr {
string table_name = 1;
string schema_name = 1;
string table_name = 2;
message Values {
repeated bytes values = 1;
repeated bytes values = 3;
}
oneof expr {
Values values = 2;
Values values = 4;
// TODO(LFC): Remove field "sql" in InsertExpr.
// When the Frontend instance receives an insertion SQL (`insert into ...`), it's expected to parse the SQL and
@@ -54,10 +55,12 @@ message InsertExpr {
// Then why the "sql" field exists here? It's because the Frontend needs table schema to create the values to insert,
// which is currently not able to find anywhere. (Maybe the table schema is suppose to be fetched from Meta?)
// The "sql" field is meant to be removed in the future.
string sql = 3;
string sql = 5;
}
map<string, bytes> options = 4;
/// The region number of the current insert request.
uint32 region_number = 6;
map<string, bytes> options = 7;
}
// TODO(jiachun)


@@ -1,6 +1,7 @@
pub mod error;
pub mod helper;
pub mod prometheus;
pub mod result;
pub mod serde;
pub mod v1;


@@ -1,8 +1,9 @@
use api::v1::{
use common_error::prelude::ErrorExt;
use crate::v1::{
admin_result, codec::SelectResult, object_result, AdminResult, MutateResult, ObjectResult,
ResultHeader, SelectResult as SelectResultRaw,
};
use common_error::prelude::ErrorExt;
pub const PROTOCOL_VERSION: u32 = 1;
@@ -10,14 +11,14 @@ pub type Success = u32;
pub type Failure = u32;
#[derive(Default)]
pub(crate) struct ObjectResultBuilder {
pub struct ObjectResultBuilder {
version: u32,
code: u32,
err_msg: Option<String>,
result: Option<Body>,
}
pub(crate) enum Body {
pub enum Body {
Mutate((Success, Failure)),
Select(SelectResult),
}
@@ -80,7 +81,7 @@ impl ObjectResultBuilder {
}
}
pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
pub fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
ObjectResultBuilder::new()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
@@ -88,7 +89,7 @@ pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
}
#[derive(Debug)]
pub(crate) struct AdminResultBuilder {
pub struct AdminResultBuilder {
version: u32,
code: u32,
err_msg: Option<String>,
@@ -144,11 +145,11 @@ impl Default for AdminResultBuilder {
#[cfg(test)]
mod tests {
use api::v1::{object_result, MutateResult};
use common_error::status_code::StatusCode;
use super::*;
use crate::error::UnsupportedExprSnafu;
use crate::error::UnknownColumnDataTypeSnafu;
use crate::v1::{object_result, MutateResult};
#[test]
fn test_object_result_builder() {
@@ -175,14 +176,13 @@ mod tests {
#[test]
fn test_build_err_result() {
let err = UnsupportedExprSnafu { name: "select" }.build();
let err = UnknownColumnDataTypeSnafu { datatype: 1 }.build();
let err_result = build_err_result(&err);
let header = err_result.header.unwrap();
let result = err_result.result;
assert_eq!(PROTOCOL_VERSION, header.version);
assert_eq!(StatusCode::Internal as u32, header.code);
assert_eq!("Unsupported expr type: select", header.err_msg);
assert_eq!(StatusCode::InvalidArguments as u32, header.code);
assert!(result.is_none());
}
}
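Since `ObjectResultBuilder` and `AdminResultBuilder` are now `pub`, other crates (the datanode's gRPC handlers in this PR) can construct results directly. A minimal sketch, assuming `build()` finalizes the builder the same way `build_err_result` above does:

```rust
use api::result::ObjectResultBuilder;
use api::v1::ObjectResult;
use common_error::status_code::StatusCode;

// Hedged sketch: build an ObjectResult outside the `api` crate, which the
// pub(crate) -> pub change now allows. StatusCode::Success and build() are
// assumed from the surrounding code, not shown in this diff.
fn example_ok_result() -> ObjectResult {
    ObjectResultBuilder::new()
        .status_code(StatusCode::Success as u32)
        .build()
}
```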


@@ -171,12 +171,6 @@ pub enum Error {
source: meta_client::error::Error,
},
#[snafu(display("Failed to bump table id"))]
BumpTableId { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to parse table id from metasrv, data: {:?}", data))]
ParseTableId { data: String, backtrace: Backtrace },
#[snafu(display("Failed to deserialize partition rule from string: {:?}", data))]
DeserializePartitionRule {
data: String,
@@ -232,9 +226,6 @@ impl ErrorExt for Error {
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::BumpTableId { .. } | Error::ParseTableId { .. } => {
StatusCode::StorageUnavailable
}
Error::DeserializePartitionRule { .. } => StatusCode::Unexpected,
Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } => source.status_code(),


@@ -69,9 +69,6 @@ pub trait CatalogManager: CatalogList {
/// Starts a catalog manager.
async fn start(&self) -> Result<()>;
/// Returns next available table id.
async fn next_table_id(&self) -> Result<TableId>;
/// Registers a table under the given catalog/schema to the catalog manager,
/// returns the table registered.
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize>;
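The removed method moves behind a dedicated trait. `TableIdProvider` itself is not part of this diff; inferred from the `impl TableIdProvider` blocks in the files below, it presumably looks roughly like this sketch (the real definition lives in the `table` crate):

```rust
use std::sync::Arc;
use table::metadata::TableId;

// Inferred sketch of the trait the catalog managers now implement; not
// copied from source.
#[async_trait::async_trait]
pub trait TableIdProvider: Send + Sync {
    async fn next_table_id(&self) -> table::error::Result<TableId>;
}

pub type TableIdProviderRef = Arc<dyn TableIdProvider>;
```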


@@ -16,6 +16,7 @@ use table::engine::{EngineContext, TableEngineRef};
use table::metadata::TableId;
use table::requests::OpenTableRequest;
use table::table::numbers::NumbersTable;
use table::table::TableIdProvider;
use table::TableRef;
use crate::error::{
@@ -278,6 +279,13 @@ impl CatalogList for LocalCatalogManager {
}
}
#[async_trait::async_trait]
impl TableIdProvider for LocalCatalogManager {
async fn next_table_id(&self) -> table::Result<TableId> {
Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed))
}
}
#[async_trait::async_trait]
impl CatalogManager for LocalCatalogManager {
/// Start [LocalCatalogManager] to load all information from system catalog table.
@@ -286,11 +294,6 @@ impl CatalogManager for LocalCatalogManager {
self.init().await
}
#[inline]
async fn next_table_id(&self) -> Result<TableId> {
Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed))
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
let started = self.init_lock.lock().await;


@@ -8,6 +8,7 @@ use std::sync::RwLock;
use common_catalog::consts::MIN_USER_TABLE_ID;
use snafu::OptionExt;
use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
@@ -41,6 +42,13 @@ impl Default for MemoryCatalogManager {
}
}
#[async_trait::async_trait]
impl TableIdProvider for MemoryCatalogManager {
async fn next_table_id(&self) -> table::error::Result<TableId> {
Ok(self.table_id.fetch_add(1, Ordering::Relaxed))
}
}
#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
async fn start(&self) -> Result<()> {
@@ -48,10 +56,6 @@ impl CatalogManager for MemoryCatalogManager {
Ok(())
}
async fn next_table_id(&self) -> Result<TableId> {
Ok(self.table_id.fetch_add(1, Ordering::Relaxed))
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs


@@ -2,18 +2,15 @@ use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use async_stream::stream;
use backoff::exponential::ExponentialBackoffBuilder;
use backoff::ExponentialBackoff;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, info};
use futures::Stream;
use futures_util::StreamExt;
use snafu::{OptionExt, ResultExt};
@@ -25,8 +22,8 @@ use table::TableRef;
use tokio::sync::Mutex;
use crate::error::{
BumpTableIdSnafu, CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu,
OpenTableSnafu, ParseTableIdSnafu, SchemaNotFoundSnafu, TableExistsSnafu,
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
SchemaNotFoundSnafu, TableExistsSnafu,
};
use crate::error::{InvalidTableSchemaSnafu, Result};
use crate::remote::{Kv, KvBackendRef};
@@ -378,63 +375,6 @@ impl CatalogManager for RemoteCatalogManager {
Ok(())
}
/// Bump table id in a CAS manner with backoff.
async fn next_table_id(&self) -> Result<TableId> {
let key = common_catalog::consts::TABLE_ID_KEY_PREFIX.as_bytes();
let op = || async {
// TODO(hl): optimize this get
let (prev, prev_bytes) = match self.backend.get(key).await? {
None => (MIN_USER_TABLE_ID, vec![]),
Some(kv) => (parse_table_id(&kv.1)?, kv.1),
};
match self
.backend
.compare_and_set(key, &prev_bytes, &(prev + 1).to_le_bytes())
.await
{
Ok(cas_res) => match cas_res {
Ok(_) => Ok(prev),
Err(e) => {
info!("Table id {:?} already occupied", e);
Err(backoff::Error::transient(
BumpTableIdSnafu {
msg: "Table id occupied",
}
.build(),
))
}
},
Err(e) => {
error!(e;"Failed to CAS table id");
Err(backoff::Error::permanent(
BumpTableIdSnafu {
msg: format!("Failed to perform CAS operation: {:?}", e),
}
.build(),
))
}
}
};
let retry_policy: ExponentialBackoff = ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(4))
.with_multiplier(2.0)
.with_max_interval(Duration::from_millis(1000))
.with_max_elapsed_time(Some(Duration::from_millis(3000)))
.build();
backoff::future::retry(retry_policy, op).await.map_err(|e| {
BumpTableIdSnafu {
msg: format!(
"Bump table id exceeds max fail times, last error msg: {:?}",
e
),
}
.build()
})
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
let catalog_name = request.catalog;
let schema_name = request.schema;
@@ -614,16 +554,6 @@ impl CatalogProvider for RemoteCatalogProvider {
}
}
/// Parse u8 slice to `TableId`
fn parse_table_id(val: &[u8]) -> Result<TableId> {
Ok(TableId::from_le_bytes(val.try_into().map_err(|_| {
ParseTableIdSnafu {
data: format!("{:?}", val),
}
.build()
})?))
}
pub struct RemoteSchemaProvider {
catalog_name: String,
schema_name: String,
@@ -745,17 +675,3 @@ impl SchemaProvider for RemoteSchemaProvider {
Ok(self.tables.load().contains_key(name))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_table_id() {
assert_eq!(12, parse_table_id(&12_i32.to_le_bytes()).unwrap());
let mut data = vec![];
data.extend_from_slice(&12_i32.to_le_bytes());
data.push(0);
assert!(parse_table_id(&data).is_err());
}
}


@@ -11,8 +11,8 @@ mod tests {
use catalog::remote::{
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
};
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use datatypes::schema::Schema;
use futures_util::StreamExt;
@@ -61,7 +61,9 @@ mod tests {
);
}
async fn prepare_components(node_id: u64) -> (KvBackendRef, TableEngineRef, CatalogManagerRef) {
async fn prepare_components(
node_id: u64,
) -> (KvBackendRef, TableEngineRef, Arc<RemoteCatalogManager>) {
let backend = Arc::new(MockKvBackend::default()) as KvBackendRef;
let table_engine = Arc::new(MockTableEngine::default());
let catalog_manager =
@@ -277,19 +279,4 @@ mod tests {
new_catalog.schema_names().unwrap().into_iter().collect()
)
}
#[tokio::test]
async fn test_next_table_id() {
let node_id = 42;
let (_, _, catalog_manager) = prepare_components(node_id).await;
assert_eq!(
MIN_USER_TABLE_ID,
catalog_manager.next_table_id().await.unwrap()
);
assert_eq!(
MIN_USER_TABLE_ID + 1,
catalog_manager.next_table_id().await.unwrap()
);
}
}


@@ -15,11 +15,13 @@ async fn run() {
let db = Database::new("greptime", client);
let expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: insert_batches(),
})),
options: HashMap::default(),
region_number: 0,
};
db.insert(expr).await.unwrap();
}


@@ -49,6 +49,8 @@ async fn run() {
primary_keys: vec!["key".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
table_id: Some(1024),
region_ids: vec![0],
};
let admin = Admin::new("create table", client.clone());


@@ -124,8 +124,6 @@ impl Database {
obj_result.try_into()
}
// TODO(jiachun) update/delete
pub async fn object(&self, expr: ObjectExpr) -> Result<GrpcObjectResult> {
let res = self.objects(vec![expr]).await?.pop().unwrap();
Ok(res)


@@ -1,10 +1,10 @@
use std::fmt;
use clap::Parser;
use cmd::datanode;
use cmd::error::Result;
use cmd::frontend;
use cmd::metasrv;
use cmd::{datanode, standalone};
use common_telemetry::logging::error;
use common_telemetry::logging::info;
@@ -33,6 +33,8 @@ enum SubCommand {
Frontend(frontend::Command),
#[clap(name = "metasrv")]
Metasrv(metasrv::Command),
#[clap(name = "standalone")]
Standalone(standalone::Command),
}
impl SubCommand {
@@ -41,6 +43,7 @@ impl SubCommand {
SubCommand::Datanode(cmd) => cmd.run().await,
SubCommand::Frontend(cmd) => cmd.run().await,
SubCommand::Metasrv(cmd) => cmd.run().await,
SubCommand::Standalone(cmd) => cmd.run().await,
}
}
}
@@ -51,6 +54,7 @@ impl fmt::Display for SubCommand {
SubCommand::Datanode(..) => write!(f, "greptime-datanode"),
SubCommand::Frontend(..) => write!(f, "greptime-frontend"),
SubCommand::Metasrv(..) => write!(f, "greptime-metasrv"),
SubCommand::Standalone(..) => write!(f, "greptime-standalone"),
}
}
}


@@ -11,6 +11,12 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("Failed to build frontend, source: {}", source))]
BuildFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},
#[snafu(display("Failed to start frontend, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
@@ -38,6 +44,9 @@ pub enum Error {
#[snafu(display("Missing config, msg: {}", msg))]
MissingConfig { msg: String, backtrace: Backtrace },
#[snafu(display("Illegal config: {}", msg))]
IllegalConfig { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -51,6 +60,8 @@ impl ErrorExt for Error {
Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => {
StatusCode::InvalidArguments
}
Error::IllegalConfig { .. } => StatusCode::InvalidArguments,
Error::BuildFrontend { source, .. } => source.status_code(),
}
}


@@ -2,4 +2,5 @@ pub mod datanode;
pub mod error;
pub mod frontend;
pub mod metasrv;
pub mod standalone;
mod toml_loader;

src/cmd/src/standalone.rs (new file)

@@ -0,0 +1,192 @@
use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance as FeInstance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use snafu::ResultExt;
use tokio::try_join;
use crate::error::{
BuildFrontendSnafu, Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu,
};
use crate::toml_loader;
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
subcmd: SubCommand,
}
impl Command {
pub async fn run(self) -> Result<()> {
self.subcmd.run().await
}
}
#[derive(Parser)]
enum SubCommand {
Start(StartCommand),
}
impl SubCommand {
async fn run(self) -> Result<()> {
match self {
SubCommand::Start(cmd) => cmd.run().await,
}
}
}
#[derive(Debug, Parser)]
struct StartCommand {
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
rpc_addr: Option<String>,
#[clap(long)]
mysql_addr: Option<String>,
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
opentsdb_addr: Option<String>,
#[clap(short, long)]
influxdb_enable: bool,
#[clap(short, long)]
config_file: Option<String>,
}
impl StartCommand {
async fn run(self) -> Result<()> {
let fe_opts = FrontendOptions::try_from(self)?;
let dn_opts = DatanodeOptions::default();
let mut datanode = Datanode::new(dn_opts.clone())
.await
.context(StartDatanodeSnafu)?;
let mut frontend = build_frontend(fe_opts, &dn_opts, datanode.get_instance()).await?;
try_join!(
async { datanode.start().await.context(StartDatanodeSnafu) },
async { frontend.start().await.context(StartFrontendSnafu) }
)?;
Ok(())
}
}
/// Build frontend instance in standalone mode
async fn build_frontend(
fe_opts: FrontendOptions,
dn_opts: &DatanodeOptions,
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
let grpc_server_addr = &dn_opts.rpc_addr;
info!(
"Build frontend with datanode gRPC addr: {}",
grpc_server_addr
);
let mut frontend_instance = FeInstance::try_new(&fe_opts)
.await
.context(BuildFrontendSnafu)?;
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
Ok(Frontend::new(fe_opts, frontend_instance))
}
impl TryFrom<StartCommand> for FrontendOptions {
type Error = Error;
fn try_from(cmd: StartCommand) -> std::result::Result<Self, Self::Error> {
let mut opts: FrontendOptions = if let Some(path) = cmd.config_file {
toml_loader::from_file!(&path)?
} else {
FrontendOptions::default()
};
opts.mode = Mode::Standalone;
if let Some(addr) = cmd.http_addr {
opts.http_addr = Some(addr);
}
if let Some(addr) = cmd.rpc_addr {
// the frontend gRPC addr must not conflict with the datanode's default gRPC addr
let datanode_grpc_addr = DatanodeOptions::default().rpc_addr;
if addr == datanode_grpc_addr {
return IllegalConfigSnafu {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {}",
datanode_grpc_addr
),
}
.fail();
}
opts.grpc_options = Some(GrpcOptions {
addr,
..Default::default()
});
}
if let Some(addr) = cmd.mysql_addr {
opts.mysql_options = Some(MysqlOptions {
addr,
..Default::default()
})
}
if let Some(addr) = cmd.postgres_addr {
opts.postgres_options = Some(PostgresOptions {
addr,
..Default::default()
})
}
if let Some(addr) = cmd.opentsdb_addr {
opts.opentsdb_options = Some(OpentsdbOptions {
addr,
..Default::default()
});
}
if cmd.influxdb_enable {
opts.influxdb_options = Some(InfluxdbOptions { enable: true });
}
Ok(opts)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_read_config_file() {
let cmd = StartCommand {
http_addr: None,
rpc_addr: None,
mysql_addr: None,
postgres_addr: None,
opentsdb_addr: None,
config_file: Some(format!(
"{}/../../config/standalone.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
influxdb_enable: false,
};
let fe_opts = FrontendOptions::try_from(cmd).unwrap();
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(Some("0.0.0.0:4000".to_string()), fe_opts.http_addr);
assert_eq!(
"0.0.0.0:4001".to_string(),
fe_opts.grpc_options.unwrap().addr
);
assert_eq!("0.0.0.0:4003", fe_opts.mysql_options.as_ref().unwrap().addr);
assert_eq!(4, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
}
}
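To illustrate the precedence the `TryFrom` conversion above implements (CLI flags override the config file, and the mode is always forced to standalone), a hypothetical second test case with made-up addresses:

```rust
#[test]
fn test_cli_flag_precedence() {
    // Hedged sketch: no config file, one CLI flag set; everything else falls
    // back to FrontendOptions::default(), with mode forced to Standalone.
    let cmd = StartCommand {
        http_addr: Some("127.0.0.1:8080".to_string()),
        rpc_addr: None,
        mysql_addr: None,
        postgres_addr: None,
        opentsdb_addr: None,
        influxdb_enable: false,
        config_file: None,
    };
    let opts = FrontendOptions::try_from(cmd).unwrap();
    assert_eq!(Mode::Standalone, opts.mode);
    assert_eq!(Some("127.0.0.1:8080".to_string()), opts.http_addr);
}
```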


@@ -5,49 +5,46 @@ use std::{
sync::Arc,
};
use api::{
helper::ColumnDataTypeWrapper,
v1::{
codec::InsertBatch,
column::{SemanticType, Values},
Column,
},
use api::v1::{
codec::InsertBatch,
column::{SemanticType, Values},
AddColumns, Column,
};
use api::v1::{AddColumn, ColumnDef, CreateExpr};
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::schema::SchemaRef;
use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::{
requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest},
requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest},
Table,
};
use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateSchemaSnafu, DecodeInsertSnafu,
DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, MissingTimestampColumnSnafu, Result,
ColumnNotFoundSnafu, DecodeInsertSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu,
MissingTimestampColumnSnafu, Result,
};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
#[inline]
fn build_column_schema(column_name: &str, datatype: i32, nullable: bool) -> Result<ColumnSchema> {
let datatype_wrapper = ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?;
Ok(ColumnSchema::new(
column_name,
datatype_wrapper.into(),
nullable,
))
fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnDef {
ColumnDef {
name: column_name.to_string(),
datatype,
is_nullable: nullable,
default_constraint: None,
}
}
pub fn find_new_columns(
schema: &SchemaRef,
insert_batches: &[InsertBatch],
) -> Result<Option<Vec<AddColumnRequest>>> {
let mut requests = Vec::default();
) -> Result<Option<AddColumns>> {
let mut columns_to_add = Vec::default();
let mut new_columns: HashSet<String> = HashSet::default();
for InsertBatch { columns, row_count } in insert_batches {
@@ -65,10 +62,9 @@ pub fn find_new_columns(
if schema.column_schema_by_name(column_name).is_none()
&& !new_columns.contains(column_name)
{
let column_schema = build_column_schema(column_name, *datatype, true)?;
requests.push(AddColumnRequest {
column_schema,
let column_def = Some(build_column_def(column_name, *datatype, true));
columns_to_add.push(AddColumn {
column_def,
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
});
new_columns.insert(column_name.to_string());
@@ -76,10 +72,12 @@ pub fn find_new_columns(
}
}
if requests.is_empty() {
if columns_to_add.is_empty() {
Ok(None)
} else {
Ok(Some(requests))
Ok(Some(AddColumns {
add_columns: columns_to_add,
}))
}
}
@@ -98,15 +96,15 @@ pub fn build_alter_table_request(
}
/// Try to build create table request from insert data.
pub fn build_create_table_request(
pub fn build_create_expr_from_insertion(
catalog_name: &str,
schema_name: &str,
table_id: TableId,
table_id: Option<TableId>,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<CreateTableRequest> {
) -> Result<CreateExpr> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_schemas = Vec::default();
let mut column_defs = Vec::default();
let mut primary_key_indices = Vec::default();
let mut timestamp_index = usize::MAX;
@@ -124,9 +122,8 @@ pub fn build_create_table_request(
{
if !new_columns.contains(column_name) {
let mut is_nullable = true;
let mut is_time_index = false;
match *semantic_type {
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()),
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()),
TIMESTAMP_SEMANTIC_TYPE => {
ensure!(
timestamp_index == usize::MAX,
@@ -135,42 +132,42 @@ pub fn build_create_table_request(
duplicated: column_name,
}
);
timestamp_index = column_schemas.len();
is_time_index = true;
timestamp_index = column_defs.len();
// Timestamp column must not be null.
is_nullable = false;
}
_ => {}
}
let column_schema = build_column_schema(column_name, *datatype, is_nullable)?
.with_time_index(is_time_index);
column_schemas.push(column_schema);
let column_def = build_column_def(column_name, *datatype, is_nullable);
column_defs.push(column_def);
new_columns.insert(column_name.to_string());
}
}
ensure!(timestamp_index != usize::MAX, MissingTimestampColumnSnafu);
let timestamp_field_name = columns[timestamp_index].column_name.clone();
let schema = Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.build()
.context(CreateSchemaSnafu)?,
);
let primary_keys = primary_key_indices
.iter()
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
return Ok(CreateTableRequest {
id: table_id,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
let expr = CreateExpr {
catalog_name: Some(catalog_name.to_string()),
schema_name: Some(schema_name.to_string()),
table_name: table_name.to_string(),
desc: None,
schema,
desc: Some("Created on insertion".to_string()),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
primary_key_indices,
table_options: HashMap::new(),
region_numbers: vec![0],
});
table_options: Default::default(),
table_id,
region_ids: vec![0], // TODO(hl): region id should be allocated by frontend
};
return Ok(expr);
}
IllegalInsertDataSnafu.fail()
@@ -233,7 +230,7 @@ pub fn insertion_expr_to_request(
}
#[inline]
pub fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
pub fn insert_batches(bytes_vec: &[Vec<u8>]) -> Result<Vec<InsertBatch>> {
bytes_vec
.iter()
.map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu))
@@ -365,6 +362,7 @@ mod tests {
use std::any::Any;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
codec::InsertBatch,
column::{self, SemanticType, Values},
@@ -379,50 +377,114 @@ mod tests {
schema::{ColumnSchema, SchemaBuilder, SchemaRef},
value::Value,
};
use snafu::ResultExt;
use table::error::Result as TableResult;
use table::metadata::TableInfoRef;
use table::Table;
use super::{
build_column_schema, build_create_table_request, convert_values, find_new_columns,
insert_batches, insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE,
TIMESTAMP_SEMANTIC_TYPE,
build_create_expr_from_insertion, convert_values, find_new_columns, insert_batches,
insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE,
};
use crate::error;
use crate::error::ColumnDataTypeSnafu;
#[inline]
fn build_column_schema(
column_name: &str,
datatype: i32,
nullable: bool,
) -> error::Result<ColumnSchema> {
let datatype_wrapper =
ColumnDataTypeWrapper::try_new(datatype).context(ColumnDataTypeSnafu)?;
Ok(ColumnSchema::new(
column_name,
datatype_wrapper.into(),
nullable,
))
}
#[test]
fn test_build_create_table_request() {
let table_id = 10;
let table_id = Some(10);
let table_name = "test_metric";
assert!(build_create_table_request("", "", table_id, table_name, &[]).is_err());
assert!(build_create_expr_from_insertion("", "", table_id, table_name, &[]).is_err());
let insert_batches = insert_batches(mock_insert_batches()).unwrap();
let mock_batch_bytes = mock_insert_batches();
let insert_batches = insert_batches(&mock_batch_bytes).unwrap();
let req =
build_create_table_request("", "", table_id, table_name, &insert_batches).unwrap();
assert_eq!(table_id, req.id);
assert_eq!(table_name, req.table_name);
assert!(req.desc.is_none());
assert_eq!(vec![0], req.primary_key_indices);
let create_expr =
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batches)
.unwrap();
assert_eq!(table_id, create_expr.table_id);
assert_eq!(table_name, create_expr.table_name);
assert_eq!(Some("Created on insertion".to_string()), create_expr.desc);
assert_eq!(
vec![create_expr.column_defs[0].name.clone()],
create_expr.primary_keys
);
let column_defs = create_expr.column_defs;
assert_eq!(column_defs[3].name, create_expr.time_index);
assert_eq!(4, column_defs.len());
let schema = req.schema;
assert_eq!(Some(3), schema.timestamp_index());
assert_eq!(4, schema.num_columns());
assert_eq!(
ConcreteDataType::string_datatype(),
schema.column_schema_by_name("host").unwrap().data_type
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
column_defs
.iter()
.find(|c| c.name == "host")
.unwrap()
.datatype
)
.unwrap()
)
);
assert_eq!(
ConcreteDataType::float64_datatype(),
schema.column_schema_by_name("cpu").unwrap().data_type
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
column_defs
.iter()
.find(|c| c.name == "cpu")
.unwrap()
.datatype
)
.unwrap()
)
);
assert_eq!(
ConcreteDataType::float64_datatype(),
schema.column_schema_by_name("memory").unwrap().data_type
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
column_defs
.iter()
.find(|c| c.name == "memory")
.unwrap()
.datatype
)
.unwrap()
)
);
assert_eq!(
ConcreteDataType::timestamp_millis_datatype(),
schema.column_schema_by_name("ts").unwrap().data_type
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
column_defs
.iter()
.find(|c| c.name == "ts")
.unwrap()
.datatype
)
.unwrap()
)
);
}
@@ -440,22 +502,32 @@ mod tests {
assert!(find_new_columns(&schema, &[]).unwrap().is_none());
let insert_batches = insert_batches(mock_insert_batches()).unwrap();
let new_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap();
let mock_insert_bytes = mock_insert_batches();
let insert_batches = insert_batches(&mock_insert_bytes).unwrap();
let add_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap();
assert_eq!(2, new_columns.len());
let host_column = &new_columns[0];
assert_eq!(2, add_columns.add_columns.len());
let host_column = &add_columns.add_columns[0];
assert!(host_column.is_key);
assert_eq!(
ConcreteDataType::string_datatype(),
host_column.column_schema.data_type
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(host_column.column_def.as_ref().unwrap().datatype)
.unwrap()
)
);
let memory_column = &new_columns[1];
let memory_column = &add_columns.add_columns[1];
assert!(!memory_column.is_key);
assert_eq!(
ConcreteDataType::float64_datatype(),
memory_column.column_schema.data_type
)
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(memory_column.column_def.as_ref().unwrap().datatype)
.unwrap()
)
);
}
#[test]
@@ -465,7 +537,7 @@ mod tests {
let values = insert_expr::Values {
values: mock_insert_batches(),
};
let insert_batches = insert_batches(values.values).unwrap();
let insert_batches = insert_batches(&values.values).unwrap();
let insert_req =
insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap();
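Putting the renamed API together, a hedged sketch of the create-on-insertion path a caller (now the frontend) might drive; `insert_bytes`, `next_id`, and the table/schema names are hypothetical inputs, and the error alias is assumed from `common_insert::error`:

```rust
use api::v1::CreateExpr;

// Sketch under assumptions: insert_bytes holds encoded InsertBatch payloads,
// next_id is a table id the caller allocated.
fn example_create_expr_from_insertion(
    insert_bytes: &[Vec<u8>],
    next_id: u32,
) -> common_insert::error::Result<CreateExpr> {
    let batches = common_insert::insert_batches(insert_bytes)?;
    common_insert::build_create_expr_from_insertion(
        "greptime",    // catalog
        "public",      // schema
        Some(next_id), // table id now chosen by the caller, not the datanode
        "monitor",
        &batches,
    )
}
```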


@@ -1,6 +1,6 @@
pub mod error;
mod insert;
pub use insert::{
build_alter_table_request, build_create_table_request, find_new_columns, insert_batches,
build_alter_table_request, build_create_expr_from_insertion, find_new_columns, insert_batches,
insertion_expr_to_request,
};


@@ -82,4 +82,8 @@ impl Datanode {
self.services.start(&self.opts).await?;
Ok(())
}
pub fn get_instance(&self) -> InstanceRef {
self.instance.clone()
}
}


@@ -284,16 +284,15 @@ pub enum Error {
#[snafu(display("Insert batch is empty"))]
EmptyInsertBatch,
#[snafu(display("Failed to build frontend instance, source: {}", source))]
BuildFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},
#[snafu(display(
"Table id provider not found, cannot execute SQL directly on datanode in distributed mode"
))]
TableIdProviderNotFound { backtrace: Backtrace },
#[snafu(display("Failed to start frontend instance, source: {}", source))]
StartFrontend {
#[snafu(display("Failed to bump table id, source: {}", source))]
BumpTableId {
#[snafu(backtrace)]
source: frontend::error::Error,
source: table::error::Error,
},
}
@@ -363,9 +362,8 @@ impl ErrorExt for Error {
Error::MetaClientInit { source, .. } => source.status_code(),
Error::InsertData { source, .. } => source.status_code(),
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
Error::BuildFrontend { source, .. } | Error::StartFrontend { source, .. } => {
source.status_code()
}
Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
Error::BumpTableId { source, .. } => source.status_code(),
}
}


@@ -13,6 +13,7 @@ use object_store::{services::fs::Builder, util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use snafu::prelude::*;
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table::table::TableIdProviderRef;
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
@@ -35,6 +36,7 @@ pub struct Instance {
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) physical_planner: PhysicalPlanner,
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
#[allow(unused)]
pub(crate) meta_client: Option<Arc<MetaClient>>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
@@ -66,7 +68,7 @@ impl Instance {
));
// create remote catalog manager
let (catalog_manager, factory) = match opts.mode {
let (catalog_manager, factory, table_id_provider) = match opts.mode {
Mode::Standalone => {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
@@ -74,7 +76,11 @@ impl Instance {
.context(CatalogSnafu)?,
);
let factory = QueryEngineFactory::new(catalog.clone());
(catalog as CatalogManagerRef, factory)
(
catalog.clone() as CatalogManagerRef,
factory,
Some(catalog as TableIdProviderRef),
)
}
Mode::Distributed => {
@@ -86,7 +92,7 @@ impl Instance {
}),
));
let factory = QueryEngineFactory::new(catalog.clone());
(catalog as CatalogManagerRef, factory)
(catalog as CatalogManagerRef, factory, None)
}
};
@@ -110,6 +116,7 @@ impl Instance {
script_executor,
meta_client,
heartbeat_task,
table_id_provider,
})
}


@@ -1,9 +1,7 @@
use std::ops::Deref;
use api::v1::codec::RegionNumber;
use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use api::v1::{
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
CreateDatabaseExpr, ObjectExpr, ObjectResult, SelectExpr,
admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, CreateDatabaseExpr,
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -11,12 +9,11 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_insert::insertion_expr_to_request;
use common_query::Output;
use common_telemetry::logging::{debug, info};
use query::plan::LogicalPlan;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
use snafu::prelude::*;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::{AddColumnRequest, CreateDatabaseRequest};
use table::requests::CreateDatabaseRequest;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu,
@@ -24,78 +21,10 @@ use crate::error::{
UnsupportedExprSnafu,
};
use crate::instance::Instance;
use crate::server::grpc::handler::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::SqlRequest;
impl Instance {
async fn add_new_columns_to_table(
&self,
table_name: &str,
add_columns: Vec<AddColumnRequest>,
) -> Result<()> {
let column_names = add_columns
.iter()
.map(|req| req.column_schema.name.clone())
.collect::<Vec<_>>();
let alter_request = common_insert::build_alter_table_request(table_name, add_columns);
debug!(
"Adding new columns: {:?} to table: {}",
column_names, table_name
);
let _result = self
.sql_handler()
.execute(SqlRequest::Alter(alter_request))
.await?;
info!(
"Added new columns: {:?} to table: {}",
column_names, table_name
);
Ok(())
}
async fn create_table_by_insert_batches(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<()> {
// Create table automatically, build schema from data.
let table_id = self
.catalog_manager
.next_table_id()
.await
.context(CatalogSnafu)?;
let create_table_request = common_insert::build_create_table_request(
catalog_name,
schema_name,
table_id,
table_name,
insert_batches,
)
.context(InsertDataSnafu)?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_table_request,
);
let _result = self
.sql_handler()
.execute(SqlRequest::CreateTable(create_table_request))
.await?;
info!("Success to create table: {} automatically", table_name);
Ok(())
}
pub async fn execute_grpc_insert(
&self,
catalog_name: &str,
@@ -113,34 +42,14 @@ impl Instance {
.context(SchemaNotFoundSnafu { name: schema_name })?;
let insert_batches =
common_insert::insert_batches(values.values).context(InsertDataSnafu)?;
common_insert::insert_batches(&values.values).context(InsertDataSnafu)?;
ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);
let table = if let Some(table) = schema_provider.table(table_name).context(CatalogSnafu)? {
let schema = table.schema();
if let Some(add_columns) = common_insert::find_new_columns(&schema, &insert_batches)
.context(InsertDataSnafu)?
{
self.add_new_columns_to_table(table_name, add_columns)
.await?;
}
table
} else {
self.create_table_by_insert_batches(
catalog_name,
schema_name,
table_name,
&insert_batches,
)
.await?;
schema_provider
.table(table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?
};
let table = schema_provider
.table(table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?;
let insert = insertion_expr_to_request(
catalog_name,
@@ -253,16 +162,8 @@ impl GrpcQueryHandler for Instance {
reason: "missing `expr` in `InsertExpr`",
})?;
// TODO(fys): _region_id is for later use.
let _region_id: Option<RegionNumber> = insert_expr
.options
.get("region_id")
.map(|id| {
id.deref()
.try_into()
.context(servers::error::DecodeRegionNumberSnafu)
})
.transpose()?;
// TODO(fys): _region_number is for later use.
let _region_number: u32 = insert_expr.region_number;
match expr {
insert_expr::Expr::Values(values) => {

View File

@@ -11,7 +11,8 @@ use sql::statements::statement::Statement;
use table::requests::CreateDatabaseRequest;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu,
BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ExecuteSqlSnafu, ParseSqlSnafu, Result,
SchemaNotFoundSnafu, TableIdProviderNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metric;
@@ -67,10 +68,12 @@ impl Instance {
Statement::CreateTable(c) => {
let table_id = self
.catalog_manager
.table_id_provider
.as_ref()
.context(TableIdProviderNotFoundSnafu)?
.next_table_id()
.await
.context(CatalogSnafu)?;
.context(BumpTableIdSnafu)?;
let _engine_name = c.engine.clone();
// TODO(hl): Select table engine by engine_name


@@ -1,11 +1,15 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use catalog::remote::MetaKvBackend;
use common_catalog::consts::MIN_USER_TABLE_ID;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_srv::mocks::MockInfo;
use query::QueryEngineFactory;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
use table::table::{TableIdProvider, TableIdProviderRef};
use table_engine::config::EngineConfig as TableEngineConfig;
use crate::datanode::DatanodeOptions;
@@ -53,6 +57,8 @@ impl Instance {
"127.0.0.1:3302".to_string(),
meta_client.as_ref().unwrap().clone(),
));
let table_id_provider = Some(catalog_manager.clone() as TableIdProviderRef);
Ok(Self {
query_engine,
sql_handler,
@@ -61,6 +67,7 @@ impl Instance {
script_executor,
meta_client,
heartbeat_task,
table_id_provider,
})
}
@@ -105,12 +112,32 @@ impl Instance {
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
meta_client: Some(meta_client),
heartbeat_task: Some(heartbeat_task),
})
}
}
struct LocalTableIdProvider {
inner: Arc<AtomicU32>,
}
impl Default for LocalTableIdProvider {
fn default() -> Self {
Self {
inner: Arc::new(AtomicU32::new(MIN_USER_TABLE_ID)),
}
}
}
#[async_trait::async_trait]
impl TableIdProvider for LocalTableIdProvider {
async fn next_table_id(&self) -> table::Result<TableId> {
Ok(self.inner.fetch_add(1, Ordering::Relaxed))
}
}
async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient {
let MockInfo {
server_addr,


@@ -1,28 +1,21 @@
pub mod grpc;
use std::default::Default;
use std::net::SocketAddr;
use std::sync::Arc;
use common_error::prelude::BoxedError;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::info;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::instance::Instance as FrontendInstanceImpl;
use servers::grpc::GrpcServer;
use servers::server::Server;
use snafu::ResultExt;
use tokio::try_join;
use crate::datanode::DatanodeOptions;
use crate::error::{
BuildFrontendSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu,
};
use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu};
use crate::instance::InstanceRef;
pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
frontend: Option<Frontend<FrontendInstanceImpl>>,
}
impl Services {
@@ -35,54 +28,19 @@ impl Services {
.context(RuntimeResourceSnafu)?,
);
let frontend = match opts.mode {
Mode::Standalone => Some(Self::build_frontend(opts).await?),
Mode::Distributed => {
info!("Starting datanode in distributed mode, only gRPC server will be started.");
None
}
};
Ok(Self {
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
frontend,
grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime),
})
}
/// Build frontend instance in standalone mode
async fn build_frontend(opts: &DatanodeOptions) -> Result<Frontend<FrontendInstanceImpl>> {
let grpc_server_addr = &opts.rpc_addr;
info!(
"Build frontend with datanode gRPC addr: {}",
grpc_server_addr
);
let options = FrontendOptions {
mode: Mode::Standalone,
datanode_rpc_addr: grpc_server_addr.clone(),
..Default::default()
};
let frontend_instance = FrontendInstanceImpl::try_new(&options)
.await
.context(BuildFrontendSnafu)?;
Ok(Frontend::new(options, frontend_instance))
}
pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
try_join!(self.grpc_server.start(grpc_addr), async {
if let Some(ref mut frontend_instance) = self.frontend {
info!("Starting frontend instance");
frontend_instance
.start()
.await
.map_err(BoxedError::new)
.context(servers::error::StartFrontendSnafu)?;
}
Ok(())
})
.context(StartServerSnafu)?;
self.grpc_server
.start(grpc_addr)
.await
.context(StartServerSnafu)?;
Ok(())
}
}

View File

@@ -1,4 +1,3 @@
mod ddl;
pub(crate) mod handler;
pub(crate) mod plan;
pub mod select;


@@ -1,24 +1,70 @@
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::result::AdminResultBuilder;
use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::{ErrorExt, StatusCode};
use common_query::Output;
use common_telemetry::{error, info};
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use snafu::prelude::*;
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{self, CatalogSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result};
use crate::error::{
self, BumpTableIdSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result,
};
use crate::instance::Instance;
use crate::server::grpc::handler::AdminResultBuilder;
use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
let request = self.create_expr_to_request(expr).await;
// Respect CreateExpr's table id and region ids if present; otherwise allocate a table id
// from the local table id provider and set the region id to 0.
let table_id = if let Some(table_id) = expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id
);
table_id
} else {
match self.table_id_provider.as_ref() {
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Internal as u32)
.err_msg("Table id provider absent in standalone mode".to_string())
.build();
}
Some(table_id_provider) => {
match table_id_provider
.next_table_id()
.await
.context(BumpTableIdSnafu)
{
Ok(table_id) => {
info!(
"Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}",
&expr.catalog_name, &expr.schema_name, expr.table_name, table_id
);
table_id
}
Err(e) => {
error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name);
return AdminResultBuilder::default()
.status_code(e.status_code() as u32)
.err_msg(e.to_string())
.build();
}
}
}
}
};
let request = create_expr_to_request(table_id, expr).await;
let result = futures::future::ready(request)
.and_then(|request| self.sql_handler().execute(SqlRequest::CreateTable(request)))
.await;
@@ -37,7 +83,7 @@ impl Instance {
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match self.alter_expr_to_request(expr).transpose() {
let request = match alter_expr_to_request(expr).transpose() {
Some(req) => req,
None => {
return AdminResultBuilder::default()
@@ -62,77 +108,76 @@ impl Instance {
.build(),
}
}
}
async fn create_expr_to_request(&self, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
.iter()
.map(|key| {
schema
.column_index_by_name(key)
.context(error::KeyColumnNotFoundSnafu { name: key })
})
.collect::<Result<Vec<usize>>>()?;
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let table_id = self
.catalog_manager()
.next_table_id()
.await
.context(CatalogSnafu)?;
let region_id = expr
.table_options
.get(&"region_id".to_string())
.unwrap()
.parse::<u32>()
.unwrap();
Ok(CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
schema,
region_numbers: vec![region_id],
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
async fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
.iter()
.map(|key| {
schema
.column_index_by_name(key)
.context(error::KeyColumnNotFoundSnafu { name: key })
})
}
.collect::<Result<Vec<usize>>>()?;
fn alter_expr_to_request(&self, expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
match expr.kind {
Some(Kind::AddColumn(add_column)) => {
let column_def = add_column.column_def.context(MissingFieldSnafu {
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let region_ids = if expr.region_ids.is_empty() {
vec![0]
} else {
expr.region_ids
};
Ok(CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
schema,
region_numbers: region_ids,
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
})
}
fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let mut add_column_requests = vec![];
for add_column_expr in add_columns.add_columns {
let column_def = add_column_expr.column_def.context(MissingFieldSnafu {
field: "column_def",
})?;
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: create_column_schema(&column_def)?,
// FIXME(dennis): supports adding key column
is_key: false,
}],
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
let schema = create_column_schema(&column_def)?;
add_column_requests.push(AddColumnRequest {
column_schema: schema,
is_key: add_column_expr.is_key,
})
}
None => Ok(None),
let alter_kind = AlterKind::AddColumns {
columns: add_column_requests,
};
let request = AlterTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
}
None => Ok(None),
}
}
@@ -191,8 +236,7 @@ fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_catalog::consts::MIN_USER_TABLE_ID;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
@@ -207,7 +251,7 @@ mod tests {
instance.start().await.unwrap();
let expr = testing_create_expr();
let request = instance.create_expr_to_request(expr).await.unwrap();
let request = create_expr_to_request(1024, expr).await.unwrap();
assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
@@ -219,7 +263,7 @@ mod tests {
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = instance.create_expr_to_request(expr).await;
let result = create_expr_to_request(1025, expr).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
@@ -312,9 +356,6 @@ mod tests {
default_constraint: None,
},
];
let table_options = [("region_id".to_string(), "0".to_string())]
.into_iter()
.collect::<HashMap<_, _>>();
CreateExpr {
catalog_name: None,
schema_name: None,
@@ -324,7 +365,9 @@ mod tests {
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
region_ids: vec![0],
}
}
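For the alter path, a minimal sketch of the multi-column `AlterExpr` that `alter_expr_to_request` above now accepts; field values are illustrative and the numeric datatype codes are assumptions:

```rust
use api::v1::{alter_expr::Kind, AddColumn, AddColumns, AlterExpr, ColumnDef};

// Hedged sketch: one AlterExpr adding two columns at once, the shape the new
// Kind::AddColumns variant carries.
fn example_alter_expr() -> AlterExpr {
    AlterExpr {
        catalog_name: None,
        schema_name: Some("public".to_string()),
        table_name: "monitor".to_string(),
        kind: Some(Kind::AddColumns(AddColumns {
            add_columns: vec![
                AddColumn {
                    column_def: Some(ColumnDef {
                        name: "host".to_string(),
                        datatype: 12, // assumed wire code for a string type
                        is_nullable: true,
                        default_constraint: None,
                    }),
                    is_key: true, // now honored instead of hard-coded to false
                },
                AddColumn {
                    column_def: Some(ColumnDef {
                        name: "memory".to_string(),
                        datatype: 10, // assumed wire code for a float64 type
                        is_nullable: true,
                        default_constraint: None,
                    }),
                    is_key: false,
                },
            ],
        })),
    }
}
```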


@@ -1,6 +1,7 @@
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::result::{build_err_result, ObjectResultBuilder};
use api::v1::{codec::SelectResult, column::SemanticType, column::Values, Column, ObjectResult};
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use common_base::BitVec;
@@ -12,7 +13,6 @@ use datatypes::schema::SchemaRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, ConversionSnafu, Result};
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
pub async fn to_object_result(output: Result<Output>) -> ObjectResult {
let result = match output {


@@ -5,6 +5,7 @@ use catalog::{RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::Output;
use common_telemetry::tracing::info;
use common_telemetry::tracing::log::error;
use datatypes::schema::SchemaBuilder;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::TableConstraint;
@@ -16,9 +17,9 @@ use table::metadata::TableId;
use table::requests::*;
use crate::error::{
self, ConstraintNotSupportedSnafu, CreateSchemaSnafu, CreateTableSnafu,
InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu,
Result,
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateSchemaSnafu,
CreateTableSnafu, InsertSystemCatalogSnafu, InvalidPrimaryKeySnafu, KeyColumnNotFoundSnafu,
RegisterSchemaSnafu, Result, SchemaNotFoundSnafu,
};
use crate::sql::SqlHandler;
@@ -40,10 +41,36 @@ impl SqlHandler {
pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result<Output> {
let ctx = EngineContext {};
// first check if catalog and schema exist
let catalog = self
.catalog_manager
.catalog(&req.catalog_name)
.context(CatalogSnafu)?
.with_context(|| {
error!(
"Failed to create table {}.{}.{}, catalog not found",
&req.catalog_name, &req.schema_name, &req.table_name
);
CatalogNotFoundSnafu {
name: &req.catalog_name,
}
})?;
catalog
.schema(&req.schema_name)
.context(CatalogSnafu)?
.with_context(|| {
error!(
"Failed to create table {}.{}.{}, schema not found",
&req.catalog_name, &req.schema_name, &req.table_name
);
SchemaNotFoundSnafu {
name: &req.schema_name,
}
})?;
// determine catalog and schema from the very beginning
let table_name = req.table_name.clone();
let table_id = req.id;
let table = self
.table_engine
.create_table(&ctx, req)

View File

@@ -4,30 +4,37 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::ColumnDataType;
use api::v1::{
admin_result, alter_expr::Kind, codec::InsertBatch, column, column::SemanticType, insert_expr,
AddColumn, AlterExpr, Column, ColumnDef, CreateExpr, InsertExpr, MutateResult,
};
use api::v1::{AddColumns, ColumnDataType};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_runtime::Builder as RuntimeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::frontend::Mode::Standalone;
use frontend::grpc::GrpcOptions;
use servers::grpc::GrpcServer;
use servers::server::Server;
use crate::instance::Instance;
use crate::tests::test_util::{self, TestGuard};
async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<GrpcServer>) {
async fn setup_grpc_server(
name: &str,
port: usize,
) -> (String, TestGuard, Arc<GrpcServer>, Arc<GrpcServer>) {
common_telemetry::init_default_ut_logging();
let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let addr = format!("127.0.0.1:{}", port);
opts.rpc_addr = addr.clone();
let datanode_grpc_addr = format!("127.0.0.1:{}", port);
opts.rpc_addr = datanode_grpc_addr.clone();
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let addr_cloned = addr.clone();
let datanode_grpc_addr = datanode_grpc_addr.clone();
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
@@ -36,30 +43,65 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<G
.unwrap(),
);
let grpc_server = Arc::new(GrpcServer::new(instance.clone(), instance, runtime));
let grpc_server_clone = grpc_server.clone();
let fe_grpc_addr = format!("127.0.0.1:{}", port + 1);
let fe_opts = FrontendOptions {
mode: Standalone,
datanode_rpc_addr: datanode_grpc_addr.clone(),
grpc_options: Some(GrpcOptions {
addr: fe_grpc_addr.clone(),
runtime_size: 8,
}),
..Default::default()
};
let datanode_grpc_server = Arc::new(GrpcServer::new(
instance.clone(),
instance.clone(),
runtime.clone(),
));
let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts)
.await
.unwrap();
fe_instance.set_catalog_manager(instance.catalog_manager.clone());
let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new(
fe_instance_ref.clone(),
fe_instance_ref,
runtime,
));
let grpc_server_clone = fe_grpc_server.clone();
let fe_grpc_addr_clone = fe_grpc_addr.clone();
tokio::spawn(async move {
let addr = addr_cloned.parse::<SocketAddr>().unwrap();
let addr = fe_grpc_addr_clone.parse::<SocketAddr>().unwrap();
grpc_server_clone.start(addr).await.unwrap()
});
let dn_grpc_addr_clone = datanode_grpc_addr.clone();
let dn_grpc_server_clone = datanode_grpc_server.clone();
tokio::spawn(async move {
let addr = dn_grpc_addr_clone.parse::<SocketAddr>().unwrap();
dn_grpc_server_clone.start(addr).await.unwrap()
});
// wait for GRPC server to start
tokio::time::sleep(Duration::from_secs(1)).await;
(addr, guard, grpc_server)
(fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_auto_create_table() {
let (addr, _guard, grpc_server) = setup_grpc_server("auto_create_table", 3991).await;
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server("auto_create_table", 3991).await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client);
insert_and_assert(&db).await;
grpc_server.shutdown().await.unwrap();
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
}
fn expect_data() -> (Column, Column, Column, Column) {
@@ -120,7 +162,8 @@ fn expect_data() -> (Column, Column, Column, Column) {
#[ignore]
async fn test_insert_and_select() {
common_telemetry::init_default_ut_logging();
let (addr, _guard, grpc_server) = setup_grpc_server("insert_and_select", 3990).await;
let (addr, _guard, fe_grpc_server, dn_grpc_server) =
setup_grpc_server("insert_and_select", 3990).await;
let grpc_client = Client::with_urls(vec![addr]);
@@ -145,8 +188,11 @@ async fn test_insert_and_select() {
is_nullable: true,
default_constraint: None,
};
let kind = Kind::AddColumn(AddColumn {
column_def: Some(add_column),
let kind = Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(add_column),
is_key: false,
}],
});
let expr = AlterExpr {
table_name: "test_table".to_string(),
@@ -160,7 +206,8 @@ async fn test_insert_and_select() {
// insert
insert_and_assert(&db).await;
grpc_server.shutdown().await.unwrap();
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
}
async fn insert_and_assert(db: &Database) {
@@ -178,12 +225,14 @@ async fn insert_and_assert(db: &Database) {
}
.into()];
let expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
options: HashMap::default(),
region_number: 0,
};
let result = db.insert(expr).await;
assert!(result.is_ok());
result.unwrap();
// select
let result = db
@@ -249,6 +298,8 @@ fn testing_create_expr() -> CreateExpr {
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: HashMap::from([("region_id".to_string(), "0".to_string())]),
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
region_ids: vec![0],
}
}

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use frontend::frontend::Mode;
use snafu::ResultExt;
use table::engine::EngineContext;
use table::engine::TableEngineRef;
@@ -32,6 +33,7 @@ pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGua
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
mode: Mode::Standalone,
..Default::default()
};
(

View File

@@ -18,12 +18,14 @@ common-catalog = { path = "../common/catalog" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
common-insert = { path = "../common/insert" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"
itertools = "0.10"
meta-client = { path = "../meta-client" }
moka = { version = "0.9", features = ["future"] }

View File

@@ -7,7 +7,8 @@ use catalog::error::{
};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
};
use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue};
use futures::StreamExt;
@@ -41,6 +42,45 @@ impl FrontendCatalogManager {
}
}
// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting
// as soon as it's stable: https://github.com/rust-lang/rust/issues/65991
#[async_trait::async_trait]
impl CatalogManager for FrontendCatalogManager {
async fn start(&self) -> catalog::error::Result<()> {
Ok(())
}
async fn register_table(
&self,
_request: RegisterTableRequest,
) -> catalog::error::Result<usize> {
unimplemented!()
}
async fn register_schema(
&self,
_request: RegisterSchemaRequest,
) -> catalog::error::Result<usize> {
unimplemented!()
}
async fn register_system_table(
&self,
_request: RegisterSystemTableRequest,
) -> catalog::error::Result<()> {
unimplemented!()
}
fn table(
&self,
_catalog: &str,
_schema: &str,
_table_name: &str,
) -> catalog::error::Result<Option<TableRef>> {
unimplemented!()
}
}
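The FIXME above is the reason this stub impl exists at all: until `trait_upcasting` (rust-lang/rust#65991) stabilizes, an `Arc<dyn CatalogManager>` cannot be coerced to `Arc<dyn CatalogList>`. A toy, nightly-only sketch of what the coercion looks like once the feature lands; the two traits here are stand-ins, not the real catalog API:

```
#![feature(trait_upcasting)] // unstable; requires a nightly toolchain
use std::sync::Arc;

trait CatalogList {}
trait CatalogManager: CatalogList {}

struct Manager;
impl CatalogList for Manager {}
impl CatalogManager for Manager {}

fn main() {
    let manager: Arc<dyn CatalogManager> = Arc::new(Manager);
    // With upcasting, reaching the supertrait object is a plain coercion,
    // and the unimplemented!() forwarding impl above becomes unnecessary.
    let _list: Arc<dyn CatalogList> = manager;
}
```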
impl CatalogList for FrontendCatalogManager {
fn as_any(&self) -> &dyn Any {
self

View File

@@ -121,7 +121,7 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Invaild InsertRequest, reason: {}", reason))]
#[snafu(display("Invalid InsertRequest, reason: {}", reason))]
InvalidInsertRequest {
reason: String,
backtrace: Backtrace,
@@ -194,6 +194,66 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to bump table id when creating table, source: {}", source))]
BumpTableId {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Failed to create table, source: {}", source))]
CreateTable {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to alter table, source: {}", source))]
AlterTable {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to insert values to table, source: {}", source))]
Insert {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to select from table, source: {}", source))]
Select {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to create table on insertion, source: {}", source))]
CreateTableOnInsertion {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to alter table on insertion, source: {}", source))]
AlterTableOnInsertion {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to build CreateExpr on insertion: {}", source))]
BuildCreateExprOnInsertion {
#[snafu(backtrace)]
source: common_insert::error::Error,
},
#[snafu(display("Failed to find new columns on insertion: {}", source))]
FindNewColumnsOnInsertion {
#[snafu(backtrace)]
source: common_insert::error::Error,
},
#[snafu(display("Failed to deserialize insert batching: {}", source))]
DeserializeInsertBatch {
#[snafu(backtrace)]
source: common_insert::error::Error,
},
#[snafu(display("Failed to find catalog by name: {}", catalog_name))]
CatalogNotFound {
catalog_name: String,
@@ -266,13 +326,23 @@ impl ErrorExt for Error {
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::JoinTask { .. }
| Error::SchemaNotFound { .. }
| Error::CatalogNotFound { .. } => StatusCode::Unexpected,
Error::JoinTask { .. } => StatusCode::Unexpected,
Error::Catalog { source, .. } => source.status_code(),
Error::ParseCatalogEntry { source, .. } => source.status_code(),
Error::RequestMeta { source } => source.status_code(),
Error::BumpTableId { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
Error::CreateTable { source, .. } => source.status_code(),
Error::AlterTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(),
Error::CreateTableOnInsertion { source, .. } => source.status_code(),
Error::AlterTableOnInsertion { source, .. } => source.status_code(),
Error::Select { source, .. } => source.status_code(),
Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
}
}

View File

@@ -7,18 +7,24 @@ use std::sync::Arc;
use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::result::ObjectResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::codec::InsertBatch;
use api::v1::object_expr::Expr;
use api::v1::{
insert_expr, AdminExpr, AdminResult, AlterExpr, ColumnDataType, ColumnDef as GrpcColumnDef,
CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, ColumnDataType,
ColumnDef as GrpcColumnDef, CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr,
ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogList, CatalogProviderRef, SchemaProviderRef};
use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef};
use client::admin::{admin_result_to_output, Admin};
use client::{Client, Database, Select};
use common_error::prelude::BoxedError;
use common_error::prelude::{BoxedError, StatusCode};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::schema::ColumnSchema;
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientOpts;
@@ -34,10 +40,16 @@ use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::{dialect::GenericDialect, parser::ParserContext};
use table::table::TableIdProviderRef;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::error::{
self, AlterTableOnInsertionSnafu, AlterTableSnafu, BuildCreateExprOnInsertionSnafu,
BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ConvertColumnDefaultConstraintSnafu,
CreateTableOnInsertionSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
};
use crate::frontend::{FrontendOptions, Mode};
use crate::sql::insert_to_request;
use crate::table::route::TableRoutes;
@@ -65,7 +77,10 @@ pub struct Instance {
    // But in distributed mode, the frontend should fetch datanodes' addresses from metasrv.
client: Client,
    /// The catalog manager is None in standalone mode; the datanode keeps its own.
catalog_manager: Option<FrontendCatalogManager>,
catalog_manager: Option<CatalogManagerRef>,
    /// Table id provider. In standalone mode it is left as None; in distributed mode,
    /// table ids should be generated by metasrv.
table_id_provider: Option<TableIdProviderRef>,
// TODO(fys): it should be a trait that corresponds to two implementations:
// Standalone and Distributed, then the code behind it doesn't need to use so
// many match statements.
@@ -77,6 +92,7 @@ impl Default for Instance {
Self {
client: Client::default(),
catalog_manager: None,
table_id_provider: None,
mode: Mode::Standalone,
}
}
@@ -116,7 +132,7 @@ impl Instance {
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager =
FrontendCatalogManager::new(meta_backend, table_routes, datanode_clients);
Some(catalog_manager)
Some(Arc::new(catalog_manager))
} else {
None
};
@@ -133,6 +149,276 @@ impl Instance {
Admin::new("greptime", self.client.clone())
}
pub fn set_catalog_manager(&mut self, catalog_manager: CatalogManagerRef) {
self.catalog_manager = Some(catalog_manager);
}
pub async fn handle_select(&self, expr: Select) -> Result<Output> {
self.database()
.select(expr)
.await
.and_then(Output::try_from)
.context(SelectSnafu)
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
async fn create_to_expr(&self, create: CreateTable) -> Result<CreateExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
let table_id = match &self.table_id_provider {
Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?),
None => None,
};
// FIXME(hl): Region id should be generated from metasrv
let region_ids = vec![0];
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
desc: None,
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine)]),
table_id,
region_ids,
};
Ok(expr)
}
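Put concretely, for a statement like `CREATE TABLE demo (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY(ts, host))` the function now yields an expr shaped roughly as below. The engine name and table id are illustrative assumptions, and `column_defs` is elided:

```
let expr = CreateExpr {
    catalog_name: Some("greptime".to_string()),
    schema_name: Some("public".to_string()),
    table_name: "demo".to_string(),
    desc: None,
    column_defs: vec![], // built by columns_to_expr in the real path
    time_index: "ts".to_string(),
    primary_keys: vec!["ts".to_string(), "host".to_string()],
    create_if_not_exists: false,
    // only the engine is recorded here now; "region_id" no longer rides in the options map
    table_options: HashMap::from([("engine".to_string(), "mito".to_string())]),
    table_id: Some(1024), // bumped from the TableIdProvider when one is present
    region_ids: vec![0],  // hard-coded until metasrv hands out region ids
};
```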
/// Handle create expr.
pub async fn handle_create_table(&self, expr: CreateExpr) -> Result<Output> {
let result = self.admin().create(expr.clone()).await;
if let Err(e) = &result {
error!(e; "Failed to create table by expr: {:?}", expr);
}
result
.and_then(admin_result_to_output)
.context(CreateTableSnafu)
}
/// Handle create database expr.
pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<Output> {
self.admin()
.create_database(expr)
.await
.and_then(admin_result_to_output)
.context(CreateTableSnafu)
}
/// Handle alter expr
pub async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
self.admin()
.alter(expr)
.await
.and_then(admin_result_to_output)
.context(AlterTableSnafu)
}
/// Handle batch inserts
pub async fn handle_inserts(&self, insert_expr: &[InsertExpr]) -> Result<Output> {
let mut success = 0;
for expr in insert_expr {
match self.handle_insert(expr).await? {
Output::AffectedRows(rows) => success += rows,
_ => unreachable!("Insert should not yield output other than AffectedRows"),
}
}
Ok(Output::AffectedRows(success))
}
    /// Handle insert. For 'values' insertion, create/alter the destination table on demand.
pub async fn handle_insert(&self, insert_expr: &InsertExpr) -> Result<Output> {
let table_name = &insert_expr.table_name;
let catalog_name = "greptime";
let schema_name = "public";
if let Some(expr) = &insert_expr.expr {
match expr {
api::v1::insert_expr::Expr::Values(values) => {
// TODO(hl): gRPC should also support partitioning.
let region_number = 0;
self.handle_insert_values(
catalog_name,
schema_name,
table_name,
region_number,
values,
)
.await
}
api::v1::insert_expr::Expr::Sql(_) => {
                    // The frontend does not comprehend an insert request that is a raw SQL string
self.database()
.insert(insert_expr.clone())
.await
.and_then(Output::try_from)
.context(InsertSnafu)
}
}
} else {
// expr is empty
Ok(Output::AffectedRows(0))
}
}
    /// Handle insert requests in the frontend.
    /// If the insert is in raw SQL string flavor, simply forward it to the datanode.
    /// If the insert is a parsed InsertExpr, the frontend comprehends the schema and creates/alters the table on demand.
pub async fn handle_insert_values(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_number: u32,
values: &insert_expr::Values,
) -> Result<Output> {
let insert_batches =
common_insert::insert_batches(&values.values).context(DeserializeInsertBatchSnafu)?;
self.create_or_alter_table_on_demand(
catalog_name,
schema_name,
table_name,
&insert_batches,
)
.await?;
self.database()
.insert(InsertExpr {
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
region_number,
options: Default::default(),
expr: Some(insert_expr::Expr::Values(values.clone())),
})
.await
.and_then(Output::try_from)
.context(InsertSnafu)
}
    // Check whether the table already exists:
    // - if the table does not exist, create it from the inferred CreateExpr
    // - if the table exists, check whether the schema matches; if any new columns are found, alter the table with the inferred `AlterExpr`
async fn create_or_alter_table_on_demand(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<()> {
match self
.catalog_manager
.as_ref()
.expect("catalog manager cannot be None")
.catalog(catalog_name)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu { catalog_name })?
.schema(schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: schema_name,
})?
.table(table_name)
.context(CatalogSnafu)?
{
None => {
info!(
"Table {}.{}.{} does not exist, try create table",
catalog_name, schema_name, table_name,
);
self.create_table_by_insert_batches(
catalog_name,
schema_name,
table_name,
insert_batches,
)
.await?;
info!(
"Successfully created table on insertion: {}.{}.{}",
catalog_name, schema_name, table_name
);
}
Some(table) => {
let schema = table.schema();
if let Some(add_columns) = common_insert::find_new_columns(&schema, insert_batches)
.context(FindNewColumnsOnInsertionSnafu)?
{
info!(
"Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
self.add_new_columns_to_table(table_name, add_columns)
.await?;
info!(
"Successfully altered table on insertion: {}.{}.{}",
catalog_name, schema_name, table_name
);
}
}
};
Ok(())
}
    /// Infer a create table expr from the data being inserted
async fn create_table_by_insert_batches(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<Output> {
// Create table automatically, build schema from data.
let table_id = match &self.table_id_provider {
Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?),
None => None,
};
let create_expr = common_insert::build_create_expr_from_insertion(
catalog_name,
schema_name,
table_id,
table_name,
insert_batches,
)
.context(BuildCreateExprOnInsertionSnafu)?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_expr,
);
self.admin()
.create(create_expr)
.await
.and_then(admin_result_to_output)
.context(CreateTableOnInsertionSnafu)
}
async fn add_new_columns_to_table(
&self,
table_name: &str,
add_columns: AddColumns,
) -> Result<Output> {
debug!(
"Adding new columns: {:?} to table: {}",
add_columns, table_name
);
let expr = AlterExpr {
table_name: table_name.to_string(),
schema_name: None,
catalog_name: None,
kind: Some(Kind::AddColumns(add_columns)),
};
self.admin()
.alter(expr)
.await
.and_then(admin_result_to_output)
.context(AlterTableOnInsertionSnafu)
}
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
self.catalog_manager
.as_ref()
@@ -159,6 +445,11 @@ impl Instance {
let insert_request = insert_to_request(&schema_provider, *insert)?;
let batch = crate::table::insert::insert_request_to_insert_batch(&insert_request)?;
self.create_or_alter_table_on_demand(&catalog, &schema, &table, &[batch])
.await?;
let table = schema_provider
.table(&table)
.context(error::CatalogSnafu)?
@@ -181,10 +472,11 @@ impl FrontendInstance for Instance {
#[cfg(test)]
impl Instance {
pub fn with_client(client: Client) -> Self {
pub fn with_client_and_catalog_manager(client: Client, catalog: CatalogManagerRef) -> Self {
Self {
client,
catalog_manager: None,
catalog_manager: Some(catalog),
table_id_provider: None,
mode: Mode::Standalone,
}
}
@@ -208,65 +500,58 @@ impl SqlQueryHandler for Instance {
match stmt {
Statement::Query(_) => self
.database()
.select(Select::Sql(query.to_string()))
.handle_select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::Insert(insert) => {
match self.mode {
Mode::Standalone => {
// TODO(dennis): respect schema_name when inserting data
let (_catalog_name, _schema_name, table_name) = insert
.full_table_name()
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "Failed to get table name",
})?;
Statement::Insert(insert) => match self.mode {
Mode::Standalone => {
let (_, schema_name, table_name) = insert
.full_table_name()
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "Failed to get table name",
})?;
let expr = InsertExpr {
table_name,
expr: Some(insert_expr::Expr::Sql(query.to_string())),
options: HashMap::default(),
};
self.database()
.insert(expr)
.await
.and_then(|object_result| object_result.try_into())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Mode::Distributed => {
let affected = self
.sql_dist_insert(insert)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
Ok(Output::AffectedRows(affected))
}
let expr = InsertExpr {
schema_name,
table_name,
expr: Some(insert_expr::Expr::Sql(query.to_string())),
region_number: 0,
options: HashMap::default(),
};
self.handle_insert(&expr)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
}
Mode::Distributed => {
let affected = self
.sql_dist_insert(insert)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "execute insert failed",
})?;
Ok(Output::AffectedRows(affected))
}
},
Statement::CreateTable(create) => {
let expr = create_to_expr(create)
let expr = self
.create_to_expr(create)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.admin()
.create(expr)
self.handle_create_table(expr)
.await
.and_then(admin_result_to_output)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Statement::ShowDatabases(_) | Statement::ShowTables(_) => self
.database()
.select(Select::Sql(query.to_string()))
.handle_select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into())
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
@@ -274,28 +559,26 @@ impl SqlQueryHandler for Instance {
let expr = CreateDatabaseExpr {
database_name: c.name.to_string(),
};
self.admin()
.create_database(expr)
self.handle_create_database(expr)
.await
.and_then(admin_result_to_output)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Statement::Alter(alter_stmt) => self
.admin()
.alter(
.handle_alter(
AlterExpr::try_from(alter_stmt)
.map_err(BoxedError::new)
.context(server_error::ExecuteAlterSnafu { query })?,
)
.await
.and_then(admin_result_to_output)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::ShowCreateTable(_) => {
return server_error::NotSupportedSnafu { feat: query }.fail()
}
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
async fn insert_script(&self, _name: &str, _script: &str) -> server_error::Result<()> {
@@ -313,29 +596,6 @@ impl SqlQueryHandler for Instance {
}
}
fn create_to_expr(create: CreateTable) -> Result<CreateExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
column_defs: columns_to_expr(&create.columns, &time_index)?,
time_index,
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([
("engine".to_string(), create.engine),
("region_id".to_string(), "0".to_string()),
]),
..Default::default()
};
Ok(expr)
}
fn find_primary_keys(constraints: &[TableConstraint]) -> Result<Vec<String>> {
let primary_keys = constraints
.iter()
@@ -421,13 +681,44 @@ fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result<Vec<Gr
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
self.database()
.object(query.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
})
if let Some(expr) = &query.expr {
match expr {
Expr::Insert(insert) => {
let result = self.handle_insert(insert).await;
result
.map(|o| match o {
Output::AffectedRows(rows) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0u32)
.build(),
_ => {
unreachable!()
}
})
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
})
}
_ => self
.database()
.object(query.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
}),
}
} else {
// why?
self.database()
.object(query.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
})
}
}
}
@@ -617,9 +908,11 @@ mod tests {
}
.into()];
let insert_expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
options: HashMap::default(),
region_number: 0,
};
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),
@@ -708,16 +1001,18 @@ mod tests {
default_constraint: None,
},
];
let mut table_options = HashMap::with_capacity(1);
table_options.insert("region_id".to_string(), "0".to_string());
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "demo".to_string(),
desc: None,
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options,
..Default::default()
table_options: Default::default(),
table_id: None,
region_ids: vec![0],
}
}
}

View File

@@ -17,8 +17,7 @@ impl InfluxdbLineProtocolHandler for Instance {
match self.mode {
Mode::Standalone => {
let exprs: Vec<InsertExpr> = request.try_into()?;
self.database()
.batch_insert(exprs)
self.handle_inserts(&exprs)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
@@ -45,7 +44,14 @@ impl Instance {
for insert in inserts {
let self_clone = self.clone();
let insert_batch = crate::table::insert::insert_request_to_insert_batch(&insert)?;
self.create_or_alter_table_on_demand(
&insert.catalog_name,
&insert.schema_name,
&insert.table_name,
&[insert_batch],
)
.await?;
// TODO(fys): need a separate runtime here
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(&insert.catalog_name)?;

View File

@@ -1,12 +1,11 @@
use async_trait::async_trait;
use client::ObjectResult;
use common_error::prelude::BoxedError;
use servers::error as server_error;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::OpentsdbProtocolHandler;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::error::Result;
use crate::frontend::Mode;
use crate::instance::Instance;
@@ -41,27 +40,7 @@ impl OpentsdbProtocolHandler for Instance {
impl Instance {
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let expr = data_point.as_grpc_insert();
let result = self.database().insert(expr.clone()).await;
let object_result = match result {
Ok(result) => result,
Err(_) => {
return Err(result.context(error::RequestDatanodeSnafu).unwrap_err());
}
};
match object_result {
ObjectResult::Mutate(mutate) => {
if mutate.success != 1 || mutate.failure != 0 {
return error::ExecOpentsdbPutSnafu {
reason: format!("illegal result: {:?}", mutate),
}
.fail();
}
}
ObjectResult::Select(_) => unreachable!(),
}
self.handle_insert(&expr).await?;
Ok(())
}
}

View File

@@ -94,11 +94,15 @@ impl PrometheusProtocolHandler for Instance {
match self.mode {
Mode::Standalone => {
let exprs = prometheus::write_request_to_insert_exprs(request)?;
self.database()
.batch_insert(exprs)
let futures = exprs
.iter()
.map(|e| self.handle_insert(e))
.collect::<Vec<_>>();
let res = futures_util::future::join_all(futures)
.await
.map_err(BoxedError::new)
.into_iter()
.collect::<Result<Vec<_>, crate::error::Error>>();
res.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "failed to write prometheus remote request",
})?;
@@ -167,6 +171,7 @@ mod tests {
#[tokio::test]
async fn test_prometheus_remote_write_and_read() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_frontend_instance().await;
let write_request = WriteRequest {
@@ -174,7 +179,7 @@ mod tests {
..Default::default()
};
assert!(instance.write(write_request).await.is_ok());
instance.write(write_request).await.unwrap();
let read_request = ReadRequest {
queries: vec![

View File

@@ -145,9 +145,8 @@ async fn start_server(
server_and_addr: Option<(Box<dyn Server>, SocketAddr)>,
) -> servers::error::Result<Option<SocketAddr>> {
if let Some((server, addr)) = server_and_addr {
let res = server.start(addr).await.map(Some)?;
info!("Starting server at {}", addr);
Ok(res)
server.start(addr).await.map(Some)
} else {
Ok(None)
}

View File

@@ -1,4 +1,3 @@
mod insert;
pub(crate) mod route;
use std::any::Any;
@@ -30,6 +29,7 @@ use crate::mock::{DatanodeInstance, TableScanPlan};
use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
use crate::spliter::WriteSpliter;
use crate::table::route::TableRoutes;
pub mod insert;
#[derive(Clone)]
pub struct DistTable {

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec;
use api::v1::codec::InsertBatch;
use api::v1::insert_expr;
use api::v1::insert_expr::Expr;
use api::v1::Column;
@@ -73,12 +74,12 @@ impl DistTable {
}
}
fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result<InsertExpr> {
pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<InsertBatch> {
let mut row_count = None;
let columns = insert
.columns_values
.into_iter()
.iter()
.map(|(column_name, vector)| {
match row_count {
Some(rows) => ensure!(
@@ -97,12 +98,12 @@ fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result<Inse
.context(error::ColumnDataTypeSnafu)?;
let mut column = Column {
column_name,
column_name: column_name.clone(),
datatype: datatype.datatype() as i32,
..Default::default()
};
column.push_vals(0, vector);
column.push_vals(0, vector.clone());
Ok(column)
})
.collect::<Result<Vec<_>>>()?;
@@ -111,32 +112,28 @@ fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result<Inse
columns,
row_count: row_count.map(|rows| rows as u32).unwrap_or(0),
};
Ok(insert_batch)
}
let mut options = HashMap::with_capacity(1);
options.insert(
// TODO(fys): Temporarily hard code here
"region_id".to_string(),
codec::RegionNumber { id: region_id }.into(),
);
fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result<InsertExpr> {
let table_name = insert.table_name.clone();
let insert_batch = insert_request_to_insert_batch(&insert)?;
Ok(InsertExpr {
table_name: insert.table_name,
options,
schema_name: insert.schema_name,
table_name,
expr: Some(Expr::Values(insert_expr::Values {
values: vec![insert_batch.into()],
})),
region_number,
options: Default::default(),
})
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, ops::Deref};
use std::collections::HashMap;
use api::v1::{
codec::{self, InsertBatch},
insert_expr::Expr,
ColumnDataType, InsertExpr,
};
use api::v1::{codec::InsertBatch, insert_expr::Expr, ColumnDataType, InsertExpr};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::{prelude::ConcreteDataType, types::StringType, vectors::VectorBuilder};
use table::requests::InsertRequest;
@@ -205,8 +202,7 @@ mod tests {
}
}
let bytes = insert_expr.options.get("region_id").unwrap();
let region_id: codec::RegionNumber = bytes.deref().try_into().unwrap();
assert_eq!(12, region_id.id);
let region_number = insert_expr.region_number;
assert_eq!(12, region_number);
}
}

View File

@@ -19,9 +19,13 @@ async fn create_datanode_instance() -> Arc<DatanodeInstance> {
}
pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
let datanode_instance = create_datanode_instance().await;
let datanode_instance: Arc<DatanodeInstance> = create_datanode_instance().await;
let dn_catalog_manager = datanode_instance.catalog_manager().clone();
let (_, client) = create_datanode_client(datanode_instance).await;
Arc::new(Instance::with_client(client))
Arc::new(Instance::with_client_and_catalog_manager(
client,
dn_catalog_manager,
))
}
pub(crate) async fn create_datanode_client(

View File

@@ -4,6 +4,7 @@ use api::v1::{
insert_expr::{self, Expr},
InsertExpr,
};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
@@ -80,6 +81,9 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
// InfluxDB uses default catalog name and schema name
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
@@ -150,11 +154,13 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
Ok(writers
.into_iter()
.map(|(table_name, writer)| InsertExpr {
schema_name: schema_name.clone(),
table_name,
expr: Some(Expr::Values(insert_expr::Values {
values: vec![writer.finish().into()],
})),
options: HashMap::default(),
region_number: 0,
})
.collect())
}

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use table::requests::InsertRequest;
@@ -133,6 +134,7 @@ impl DataPoint {
    // TODO(fys): will be removed in the future.
pub fn as_grpc_insert(&self) -> InsertExpr {
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let mut columns = Vec::with_capacity(2 + self.tags.len());
let ts_column = Column {
@@ -177,11 +179,13 @@ impl DataPoint {
row_count: 1,
};
InsertExpr {
schema_name,
table_name: self.metric.clone(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![batch.into()],
})),
options: HashMap::default(),
region_number: 0,
}
}

View File

@@ -11,6 +11,7 @@ use api::v1::{
codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType,
InsertExpr,
};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision::MILLISECOND;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
@@ -339,6 +340,8 @@ pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result<Vec<In
    // TODO(fys): it will be removed in the future.
fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result<InsertExpr> {
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
@@ -399,6 +402,7 @@ fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result<InsertExpr> {
row_count: row_count as u32,
};
Ok(InsertExpr {
schema_name,
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?,
@@ -407,6 +411,7 @@ fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result<InsertExpr> {
values: vec![batch.into()],
})),
options: HashMap::default(),
region_number: 0,
})
}

View File

@@ -1,4 +1,4 @@
use api::v1::{alter_expr, AlterExpr};
use api::v1::{alter_expr, AddColumn, AlterExpr};
use sqlparser::ast::{ColumnDef, ObjectName, TableConstraint};
use crate::error::UnsupportedAlterTableStatementSnafu;
@@ -51,8 +51,11 @@ impl TryFrom<AlterTable> for AlterExpr {
.fail();
}
AlterTableOperation::AddColumn { column_def } => {
alter_expr::Kind::AddColumn(api::v1::AddColumn {
column_def: Some(sql_column_def_to_grpc_column_def(column_def)?),
alter_expr::Kind::AddColumns(api::v1::AddColumns {
add_columns: vec![AddColumn {
column_def: Some(sql_column_def_to_grpc_column_def(column_def)?),
is_key: false,
}],
})
}
};

View File

@@ -11,7 +11,7 @@ use common_query::physical_plan::PhysicalPlanRef;
use datatypes::schema::SchemaRef;
use crate::error::Result;
use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
use crate::requests::{AlterTableRequest, InsertRequest};
/// Table abstraction.
@@ -61,3 +61,10 @@ pub trait Table: Send + Sync {
}
pub type TableRef = Arc<dyn Table>;
#[async_trait::async_trait]
pub trait TableIdProvider {
async fn next_table_id(&self) -> Result<TableId>;
}
pub type TableIdProviderRef = Arc<dyn TableIdProvider + Send + Sync>;
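A minimal sketch of implementing this trait, assuming `TableId` is a plain `u32` alias; an in-process atomic counter like this would fit standalone mode, while distributed mode is expected to obtain ids from metasrv. The helper below is illustrative, not part of this commit:

```
use std::sync::atomic::{AtomicU32, Ordering};

pub struct LocalTableIdProvider {
    next: AtomicU32, // seed with MIN_USER_TABLE_ID in practice
}

#[async_trait::async_trait]
impl TableIdProvider for LocalTableIdProvider {
    async fn next_table_id(&self) -> Result<TableId> {
        // Relaxed suffices: only uniqueness matters, not ordering against other data.
        Ok(self.next.fetch_add(1, Ordering::Relaxed))
    }
}
```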