diff --git a/Cargo.lock b/Cargo.lock index 6a209e795c..453689ebc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3866,7 +3866,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=783682fabc38c57b5b9d46bdcfeebe2496e85bbb#783682fabc38c57b5b9d46bdcfeebe2496e85bbb" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2c14c6e22dfe957f40bb88dd01fb8530656de89b#2c14c6e22dfe957f40bb88dd01fb8530656de89b" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index 78b89a5e17..1e938f58a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,7 +115,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "783682fabc38c57b5b9d46bdcfeebe2496e85bbb" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c14c6e22dfe957f40bb88dd01fb8530656de89b" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index e503555dd2..f219dd264e 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::datanode_manager::{Datanode, DatanodeManager}; +use common_meta::datanode_manager::{DatanodeRef, FlownodeRef, NodeManager}; use common_meta::peer::Peer; use moka::future::{Cache, CacheBuilder}; @@ -44,12 +44,17 @@ impl Debug for DatanodeClients { } #[async_trait::async_trait] -impl DatanodeManager for DatanodeClients { - async fn datanode(&self, datanode: &Peer) -> Arc { +impl NodeManager for DatanodeClients { + async fn datanode(&self, datanode: &Peer) -> DatanodeRef { let client = self.get_client(datanode).await; Arc::new(RegionRequester::new(client)) } + + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + // TODO(weny): Support it. + unimplemented!() + } } impl DatanodeClients { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 2c7d4abad2..7148606a09 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,7 +21,7 @@ use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::ddl_manager::DdlManager; @@ -468,7 +468,7 @@ impl StartCommand { pub async fn create_ddl_task_executor( table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_meta_allocator: TableMetadataAllocatorRef, ) -> Result { diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 6eebf00f4b..fcf4486b46 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -70,6 +70,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result { AlterKind::RenameTable { new_table_name } } + Kind::ChangeColumnTypes(_) => unimplemented!(), }; let request = AlterTableRequest { diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs index 58990ce01d..f990bef330 100644 --- a/src/common/meta/src/datanode_manager.rs +++ b/src/common/meta/src/datanode_manager.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; +use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest}; use api::v1::region::{QueryRequest, RegionRequest}; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; @@ -34,11 +35,24 @@ pub trait Datanode: Send + Sync { pub type DatanodeRef = Arc; -/// Datanode manager +/// The trait for handling requests to flownode #[async_trait::async_trait] -pub trait DatanodeManager: Send + Sync { - /// Retrieves a target `datanode`. - async fn datanode(&self, datanode: &Peer) -> DatanodeRef; +pub trait Flownode: Send + Sync { + async fn handle(&self, request: FlowRequest) -> Result; + + async fn handle_insert(&self, request: InsertRequest) -> Result; } -pub type DatanodeManagerRef = Arc; +pub type FlownodeRef = Arc; + +/// Datanode manager +#[async_trait::async_trait] +pub trait NodeManager: Send + Sync { + /// Retrieves a target `datanode`. + async fn datanode(&self, node: &Peer) -> DatanodeRef; + + /// Retrieves a target `flownode`. + async fn flownode(&self, node: &Peer) -> FlownodeRef; +} + +pub type NodeManagerRef = Arc; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index c763d33908..c7abac1d65 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -20,7 +20,7 @@ use store_api::storage::{RegionNumber, TableId}; use self::table_meta::TableMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; -use crate::datanode_manager::DatanodeManagerRef; +use crate::datanode_manager::NodeManagerRef; use crate::error::Result; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; @@ -95,7 +95,7 @@ pub struct TableMetadata { #[derive(Clone)] pub struct DdlContext { - pub datanode_manager: DatanodeManagerRef, + pub datanode_manager: NodeManagerRef, pub cache_invalidator: CacheInvalidatorRef, pub table_metadata_manager: TableMetadataManagerRef, pub memory_region_keeper: MemoryRegionKeeperRef, diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index cdebc25cbc..ce82ef831d 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -105,6 +105,7 @@ fn create_proto_alter_kind( }))) } Kind::RenameTable(_) => Ok(None), + Kind::ChangeColumnTypes(_) => unimplemented!(), } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1577cab2dc..7201f9602d 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -23,7 +23,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::TableId; use crate::cache_invalidator::CacheInvalidatorRef; -use crate::datanode_manager::DatanodeManagerRef; +use crate::datanode_manager::NodeManagerRef; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; @@ -64,7 +64,7 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade /// The [DdlManager] provides the ability to execute Ddl. pub struct DdlManager { procedure_manager: ProcedureManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, @@ -75,7 +75,7 @@ pub struct DdlManager { impl DdlManager { pub fn try_new( procedure_manager: ProcedureManagerRef, - datanode_clients: DatanodeManagerRef, + datanode_clients: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, @@ -716,7 +716,7 @@ mod tests { use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; - use crate::datanode_manager::{DatanodeManager, DatanodeRef}; + use crate::datanode_manager::{DatanodeRef, FlownodeRef, NodeManager}; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; @@ -734,10 +734,14 @@ mod tests { pub struct DummyDatanodeManager; #[async_trait::async_trait] - impl DatanodeManager for DummyDatanodeManager { + impl NodeManager for DummyDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { unimplemented!() } + + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + unimplemented!() + } } #[test] diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 4ed396509a..c4c64445d7 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -20,7 +20,7 @@ pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; -use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef}; +use crate::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef}; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::DdlContext; use crate::error::Result; @@ -74,24 +74,28 @@ impl Datanode for MockDatanode { } #[async_trait::async_trait] -impl DatanodeManager for MockDatanodeManager { +impl NodeManager for MockDatanodeManager { async fn datanode(&self, peer: &Peer) -> DatanodeRef { Arc::new(MockDatanode { peer: peer.clone(), handler: self.handler.clone(), }) } + + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + unimplemented!() + } } /// Returns a test purpose [DdlContext]. -pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { +pub fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); new_ddl_context_with_kv_backend(datanode_manager, kv_backend) } /// Returns a test purpose [DdlContext] with a specified [KvBackendRef]. pub fn new_ddl_context_with_kv_backend( - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, kv_backend: KvBackendRef, ) -> DdlContext { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 1464a11c27..2f39a0dd67 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use catalog::CatalogManagerRef; use common_base::Plugins; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; @@ -42,7 +42,7 @@ pub struct FrontendBuilder { kv_backend: KvBackendRef, cache_invalidator: Option, catalog_manager: CatalogManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, plugins: Option, procedure_executor: ProcedureExecutorRef, heartbeat_task: Option, @@ -52,7 +52,7 @@ impl FrontendBuilder { pub fn new( kv_backend: KvBackendRef, catalog_manager: CatalogManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, procedure_executor: ProcedureExecutorRef, ) -> Self { Self { diff --git a/src/frontend/src/instance/region_query.rs b/src/frontend/src/instance/region_query.rs index 844b9a7735..a6c21e3503 100644 --- a/src/frontend/src/instance/region_query.rs +++ b/src/frontend/src/instance/region_query.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::v1::region::QueryRequest; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; use query::error::{RegionQuerySnafu, Result as QueryResult}; @@ -29,13 +29,13 @@ use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result}; pub(crate) struct FrontendRegionQueryHandler { partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } impl FrontendRegionQueryHandler { pub fn arc( partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Arc { Arc::new(Self { partition_manager, diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 4001c4e59d..fcb1bd61e4 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -19,7 +19,7 @@ use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionRespo use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; -use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef}; +use common_meta::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; use common_recordbatch::SendableRecordBatchStream; @@ -34,10 +34,14 @@ use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; pub struct StandaloneDatanodeManager(pub RegionServer); #[async_trait] -impl DatanodeManager for StandaloneDatanodeManager { +impl NodeManager for StandaloneDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { RegionInvoker::arc(self.0.clone()) } + + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + unimplemented!() + } } /// Relative to [client::region::RegionRequester] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index bd4ab0394d..ea8613db4f 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -20,7 +20,7 @@ use client::client_manager::DatanodeClients; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc::channel_manager::ChannelConfig; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; @@ -79,7 +79,7 @@ pub struct MetasrvBuilder { election: Option, meta_peer_client: Option, lock: Option, - datanode_manager: Option, + datanode_manager: Option, plugins: Option, table_metadata_allocator: Option, } @@ -141,7 +141,7 @@ impl MetasrvBuilder { self } - pub fn datanode_manager(mut self, datanode_manager: DatanodeManagerRef) -> Self { + pub fn datanode_manager(mut self, datanode_manager: NodeManagerRef) -> Self { self.datanode_manager = Some(datanode_manager); self } @@ -392,7 +392,7 @@ fn build_procedure_manager( fn build_ddl_manager( options: &MetasrvOptions, - datanode_clients: Option, + datanode_clients: Option, procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 03ddc2dbde..ce2e5cda4d 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -21,7 +21,7 @@ use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; use common_meta::ddl::test_util::columns::TestColumnDefBuilder; @@ -173,7 +173,7 @@ fn test_region_request_builder() { async fn new_datanode_manager( region_server: &EchoRegionServer, region_routes: &[RegionRoute], -) -> DatanodeManagerRef { +) -> NodeManagerRef { let clients = DatanodeClients::default(); let datanodes = find_leaders(region_routes); diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index e14aa0fbc1..a79138e7a7 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -105,7 +105,7 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; - use common_meta::datanode_manager::DatanodeManagerRef; + use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; @@ -188,7 +188,7 @@ pub mod test_data { } } - pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { + pub(crate) fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); let mailbox_sequence = diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 46f235123c..309edf4146 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -19,7 +19,7 @@ use std::{iter, mem}; use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader}; use api::v1::{DeleteRequests, RowDeleteRequests}; use catalog::CatalogManagerRef; -use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::Output; use common_telemetry::tracing_context::TracingContext; @@ -40,7 +40,7 @@ use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion}; pub struct Deleter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } pub type DeleterRef = Arc; @@ -49,7 +49,7 @@ impl Deleter { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 61d2d2cfcf..95ff84f13a 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -25,7 +25,7 @@ use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; -use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; @@ -57,7 +57,7 @@ use crate::statement::StatementExecutor; pub struct Inserter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } pub type InserterRef = Arc; @@ -66,7 +66,7 @@ impl Inserter { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 7cfd5a8a1a..768a2850aa 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -18,7 +18,7 @@ use api::v1::region::region_request::Body as RegionRequestBody; use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader}; use catalog::CatalogManagerRef; use common_catalog::build_db_string; -use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_telemetry::logging::{error, info}; use common_telemetry::tracing_context::TracingContext; @@ -39,7 +39,7 @@ use crate::region_req_factory::RegionRequestFactory; pub struct Requester { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } pub type RequesterRef = Arc; @@ -48,7 +48,7 @@ impl Requester { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 687db126f0..58210e45c2 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -461,6 +461,9 @@ impl TryFrom for AlterKind { let names = x.drop_columns.into_iter().map(|x| x.name).collect(); AlterKind::DropColumns { names } } + alter_request::Kind::ChangeColumnTypes(_) => { + unimplemented!() + } }; Ok(alter_kind)