diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 9b071db7ea..b6c6e4baee 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -16,52 +16,40 @@ //! and communicating with other parts of the database #![warn(unused_imports)] -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use api::v1::flow::flow_server::Flow; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; use catalog::kvbackend::KvBackendCatalogManager; -use catalog::memory::MemoryCatalogManager; use common_base::Plugins; use common_error::ext::BoxedError; use common_frontend::handler::FrontendInvoker; -use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; -use common_meta::key::table_name::{TableNameKey, TableNameManager}; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_runtime::JoinHandle; use common_telemetry::{debug, info}; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use greptime_proto::v1; -use hydroflow::scheduled::graph::Hydroflow; use itertools::Itertools; use minstant::Anchor; -use prost::bytes::buf; use query::{QueryEngine, QueryEngineFactory}; use serde::{Deserialize, Serialize}; use session::context::QueryContext; -use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; -use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; -use tokio::task::LocalSet; +use tokio::sync::{oneshot, Mutex, RwLock}; -use crate::adapter::error::{ - EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, -}; -pub(crate) use crate::adapter::node_context::{FlownodeContext, IdToNameMap}; -use crate::adapter::parse_expr::{parse_duration, parse_fixed}; +use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +pub(crate) use crate::adapter::node_context::FlownodeContext; +use crate::adapter::parse_expr::parse_fixed; use crate::adapter::table_source::TableSource; use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; -use crate::compute::{Context, DataflowState, ErrCollector}; -use crate::expr::error::InternalSnafu; +use crate::compute::ErrCollector; use crate::expr::GlobalId; -use crate::plan::{Plan, TypedPlan}; -use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP}; +use crate::repr::{self, DiffRow, Row}; use crate::transform::sql_to_flow_plan; pub(crate) mod error; @@ -73,7 +61,7 @@ mod tests; mod util; mod worker; -mod node_context; +pub(crate) mod node_context; mod table_source; use error::Error; diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/adapter/error.rs index 4049c40289..08cd80a2a3 100644 --- a/src/flow/src/adapter/error.rs +++ b/src/flow/src/adapter/error.rs @@ -20,9 +20,7 @@ use common_error::ext::BoxedError; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; -use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; -use serde::{Deserialize, Serialize}; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index f5f32e4df0..94906898c6 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -16,7 +16,7 @@ use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; -use common_error::ext::{BoxedError, ErrorExt, StackError}; +use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; use itertools::Itertools; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 46d710eca7..8da6b747ac 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -17,48 +17,16 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; -use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; -use catalog::kvbackend::KvBackendCatalogManager; -use catalog::memory::MemoryCatalogManager; -use common_base::Plugins; -use common_error::ext::BoxedError; -use common_frontend::handler::FrontendInvoker; -use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; -use common_meta::key::table_name::{TableNameKey, TableNameManager}; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; -use common_meta::kv_backend::KvBackendRef; -use common_runtime::JoinHandle; -use common_telemetry::info; -use datatypes::schema::ColumnSchema; -use datatypes::value::Value; -use greptime_proto::v1; -use hydroflow::scheduled::graph::Hydroflow; -use itertools::Itertools; -use minstant::Anchor; -use prost::bytes::buf; -use query::{QueryEngine, QueryEngineFactory}; -use serde::{Deserialize, Serialize}; use session::context::QueryContext; -use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; -use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; -use tokio::task::LocalSet; +use tokio::sync::{broadcast, mpsc}; -use crate::adapter::error::{ - Error, EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, -}; -use crate::adapter::parse_expr::{parse_duration, parse_fixed}; -use crate::adapter::util::column_schemas_to_proto; -use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, TableSource}; -use crate::compute::{Context, DataflowState, ErrCollector}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; -use crate::plan::{Plan, TypedPlan}; -use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP}; -use crate::transform::sql_to_flow_plan; +use crate::repr::{DiffRow, RelationType, BROADCAST_CAP}; /// A context that holds the information of the dataflow #[derive(Default)] diff --git a/src/flow/src/adapter/parse_expr.rs b/src/flow/src/adapter/parse_expr.rs index acd2880584..3a28e813d5 100644 --- a/src/flow/src/adapter/parse_expr.rs +++ b/src/flow/src/adapter/parse_expr.rs @@ -16,12 +16,11 @@ use nom::branch::alt; use nom::bytes::complete::{tag, tag_no_case}; -use nom::character::complete::{alphanumeric1, digit0, multispace0, multispace1}; +use nom::character::complete::{alphanumeric1, digit0, multispace0}; use nom::combinator::peek; use nom::sequence::tuple; -use nom::{multi, IResult}; +use nom::IResult; -use crate::expr::ScalarExpr; use crate::repr; #[test] diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs index a6363057d1..c0d0854572 100644 --- a/src/flow/src/adapter/server.rs +++ b/src/flow/src/adapter/server.rs @@ -15,26 +15,20 @@ //! Implementation of grpc service for flow node use std::net::SocketAddr; -use std::sync::Arc; -use api::v1::flow::{CreateRequest, DropRequest}; use common_meta::node_manager::Flownode; use common_telemetry::tracing::info; use futures::FutureExt; -use greptime_proto::v1::flow::{ - flow_request, flow_server, FlowRequest, FlowResponse, InsertRequests, -}; +use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use snafu::{ensure, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{oneshot, Mutex}; use tonic::transport::server::TcpIncoming; -use tonic::transport::Server; use tonic::{Request, Response, Status}; -use crate::adapter::{FlownodeManager, FlownodeManagerRef}; -use crate::repr::{self, DiffRow}; +use crate::adapter::FlownodeManagerRef; pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; /// wrapping flow node manager to avoid orphan rule with Arc<...> diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index f69daefed4..5d85664b1d 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -14,52 +14,15 @@ //! How to query table information from database -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; -use std::sync::Arc; - -use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; -use catalog::kvbackend::KvBackendCatalogManager; -use catalog::memory::MemoryCatalogManager; -use common_base::Plugins; -use common_error::ext::BoxedError; -use common_frontend::handler::FrontendInvoker; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; -use common_meta::kv_backend::KvBackendRef; -use common_runtime::JoinHandle; -use common_telemetry::info; -use datatypes::schema::ColumnSchema; -use datatypes::value::Value; -use greptime_proto::v1; -use hydroflow::scheduled::graph::Hydroflow; use itertools::Itertools; -use minstant::Anchor; -use prost::bytes::buf; -use query::{QueryEngine, QueryEngineFactory}; -use serde::{Deserialize, Serialize}; -use session::context::QueryContext; -use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; -use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; -use tokio::task::LocalSet; -use crate::adapter::error::{ - Error, EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, -}; -pub(crate) use crate::adapter::node_context::{FlownodeContext, IdToNameMap}; -use crate::adapter::parse_expr::{parse_duration, parse_fixed}; -use crate::adapter::util::column_schemas_to_proto; -use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +use crate::adapter::error::{Error, TableNotFoundMetaSnafu, TableNotFoundSnafu}; use crate::adapter::TableName; -use crate::compute::{Context, DataflowState, ErrCollector}; -use crate::expr::error::InternalSnafu; -use crate::expr::GlobalId; -use crate::plan::{Plan, TypedPlan}; -use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP}; -use crate::transform::sql_to_flow_plan; +use crate::repr::{self, ColumnType, RelationType}; /// mapping of table name <-> table id should be query from tableinfo manager pub struct TableSource { diff --git a/src/flow/src/adapter/tests.rs b/src/flow/src/adapter/tests.rs index 01206fb46b..4690ff54f0 100644 --- a/src/flow/src/adapter/tests.rs +++ b/src/flow/src/adapter/tests.rs @@ -15,12 +15,9 @@ //! Mock test for adapter module //! TODO(discord9): write mock test -use common_meta::key::table_info::TableInfoKey; -use common_meta::kv_backend::memory::MemoryKvBackend; -use common_meta::kv_backend::{KvBackend, KvBackendRef}; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use store_api::storage::ConcreteDataType; -use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder}; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; use super::*; diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index f1997c70e0..66d8e946f3 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -15,7 +15,7 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType}; use common_error::ext::BoxedError; -use datatypes::schema::{ColumnSchema, COMMENT_KEY}; +use datatypes::schema::ColumnSchema; use itertools::Itertools; use snafu::ResultExt; diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index db881295d1..6848dbc6af 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -14,17 +14,15 @@ //! For single-thread flow worker -use std::borrow::BorrowMut; use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use common_telemetry::info; use hydroflow::scheduled::graph::Hydroflow; use snafu::ResultExt; use tokio::sync::{broadcast, mpsc, Mutex}; use crate::adapter::error::{Error, EvalSnafu}; -use crate::adapter::{FlowId, FlowTickManager}; +use crate::adapter::FlowId; use crate::compute::{Context, DataflowState, ErrCollector}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; @@ -462,6 +460,7 @@ mod test { use tokio::sync::oneshot; use super::*; + use crate::adapter::FlowTickManager; use crate::expr::Id; use crate::plan::Plan; use crate::repr::{RelationType, Row}; diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index bfdf50ce1b..5c85fa8f2b 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -120,6 +120,10 @@ pub async fn sql_to_flow_plan( Ok(flow_plan) } +fn rewrite_to_prop_ts(plan: TypedPlan) -> TypedPlan { + todo!() +} + #[cfg(test)] mod test { use std::sync::Arc; @@ -136,7 +140,7 @@ mod test { use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use super::*; - use crate::adapter::IdToNameMap; + use crate::adapter::node_context::IdToNameMap; use crate::repr::ColumnType; pub fn create_test_ctx() -> FlownodeContext { diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 5010684792..c8bff7da5c 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -14,7 +14,6 @@ #![warn(unused_imports)] -use common_telemetry::info; use datatypes::data_type::ConcreteDataType as CDT; use itertools::Itertools; use snafu::{OptionExt, ResultExt};