mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 11:52:54 +00:00
feat: support multi table engines in distributed mode (#1316)
* chore: bump greptime-proto to 59afacd * feat: support multi table engines in distributed mode
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3198,7 +3198,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d3861c34f7920238869d0d4e50dc1e6b189d2a6b#d3861c34f7920238869d0d4e50dc1e6b189d2a6b"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=59afacdae59eae4241cfaf851021361caaeaed21#59afacdae59eae4241cfaf851021361caaeaed21"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"tonic",
|
||||
|
||||
@@ -366,6 +366,7 @@ fn create_table_expr() -> CreateTableExpr {
|
||||
table_options: Default::default(),
|
||||
region_ids: vec![0],
|
||||
table_id: None,
|
||||
engine: "mito".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-time = { path = "../common/time" }
|
||||
datatypes = { path = "../datatypes" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d3861c34f7920238869d0d4e50dc1e6b189d2a6b" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "59afacdae59eae4241cfaf851021361caaeaed21" }
|
||||
prost.workspace = true
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
|
||||
use client::{Client, Database};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
use prost::Message;
|
||||
use substrait_proto::proto::plan_rel::RelType as PlanRelType;
|
||||
use substrait_proto::proto::read_rel::{NamedTable, ReadType};
|
||||
@@ -64,6 +64,7 @@ async fn run() {
|
||||
table_options: Default::default(),
|
||||
table_id: Some(TableId { id: 1024 }),
|
||||
region_ids: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
};
|
||||
|
||||
let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use api::v1::alter_expr::Kind;
|
||||
use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
@@ -178,8 +178,7 @@ pub fn create_expr_to_request(
|
||||
primary_key_indices,
|
||||
create_if_not_exists: expr.create_if_not_exists,
|
||||
table_options,
|
||||
// TODO(weny): remove hard-code
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
engine: expr.engine,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -195,6 +195,7 @@ pub fn build_create_expr_from_insertion(
|
||||
table_id: Option<TableId>,
|
||||
table_name: &str,
|
||||
columns: &[Column],
|
||||
engine: &str,
|
||||
) -> Result<CreateTableExpr> {
|
||||
let mut new_columns: HashSet<String> = HashSet::default();
|
||||
let mut column_defs = Vec::default();
|
||||
@@ -256,6 +257,7 @@ pub fn build_create_expr_from_insertion(
|
||||
table_options: Default::default(),
|
||||
table_id: table_id.map(|id| api::v1::TableId { id }),
|
||||
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
|
||||
engine: engine.to_string(),
|
||||
};
|
||||
|
||||
Ok(expr)
|
||||
@@ -455,6 +457,7 @@ mod tests {
|
||||
use api::v1::column::{self, SemanticType, Values};
|
||||
use api::v1::{Column, ColumnDataType};
|
||||
use common_base::BitVec;
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_query::physical_plan::PhysicalPlanRef;
|
||||
use common_query::prelude::Expr;
|
||||
use common_time::timestamp::Timestamp;
|
||||
@@ -493,13 +496,22 @@ mod tests {
|
||||
let table_id = Some(10);
|
||||
let table_name = "test_metric";
|
||||
|
||||
assert!(build_create_expr_from_insertion("", "", table_id, table_name, &[]).is_err());
|
||||
assert!(
|
||||
build_create_expr_from_insertion("", "", table_id, table_name, &[], MITO_ENGINE)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
let insert_batch = mock_insert_batch();
|
||||
|
||||
let create_expr =
|
||||
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0)
|
||||
.unwrap();
|
||||
let create_expr = build_create_expr_from_insertion(
|
||||
"",
|
||||
"",
|
||||
table_id,
|
||||
table_name,
|
||||
&insert_batch.0,
|
||||
MITO_ENGINE,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
|
||||
assert_eq!(table_name, create_expr.table_name);
|
||||
|
||||
@@ -159,6 +159,7 @@ mod test {
|
||||
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
|
||||
CreateDatabaseExpr, CreateTableExpr, QueryRequest,
|
||||
};
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datatypes::prelude::*;
|
||||
use query::parser::QueryLanguageParser;
|
||||
@@ -213,6 +214,7 @@ mod test {
|
||||
},
|
||||
],
|
||||
time_index: "ts".to_string(),
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
..Default::default()
|
||||
})),
|
||||
});
|
||||
|
||||
@@ -106,7 +106,7 @@ impl Instance {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::{column_def, ColumnDataType, ColumnDef, TableId};
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
|
||||
use common_grpc_expr::create_table_schema;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
|
||||
@@ -237,6 +237,7 @@ mod tests {
|
||||
id: MIN_USER_TABLE_ID,
|
||||
}),
|
||||
region_ids: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -193,6 +193,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
table_options: (&request.table_options).into(),
|
||||
table_id: None, // Should and will be assigned by Meta.
|
||||
region_ids: vec![0],
|
||||
engine: request.engine,
|
||||
};
|
||||
|
||||
let table = dist_instance
|
||||
|
||||
@@ -43,6 +43,7 @@ pub trait CreateExprFactory {
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
columns: &[Column],
|
||||
engine: &str,
|
||||
) -> crate::error::Result<CreateTableExpr>;
|
||||
}
|
||||
|
||||
@@ -57,6 +58,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
columns: &[Column],
|
||||
engine: &str,
|
||||
) -> Result<CreateTableExpr> {
|
||||
let table_id = None;
|
||||
let create_expr = common_grpc_expr::build_create_expr_from_insertion(
|
||||
@@ -65,6 +67,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
|
||||
table_id,
|
||||
table_name,
|
||||
columns,
|
||||
engine,
|
||||
)
|
||||
.context(BuildCreateExprOnInsertionSnafu)?;
|
||||
|
||||
@@ -96,6 +99,7 @@ pub(crate) fn create_to_expr(
|
||||
table_options,
|
||||
table_id: None,
|
||||
region_ids: vec![],
|
||||
engine: create.engine.to_string(),
|
||||
};
|
||||
Ok(expr)
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use async_trait::async_trait;
|
||||
use catalog::remote::MetaKvBackend;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_query::Output;
|
||||
@@ -298,7 +299,7 @@ impl Instance {
|
||||
"Table {}.{}.{} does not exist, try create table",
|
||||
catalog_name, schema_name, table_name,
|
||||
);
|
||||
self.create_table_by_columns(ctx, table_name, columns)
|
||||
self.create_table_by_columns(ctx, table_name, columns, MITO_ENGINE)
|
||||
.await?;
|
||||
info!(
|
||||
"Successfully created table on insertion: {}.{}.{}",
|
||||
@@ -335,6 +336,7 @@ impl Instance {
|
||||
ctx: QueryContextRef,
|
||||
table_name: &str,
|
||||
columns: &[Column],
|
||||
engine: &str,
|
||||
) -> Result<Output> {
|
||||
let catalog_name = &ctx.current_catalog();
|
||||
let schema_name = &ctx.current_schema();
|
||||
@@ -342,7 +344,7 @@ impl Instance {
|
||||
// Create table automatically, build schema from data.
|
||||
let create_expr = self
|
||||
.create_expr_factory
|
||||
.create_expr_by_columns(catalog_name, schema_name, table_name, columns)
|
||||
.create_expr_by_columns(catalog_name, schema_name, table_name, columns, engine)
|
||||
.await?;
|
||||
|
||||
info!(
|
||||
|
||||
@@ -95,6 +95,7 @@ mod test {
|
||||
InsertRequest, QueryRequest,
|
||||
};
|
||||
use catalog::helper::{TableGlobalKey, TableGlobalValue};
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use query::parser::QueryLanguageParser;
|
||||
@@ -162,6 +163,7 @@ mod test {
|
||||
},
|
||||
],
|
||||
time_index: "ts".to_string(),
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
..Default::default()
|
||||
})),
|
||||
});
|
||||
|
||||
@@ -18,7 +18,7 @@ use api::v1::{
|
||||
InsertRequest, TableId,
|
||||
};
|
||||
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
|
||||
use common_query::Output;
|
||||
use servers::server::Server;
|
||||
use tests_integration::test_util::{setup_grpc_server, StorageType};
|
||||
@@ -304,6 +304,7 @@ fn testing_create_expr() -> CreateTableExpr {
|
||||
id: MIN_USER_TABLE_ID,
|
||||
}),
|
||||
region_ids: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user