diff --git a/Cargo.lock b/Cargo.lock index 3223760fdc..f4e5094619 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1261,6 +1261,7 @@ dependencies = [ "moka 0.11.3", "object-store", "parking_lot", + "partition", "regex", "serde", "serde_json", @@ -6164,6 +6165,10 @@ dependencies = [ "thiserror", ] +[[package]] +name = "operator" +version = "0.4.0-nightly" + [[package]] name = "optional" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index b377eef2f5..88be3cb968 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "src/mito", "src/mito2", "src/object-store", + "src/operator", "src/partition", "src/promql", "src/query", @@ -148,6 +149,7 @@ meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } mito = { path = "src/mito" } mito2 = { path = "src/mito2" } +operator = { path = "src/operator" } object-store = { path = "src/object-store" } partition = { path = "src/partition" } promql = { path = "src/promql" } diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index c859e599d1..a1bcd4e322 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -32,6 +32,7 @@ meta-client = { workspace = true } metrics.workspace = true moka = { version = "0.11", features = ["future"] } parking_lot = "0.12" +partition.workspace = true regex.workspace = true serde.workspace = true serde_json = "1.0" diff --git a/src/catalog/src/remote.rs b/src/catalog/src/kvbackend.rs similarity index 92% rename from src/catalog/src/remote.rs rename to src/catalog/src/kvbackend.rs index 802cb06a20..ecc1b72b48 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/kvbackend.rs @@ -17,10 +17,13 @@ use std::sync::Arc; pub use client::{CachedMetaKvBackend, MetaKvBackend}; mod client; +mod manager; #[cfg(feature = "testing")] pub mod mock; +pub use manager::KvBackendCatalogManager; +/// KvBackend cache invalidator #[async_trait::async_trait] pub trait KvCacheInvalidator: Send + Sync { async fn invalidate_key(&self, key: &[u8]); diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/kvbackend/client.rs similarity index 100% rename from src/catalog/src/remote/client.rs rename to src/catalog/src/kvbackend/client.rs diff --git a/src/frontend/src/catalog.rs b/src/catalog/src/kvbackend/manager.rs similarity index 96% rename from src/frontend/src/catalog.rs rename to src/catalog/src/kvbackend/manager.rs index 4bdf6f8dab..ef44d0de4f 100644 --- a/src/frontend/src/catalog.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -16,13 +16,6 @@ use std::any::Any; use std::collections::BTreeSet; use std::sync::{Arc, Weak}; -use catalog::error::{ - self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, - TableMetadataManagerSnafu, -}; -use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; -use catalog::remote::KvCacheInvalidatorRef; -use catalog::CatalogManager; use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; @@ -40,11 +33,18 @@ use common_telemetry::debug; use futures_util::TryStreamExt; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; +use table::dist_table::DistTable; use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; -use crate::table::DistTable; +use crate::error::{ + self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, + TableMetadataManagerSnafu, +}; +use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; +use crate::kvbackend::KvCacheInvalidatorRef; +use crate::CatalogManager; /// Access all existing catalog, schema and tables. /// @@ -52,7 +52,7 @@ use crate::table::DistTable; /// a kv-backend which persists the metadata of a table. And system tables /// comes from [SystemCatalog], which is static and read-only. #[derive(Clone)] -pub struct FrontendCatalogManager { +pub struct KvBackendCatalogManager { // TODO(LFC): Maybe use a real implementation for Standalone mode. // Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend // is implemented by RaftEngine. Maybe we need a cache for it? @@ -65,9 +65,10 @@ pub struct FrontendCatalogManager { } #[async_trait::async_trait] -impl CacheInvalidator for FrontendCatalogManager { +impl CacheInvalidator for KvBackendCatalogManager { async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> { let key: TableNameKey = (&table_name).into(); + self.backend_cache_invalidator .invalidate_key(&key.as_raw_key()) .await; @@ -102,7 +103,7 @@ impl CacheInvalidator for FrontendCatalogManager { } } -impl FrontendCatalogManager { +impl KvBackendCatalogManager { pub fn new( backend: KvBackendRef, backend_cache_invalidator: KvCacheInvalidatorRef, @@ -139,7 +140,7 @@ impl FrontendCatalogManager { } #[async_trait::async_trait] -impl CatalogManager for FrontendCatalogManager { +impl CatalogManager for KvBackendCatalogManager { async fn catalog_names(&self) -> CatalogResult> { let stream = self .table_metadata_manager @@ -278,7 +279,7 @@ impl CatalogManager for FrontendCatalogManager { /// - information_schema.columns #[derive(Clone)] struct SystemCatalog { - catalog_manager: Weak, + catalog_manager: Weak, } impl SystemCatalog { diff --git a/src/catalog/src/remote/mock.rs b/src/catalog/src/kvbackend/mock.rs similarity index 100% rename from src/catalog/src/remote/mock.rs rename to src/catalog/src/kvbackend/mock.rs diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 8f929a7dcc..fce1f05be6 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -31,9 +31,9 @@ use crate::error::Result; pub mod error; pub mod information_schema; -pub mod local; +pub mod kvbackend; +pub mod memory; mod metrics; -pub mod remote; pub mod system; pub mod table_source; diff --git a/src/catalog/src/local.rs b/src/catalog/src/memory.rs similarity index 87% rename from src/catalog/src/local.rs rename to src/catalog/src/memory.rs index 2a9f1258a2..f1216e80af 100644 --- a/src/catalog/src/local.rs +++ b/src/catalog/src/memory.rs @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod memory; +pub mod manager; -pub use memory::{new_memory_catalog_manager, MemoryCatalogManager}; +pub use manager::{new_memory_catalog_manager, MemoryCatalogManager}; diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/memory/manager.rs similarity index 100% rename from src/catalog/src/local/memory.rs rename to src/catalog/src/memory/manager.rs diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index fbcac34795..96dd709ba5 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -123,7 +123,7 @@ mod tests { use session::context::QueryContext; use super::*; - use crate::local::MemoryCatalogManager; + use crate::memory::MemoryCatalogManager; #[test] fn test_validate_table_ref() { diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 579dc949e9..c1b619cf85 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -16,7 +16,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use catalog::remote::CachedMetaKvBackend; +use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use client::client_manager::DatanodeClients; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; @@ -25,7 +25,6 @@ use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; use either::Either; -use frontend::catalog::FrontendCatalogManager; use meta_client::client::MetaClientBuilder; use query::datafusion::DatafusionQueryEngine; use query::logical_optimizer::LogicalOptimizer; @@ -253,7 +252,7 @@ async fn create_query_engine(meta_addr: &str) -> Result { let datanode_clients = Arc::new(DatanodeClients::default()); - let catalog_list = FrontendCatalogManager::new( + let catalog_list = KvBackendCatalogManager::new( cached_meta_backend.clone(), cached_meta_backend.clone(), datanode_clients, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 117ccccfb3..da08703622 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use catalog::remote::DummyKvCacheInvalidator; +use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager}; use catalog::CatalogManagerRef; use clap::Parser; use common_base::Plugins; @@ -26,7 +26,6 @@ use common_telemetry::logging::LoggingOptions; use datanode::datanode::builder::DatanodeBuilder; use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig}; use datanode::region_server::RegionServer; -use frontend::catalog::FrontendCatalogManager; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::service_config::{ @@ -313,7 +312,7 @@ impl StartCommand { .context(StartDatanodeSnafu)?; let region_server = datanode.region_server(); - let catalog_manager = FrontendCatalogManager::new( + let catalog_manager = KvBackendCatalogManager::new( kv_store.clone(), Arc::new(DummyKvCacheInvalidator), Arc::new(StandaloneDatanodeManager(region_server.clone())), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 135d751c01..b0c73c1c4c 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -20,7 +20,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; -use catalog::local::MemoryCatalogManager; +use catalog::memory::MemoryCatalogManager; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_config::WalConfig; diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 81feab068d..5df9cb0a11 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -13,7 +13,7 @@ // limitations under the License. use async_trait::async_trait; -use catalog::remote::KvCacheInvalidatorRef; +use catalog::kvbackend::KvCacheInvalidatorRef; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 9c7ed815ca..abae860eff 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use api::v1::meta::HeartbeatResponse; -use catalog::remote::KvCacheInvalidator; +use catalog::kvbackend::KvCacheInvalidator; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index c61abd18f0..ddf433d9c5 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -27,7 +27,7 @@ use std::time::Duration; use api::v1::meta::Role; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; -use catalog::remote::CachedMetaKvBackend; +use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; use common_base::Plugins; @@ -80,7 +80,6 @@ pub use standalone::StandaloneDatanodeManager; use self::distributed::DistRegionRequestHandler; use self::standalone::StandaloneTableMetadataCreator; -use crate::catalog::FrontendCatalogManager; use crate::delete::{Deleter, DeleterRef}; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, @@ -151,7 +150,7 @@ impl Instance { ) -> Result { let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let catalog_manager = FrontendCatalogManager::new( + let catalog_manager = KvBackendCatalogManager::new( meta_backend.clone(), meta_backend.clone(), datanode_clients.clone(), diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index bfe341b615..e9b591f744 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -15,7 +15,6 @@ #![feature(assert_matches)] #![feature(trait_upcasting)] -pub mod catalog; pub(crate) mod delete; pub mod error; pub mod expr_factory; @@ -31,5 +30,7 @@ mod server; pub mod service_config; pub mod statement; pub mod table; +#[cfg(test)] +pub(crate) mod tests; pub const MAX_VALUE: &str = "MAXVALUE"; diff --git a/src/frontend/src/req_convert/delete/table_to_region.rs b/src/frontend/src/req_convert/delete/table_to_region.rs index 791125caf8..0b578c8697 100644 --- a/src/frontend/src/req_convert/delete/table_to_region.rs +++ b/src/frontend/src/req_convert/delete/table_to_region.rs @@ -65,7 +65,7 @@ mod tests { use store_api::storage::RegionId; use super::*; - use crate::table::test::{create_partition_rule_manager, new_test_table_info}; + use crate::tests::{create_partition_rule_manager, new_test_table_info}; async fn prepare_mocked_backend() -> KvBackendRef { let backend = Arc::new(MemoryKvBackend::default()); diff --git a/src/frontend/src/req_convert/insert/table_to_region.rs b/src/frontend/src/req_convert/insert/table_to_region.rs index 6592557d6d..fa7181eebb 100644 --- a/src/frontend/src/req_convert/insert/table_to_region.rs +++ b/src/frontend/src/req_convert/insert/table_to_region.rs @@ -65,7 +65,7 @@ mod tests { use store_api::storage::RegionId; use super::*; - use crate::table::test::{create_partition_rule_manager, new_test_table_info}; + use crate::tests::{create_partition_rule_manager, new_test_table_info}; async fn prepare_mocked_backend() -> KvBackendRef { let backend = Arc::new(MemoryKvBackend::default()); diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index 08bc31073b..5de29d25d7 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -38,6 +38,7 @@ use sql::ast::Value as SqlValue; use sql::statements::alter::AlterTable; use sql::statements::create::{CreateExternalTable, CreateTable, Partitions}; use sql::statements::sql_value_to_value; +use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::requests::{AlterTableRequest, TableOptions}; use table::TableRef; @@ -48,7 +49,6 @@ use crate::error::{ DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; -use crate::table::DistTable; use crate::{expr_factory, MAX_VALUE}; impl StatementExecutor { diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 19e33ace23..240dd01032 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -12,45 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use common_error::ext::BoxedError; -use common_recordbatch::SendableRecordBatchStream; use session::context::QueryContextRef; use sqlparser::ast::ObjectName; -use store_api::data_source::DataSource; -use store_api::storage::ScanRequest; -use table::metadata::{FilterPushDownType, TableInfoRef}; -use table::thin_table::{ThinTable, ThinTableAdapter}; -use table::TableRef; -use crate::error::{InvalidSqlSnafu, NotSupportedSnafu, Result}; - -#[derive(Clone)] -pub struct DistTable; - -impl DistTable { - pub fn table(table_info: TableInfoRef) -> TableRef { - let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact); - let data_source = Arc::new(DummyDataSource); - Arc::new(ThinTableAdapter::new(thin_table, data_source)) - } -} - -pub struct DummyDataSource; - -impl DataSource for DummyDataSource { - fn get_stream( - &self, - _request: ScanRequest, - ) -> std::result::Result { - NotSupportedSnafu { - feat: "get stream from a distributed table", - } - .fail() - .map_err(BoxedError::new) - } -} +use crate::error::{InvalidSqlSnafu, Result}; // TODO(LFC): Refactor consideration: move this function to some helper mod, // could be done together or after `TableReference`'s refactoring, when issue #559 is resolved. @@ -82,438 +47,3 @@ pub fn table_idents_to_full_name( }.fail(), } } - -#[cfg(test)] -pub(crate) mod test { - use std::collections::BTreeMap; - use std::sync::atomic::{AtomicU32, Ordering}; - - use catalog::remote::MetaKvBackend; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::KvBackendRef; - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use common_query::prelude::Expr; - use datafusion_expr::expr_fn::{and, binary_expr, col, or}; - use datafusion_expr::{lit, Operator}; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, SchemaBuilder}; - use meta_client::client::MetaClient; - use meter_core::collect::Collect; - use meter_core::data::{ReadRecord, WriteRecord}; - use meter_core::global::global_registry; - use meter_core::write_calc::WriteCalculator; - use partition::columns::RangeColumnsPartitionRule; - use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; - use partition::partition::{PartitionBound, PartitionDef}; - use partition::range::RangePartitionRule; - use partition::PartitionRuleRef; - use store_api::storage::RegionNumber; - use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; - use table::meter_insert_request; - use table::requests::InsertRequest; - - use super::*; - - pub fn new_test_table_info( - table_id: u32, - table_name: &str, - region_numbers: impl Iterator, - ) -> TableInfo { - let column_schemas = vec![ - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true), - ]; - let schema = SchemaBuilder::try_from(column_schemas) - .unwrap() - .version(123) - .build() - .unwrap(); - - let meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![0]) - .engine("engine") - .next_column_id(3) - .region_numbers(region_numbers.collect::>()) - .build() - .unwrap(); - TableInfoBuilder::default() - .table_id(table_id) - .table_version(5) - .name(table_name) - .meta(meta) - .build() - .unwrap() - } - - /// Create a partition rule manager with two tables, one is partitioned by single column, and - /// the other one is two. The tables are under default catalog and schema. - /// - /// Table named "one_column_partitioning_table" is partitioned by column "a" like this: - /// PARTITION BY RANGE (a) ( - /// PARTITION r1 VALUES LESS THAN (10), - /// PARTITION r2 VALUES LESS THAN (50), - /// PARTITION r3 VALUES LESS THAN (MAXVALUE), - /// ) - /// - /// Table named "two_column_partitioning_table" is partitioned by columns "a" and "b" like this: - /// PARTITION BY RANGE (a, b) ( - /// PARTITION r1 VALUES LESS THAN (10, 'hz'), - /// PARTITION r2 VALUES LESS THAN (50, 'sh'), - /// PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), - /// ) - pub(crate) async fn create_partition_rule_manager( - kv_backend: KvBackendRef, - ) -> PartitionRuleManagerRef { - let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend)); - - table_metadata_manager - .create_table_metadata( - new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(), - vec![ - RegionRoute { - region: Region { - id: 3.into(), - name: "r1".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::Value(10_i32.into())], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(3, "")), - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 2.into(), - name: "r2".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::Value(50_i32.into())], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(2, "")), - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 1.into(), - name: "r3".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::MaxValue], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(1, "")), - follower_peers: vec![], - }, - ], - ) - .await - .unwrap(); - - table_metadata_manager - .create_table_metadata( - new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(), - vec![ - RegionRoute { - region: Region { - id: 1.into(), - name: "r1".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string(), "b".to_string()], - vec![ - PartitionBound::Value(10_i32.into()), - PartitionBound::Value("hz".into()), - ], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), - }, - leader_peer: None, - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 2.into(), - name: "r2".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string(), "b".to_string()], - vec![ - PartitionBound::Value(50_i32.into()), - PartitionBound::Value("sh".into()), - ], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), - }, - leader_peer: None, - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 3.into(), - name: "r3".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string(), "b".to_string()], - vec![PartitionBound::MaxValue, PartitionBound::MaxValue], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), - }, - leader_peer: None, - follower_peers: vec![], - }, - ], - ) - .await - .unwrap(); - - partition_manager - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_find_partition_rule() { - let partition_manager = - create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await; - - // "one_column_partitioning_table" has id 1 - let partition_rule = partition_manager - .find_table_partition_rule(1) - .await - .unwrap(); - let range_rule = partition_rule - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(range_rule.column_name(), "a"); - assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); - assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); - - // "two_column_partitioning_table" has table 2 - let partition_rule = partition_manager - .find_table_partition_rule(2) - .await - .unwrap(); - let range_columns_rule = partition_rule - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(range_columns_rule.column_list(), &vec!["a", "b"]); - assert_eq!( - range_columns_rule.value_lists(), - &vec![ - vec![ - PartitionBound::Value(10_i32.into()), - PartitionBound::Value("hz".into()), - ], - vec![ - PartitionBound::Value(50_i32.into()), - PartitionBound::Value("sh".into()), - ], - vec![PartitionBound::MaxValue, PartitionBound::MaxValue] - ] - ); - assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_find_regions() { - let kv_backend = MetaKvBackend { - client: Arc::new(MetaClient::default()), - }; - let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(kv_backend))); - - // PARTITION BY RANGE (a) ( - // PARTITION r1 VALUES LESS THAN (10), - // PARTITION r2 VALUES LESS THAN (20), - // PARTITION r3 VALUES LESS THAN (50), - // PARTITION r4 VALUES LESS THAN (MAXVALUE), - // ) - let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new( - "a", - vec![10_i32.into(), 20_i32.into(), 50_i32.into()], - vec![0_u32, 1, 2, 3], - )) as _; - - let partition_rule_clone = partition_rule.clone(); - let test = |filters: Vec, expect_regions: Vec| { - let mut regions = partition_manager - .find_regions_by_filters(partition_rule_clone.clone(), filters.as_slice()) - .unwrap(); - regions.sort(); - assert_eq!(regions, expect_regions); - }; - - // test simple filter - test( - vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()], // a < 10 - vec![0], - ); - test( - vec![binary_expr(col("a"), Operator::LtEq, lit(10)).into()], // a <= 10 - vec![0, 1], - ); - test( - vec![binary_expr(lit(20), Operator::Gt, col("a")).into()], // 20 > a - vec![0, 1], - ); - test( - vec![binary_expr(lit(20), Operator::GtEq, col("a")).into()], // 20 >= a - vec![0, 1, 2], - ); - test( - vec![binary_expr(lit(45), Operator::Eq, col("a")).into()], // 45 == a - vec![2], - ); - test( - vec![binary_expr(col("a"), Operator::NotEq, lit(45)).into()], // a != 45 - vec![0, 1, 2, 3], - ); - test( - vec![binary_expr(col("a"), Operator::Gt, lit(50)).into()], // a > 50 - vec![3], - ); - - // test multiple filters - test( - vec![ - binary_expr(col("a"), Operator::Gt, lit(10)).into(), - binary_expr(col("a"), Operator::Gt, lit(50)).into(), - ], // [a > 10, a > 50] - vec![3], - ); - - // test finding all regions when provided with not supported filters or not partition column - test( - vec![binary_expr(col("row_id"), Operator::LtEq, lit(123)).into()], // row_id <= 123 - vec![0, 1, 2, 3], - ); - test( - vec![binary_expr(col("c"), Operator::Gt, lit(123)).into()], // c > 789 - vec![0, 1, 2, 3], - ); - - // test complex "AND" or "OR" filters - test( - vec![and( - binary_expr(col("row_id"), Operator::Lt, lit(1)), - or( - binary_expr(col("row_id"), Operator::Lt, lit(1)), - binary_expr(col("a"), Operator::Lt, lit(1)), - ), - ) - .into()], // row_id < 1 OR (row_id < 1 AND a > 1) - vec![0, 1, 2, 3], - ); - test( - vec![or( - binary_expr(col("a"), Operator::Lt, lit(20)), - binary_expr(col("a"), Operator::GtEq, lit(20)), - ) - .into()], // a < 20 OR a >= 20 - vec![0, 1, 2, 3], - ); - test( - vec![and( - binary_expr(col("a"), Operator::Lt, lit(20)), - binary_expr(col("a"), Operator::Lt, lit(50)), - ) - .into()], // a < 20 AND a < 50 - vec![0, 1], - ); - - // test failed to find regions by contradictory filters - let regions = partition_manager.find_regions_by_filters( - partition_rule, - vec![and( - binary_expr(col("a"), Operator::Lt, lit(20)), - binary_expr(col("a"), Operator::GtEq, lit(20)), - ) - .into()] - .as_slice(), - ); // a < 20 AND a >= 20 - assert!(matches!( - regions.unwrap_err(), - partition::error::Error::FindRegions { .. } - )); - } - - #[derive(Default)] - struct MockCollector { - pub write_sum: AtomicU32, - } - - impl Collect for MockCollector { - fn on_write(&self, record: WriteRecord) { - let _ = self - .write_sum - .fetch_add(record.byte_count, Ordering::Relaxed); - } - - fn on_read(&self, _record: ReadRecord) { - todo!() - } - } - - struct MockCalculator; - - impl WriteCalculator for MockCalculator { - fn calc_byte(&self, _value: &InsertRequest) -> u32 { - 1024 * 10 - } - } - - #[test] - #[ignore] - fn test_meter_insert_request() { - let collector = Arc::new(MockCollector::default()); - global_registry().set_collector(collector.clone()); - global_registry().register_calculator(Arc::new(MockCalculator)); - - let req = InsertRequest { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "numbers".to_string(), - columns_values: Default::default(), - region_number: 0, - }; - meter_insert_request!(req); - - let re = collector.write_sum.load(Ordering::Relaxed); - assert_eq!(re, 1024 * 10); - } -} diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs new file mode 100644 index 0000000000..e124293beb --- /dev/null +++ b/src/frontend/src/tests.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod partition_manager; + +pub(crate) use partition_manager::{create_partition_rule_manager, new_test_table_info}; diff --git a/src/frontend/src/tests/partition_manager.rs b/src/frontend/src/tests/partition_manager.rs new file mode 100644 index 0000000000..415ec0f5d6 --- /dev/null +++ b/src/frontend/src/tests/partition_manager.rs @@ -0,0 +1,444 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use catalog::kvbackend::MetaKvBackend; +use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::KvBackendRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_query::prelude::Expr; +use datafusion_expr::expr_fn::{and, binary_expr, col, or}; +use datafusion_expr::{lit, Operator}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use meta_client::client::MetaClient; +use meter_core::collect::Collect; +use meter_core::data::{ReadRecord, WriteRecord}; +use meter_core::global::global_registry; +use meter_core::write_calc::WriteCalculator; +use partition::columns::RangeColumnsPartitionRule; +use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; +use partition::partition::{PartitionBound, PartitionDef}; +use partition::range::RangePartitionRule; +use partition::PartitionRuleRef; +use store_api::storage::RegionNumber; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; +use table::meter_insert_request; +use table::requests::InsertRequest; + +pub fn new_test_table_info( + table_id: u32, + table_name: &str, + region_numbers: impl Iterator, +) -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .region_numbers(region_numbers.collect::>()) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(table_id) + .table_version(5) + .name(table_name) + .meta(meta) + .build() + .unwrap() +} + +/// Create a partition rule manager with two tables, one is partitioned by single column, and +/// the other one is two. The tables are under default catalog and schema. +/// +/// Table named "one_column_partitioning_table" is partitioned by column "a" like this: +/// PARTITION BY RANGE (a) ( +/// PARTITION r1 VALUES LESS THAN (10), +/// PARTITION r2 VALUES LESS THAN (50), +/// PARTITION r3 VALUES LESS THAN (MAXVALUE), +/// ) +/// +/// Table named "two_column_partitioning_table" is partitioned by columns "a" and "b" like this: +/// PARTITION BY RANGE (a, b) ( +/// PARTITION r1 VALUES LESS THAN (10, 'hz'), +/// PARTITION r2 VALUES LESS THAN (50, 'sh'), +/// PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +/// ) +pub(crate) async fn create_partition_rule_manager( + kv_backend: KvBackendRef, +) -> PartitionRuleManagerRef { + let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend)); + + table_metadata_manager + .create_table_metadata( + new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(), + vec![ + RegionRoute { + region: Region { + id: 3.into(), + name: "r1".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::Value(10_i32.into())], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(3, "")), + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 2.into(), + name: "r2".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::Value(50_i32.into())], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(2, "")), + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 1.into(), + name: "r3".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::MaxValue], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(1, "")), + follower_peers: vec![], + }, + ], + ) + .await + .unwrap(); + + table_metadata_manager + .create_table_metadata( + new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(), + vec![ + RegionRoute { + region: Region { + id: 1.into(), + name: "r1".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![ + PartitionBound::Value(10_i32.into()), + PartitionBound::Value("hz".into()), + ], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 2.into(), + name: "r2".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![ + PartitionBound::Value(50_i32.into()), + PartitionBound::Value("sh".into()), + ], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 3.into(), + name: "r3".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![PartitionBound::MaxValue, PartitionBound::MaxValue], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + ], + ) + .await + .unwrap(); + + partition_manager +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_find_partition_rule() { + let partition_manager = + create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await; + + // "one_column_partitioning_table" has id 1 + let partition_rule = partition_manager + .find_table_partition_rule(1) + .await + .unwrap(); + let range_rule = partition_rule + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(range_rule.column_name(), "a"); + assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); + assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); + + // "two_column_partitioning_table" has table 2 + let partition_rule = partition_manager + .find_table_partition_rule(2) + .await + .unwrap(); + let range_columns_rule = partition_rule + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(range_columns_rule.column_list(), &vec!["a", "b"]); + assert_eq!( + range_columns_rule.value_lists(), + &vec![ + vec![ + PartitionBound::Value(10_i32.into()), + PartitionBound::Value("hz".into()), + ], + vec![ + PartitionBound::Value(50_i32.into()), + PartitionBound::Value("sh".into()), + ], + vec![PartitionBound::MaxValue, PartitionBound::MaxValue] + ] + ); + assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_find_regions() { + let kv_backend = MetaKvBackend { + client: Arc::new(MetaClient::default()), + }; + let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(kv_backend))); + + // PARTITION BY RANGE (a) ( + // PARTITION r1 VALUES LESS THAN (10), + // PARTITION r2 VALUES LESS THAN (20), + // PARTITION r3 VALUES LESS THAN (50), + // PARTITION r4 VALUES LESS THAN (MAXVALUE), + // ) + let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new( + "a", + vec![10_i32.into(), 20_i32.into(), 50_i32.into()], + vec![0_u32, 1, 2, 3], + )) as _; + + let partition_rule_clone = partition_rule.clone(); + let test = |filters: Vec, expect_regions: Vec| { + let mut regions = partition_manager + .find_regions_by_filters(partition_rule_clone.clone(), filters.as_slice()) + .unwrap(); + regions.sort(); + assert_eq!(regions, expect_regions); + }; + + // test simple filter + test( + vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()], // a < 10 + vec![0], + ); + test( + vec![binary_expr(col("a"), Operator::LtEq, lit(10)).into()], // a <= 10 + vec![0, 1], + ); + test( + vec![binary_expr(lit(20), Operator::Gt, col("a")).into()], // 20 > a + vec![0, 1], + ); + test( + vec![binary_expr(lit(20), Operator::GtEq, col("a")).into()], // 20 >= a + vec![0, 1, 2], + ); + test( + vec![binary_expr(lit(45), Operator::Eq, col("a")).into()], // 45 == a + vec![2], + ); + test( + vec![binary_expr(col("a"), Operator::NotEq, lit(45)).into()], // a != 45 + vec![0, 1, 2, 3], + ); + test( + vec![binary_expr(col("a"), Operator::Gt, lit(50)).into()], // a > 50 + vec![3], + ); + + // test multiple filters + test( + vec![ + binary_expr(col("a"), Operator::Gt, lit(10)).into(), + binary_expr(col("a"), Operator::Gt, lit(50)).into(), + ], // [a > 10, a > 50] + vec![3], + ); + + // test finding all regions when provided with not supported filters or not partition column + test( + vec![binary_expr(col("row_id"), Operator::LtEq, lit(123)).into()], // row_id <= 123 + vec![0, 1, 2, 3], + ); + test( + vec![binary_expr(col("c"), Operator::Gt, lit(123)).into()], // c > 789 + vec![0, 1, 2, 3], + ); + + // test complex "AND" or "OR" filters + test( + vec![and( + binary_expr(col("row_id"), Operator::Lt, lit(1)), + or( + binary_expr(col("row_id"), Operator::Lt, lit(1)), + binary_expr(col("a"), Operator::Lt, lit(1)), + ), + ) + .into()], // row_id < 1 OR (row_id < 1 AND a > 1) + vec![0, 1, 2, 3], + ); + test( + vec![or( + binary_expr(col("a"), Operator::Lt, lit(20)), + binary_expr(col("a"), Operator::GtEq, lit(20)), + ) + .into()], // a < 20 OR a >= 20 + vec![0, 1, 2, 3], + ); + test( + vec![and( + binary_expr(col("a"), Operator::Lt, lit(20)), + binary_expr(col("a"), Operator::Lt, lit(50)), + ) + .into()], // a < 20 AND a < 50 + vec![0, 1], + ); + + // test failed to find regions by contradictory filters + let regions = partition_manager.find_regions_by_filters( + partition_rule, + vec![and( + binary_expr(col("a"), Operator::Lt, lit(20)), + binary_expr(col("a"), Operator::GtEq, lit(20)), + ) + .into()] + .as_slice(), + ); // a < 20 AND a >= 20 + assert!(matches!( + regions.unwrap_err(), + partition::error::Error::FindRegions { .. } + )); +} + +#[derive(Default)] +struct MockCollector { + pub write_sum: AtomicU32, +} + +impl Collect for MockCollector { + fn on_write(&self, record: WriteRecord) { + let _ = self + .write_sum + .fetch_add(record.byte_count, Ordering::Relaxed); + } + + fn on_read(&self, _record: ReadRecord) { + todo!() + } +} + +struct MockCalculator; + +impl WriteCalculator for MockCalculator { + fn calc_byte(&self, _value: &InsertRequest) -> u32 { + 1024 * 10 + } +} + +#[test] +#[ignore] +fn test_meter_insert_request() { + let collector = Arc::new(MockCollector::default()); + global_registry().set_collector(collector.clone()); + global_registry().register_calculator(Arc::new(MockCalculator)); + + let req = InsertRequest { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: "numbers".to_string(), + columns_values: Default::default(), + region_number: 0, + }; + meter_insert_request!(req); + + let re = collector.write_sum.load(Ordering::Relaxed); + assert_eq!(re, 1024 * 10); +} diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml new file mode 100644 index 0000000000..ab346cc428 --- /dev/null +++ b/src/operator/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "operator" +version.workspace = true +edition.workspace = true +license.workspace = true + +[features] +testing = [] + +[dependencies] + +[dev-dependencies] diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs new file mode 100644 index 0000000000..59f3388c48 --- /dev/null +++ b/src/operator/src/lib.rs @@ -0,0 +1,13 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 118da45112..c7140659a8 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -1375,7 +1375,7 @@ enum ScalarFunc { mod test { use std::time::{Duration, UNIX_EPOCH}; - use catalog::local::MemoryCatalogManager; + use catalog::memory::MemoryCatalogManager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index cb387807b2..1357897b58 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -507,7 +507,7 @@ mod tests { use crate::query_engine::{QueryEngineFactory, QueryEngineRef}; async fn create_test_engine() -> QueryEngineRef { - let catalog_manager = catalog::local::new_memory_catalog_manager().unwrap(); + let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap(); let req = RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 1d137706c0..3ac4bff86b 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -138,7 +138,7 @@ mod tests { #[test] fn test_query_engine_factory() { - let catalog_list = catalog::local::new_memory_catalog_manager().unwrap(); + let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); let factory = QueryEngineFactory::new(catalog_list, None, false); let engine = factory.query_engine(); diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index b4205f12c0..8842b6dd36 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -325,7 +325,7 @@ fn have_range_in_exprs(exprs: &Vec) -> bool { #[cfg(test)] mod test { - use catalog::local::MemoryCatalogManager; + use catalog::memory::MemoryCatalogManager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index f36083d1f8..4a60f925ce 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use catalog::local::MemoryCatalogManager; +use catalog::memory::MemoryCatalogManager; use common_query::Output; use common_recordbatch::{util, RecordBatch}; use session::context::QueryContext; diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 83ec1e52bc..41e95d3de1 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use catalog::local::MemoryCatalogManager; +use catalog::memory::MemoryCatalogManager; use catalog::RegisterTableRequest; use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; @@ -44,7 +44,7 @@ use crate::tests::pow::pow; #[tokio::test] async fn test_datafusion_query_engine() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = catalog::local::new_memory_catalog_manager() + let catalog_list = catalog::memory::new_memory_catalog_manager() .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; let factory = QueryEngineFactory::new(catalog_list, None, false); @@ -104,7 +104,7 @@ async fn test_datafusion_query_engine() -> Result<()> { } fn catalog_manager() -> Result> { - let catalog_manager = catalog::local::new_memory_catalog_manager().unwrap(); + let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap(); let req = RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 383ac2ea80..17c894b545 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, RwLock}; -use catalog::local::new_memory_catalog_manager; +use catalog::memory::new_memory_catalog_manager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs index c5fbe17072..f4827a18ad 100644 --- a/src/script/benches/py_benchmark.rs +++ b/src/script/benches/py_benchmark.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use catalog::local::MemoryCatalogManager; +use catalog::memory::MemoryCatalogManager; use common_catalog::consts::NUMBERS_TABLE_ID; use common_query::Output; use criterion::{black_box, criterion_group, criterion_main, Criterion}; diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 87cdf25eea..f5a2a20723 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -160,7 +160,7 @@ impl ScriptManager { #[cfg(test)] mod tests { - use catalog::local::MemoryCatalogManager; + use catalog::memory::MemoryCatalogManager; use query::QueryEngineFactory; use super::*; diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index f60b2914d7..a2879ab001 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -358,7 +358,7 @@ pub(crate) use tests::sample_script_engine; #[cfg(test)] mod tests { - use catalog::local::MemoryCatalogManager; + use catalog::memory::MemoryCatalogManager; use common_catalog::consts::NUMBERS_TABLE_ID; use common_recordbatch::util; use datatypes::prelude::ScalarVector; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index cb71ad4f29..1755756a32 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use async_trait::async_trait; -use catalog::local::MemoryCatalogManager; +use catalog::memory::MemoryCatalogManager; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; diff --git a/src/table/src/dist_table.rs b/src/table/src/dist_table.rs new file mode 100644 index 0000000000..dabcc6503c --- /dev/null +++ b/src/table/src/dist_table.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_recordbatch::SendableRecordBatchStream; +use store_api::data_source::DataSource; +use store_api::storage::ScanRequest; + +use crate::error::UnsupportedSnafu; +use crate::metadata::{FilterPushDownType, TableInfoRef}; +use crate::thin_table::{ThinTable, ThinTableAdapter}; +use crate::TableRef; + +#[derive(Clone)] +pub struct DistTable; + +impl DistTable { + pub fn table(table_info: TableInfoRef) -> TableRef { + let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact); + let data_source = Arc::new(DummyDataSource); + Arc::new(ThinTableAdapter::new(thin_table, data_source)) + } +} + +pub struct DummyDataSource; + +impl DataSource for DummyDataSource { + fn get_stream( + &self, + _request: ScanRequest, + ) -> std::result::Result { + UnsupportedSnafu { + operation: "get stream from a distributed table", + } + .fail() + .map_err(BoxedError::new) + } +} diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index a1e525e5a0..060d61b452 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +pub mod dist_table; pub mod engine; pub mod error; pub mod metadata; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 50269ada52..9c4c5380f6 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use catalog::remote::DummyKvCacheInvalidator; +use catalog::kvbackend::DummyKvCacheInvalidator; use common_base::Plugins; use common_config::KvStoreConfig; use common_procedure::options::ProcedureConfig; diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index dfb45d8b9b..e324a5f3eb 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; -use catalog::remote::CachedMetaKvBackend; +use catalog::kvbackend::CachedMetaKvBackend; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; use common_meta::key::table_name::TableNameKey;