From ada4666e1056c8f546d4013157fe7755145b2883 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 6 Jan 2026 21:21:36 +0800 Subject: [PATCH] refactor: remove `region_numbers` from `TableMeta` and `TableInfo` (#7519) * refactor: remove `region_numbers` from `TableMeta` and `TableInfo` Signed-off-by: WenyXu * feat: create partitions from region route Signed-off-by: WenyXu * fix: fix build Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/catalog/src/lib.rs | 1 + src/catalog/src/metrics.rs | 4 +- .../information_schema/tables.rs | 91 ++++++++++++------- src/catalog/src/table_source.rs | 2 +- src/cli/src/bench.rs | 1 - .../meta/src/ddl/create_logical_tables.rs | 11 +-- .../meta/src/ddl/test_util/create_table.rs | 1 - .../src/ddl/tests/create_logical_tables.rs | 13 +-- src/common/meta/src/key.rs | 38 +++----- src/common/meta/src/key/table_info.rs | 1 - src/common/meta/src/key/table_route.rs | 36 ++------ src/common/meta/src/key/test_utils.rs | 14 +-- src/common/meta/src/rpc/ddl.rs | 1 - src/flow/Cargo.toml | 2 +- src/flow/src/adapter/tests.rs | 3 +- .../src/handler/region_lease_handler.rs | 4 +- .../src/procedure/region_migration.rs | 8 +- .../downgrade_leader_region.rs | 2 +- .../src/procedure/region_migration/manager.rs | 18 ++-- .../region_migration/migration_start.rs | 8 +- .../region_migration/open_candidate_region.rs | 2 +- .../downgrade_leader_region.rs | 4 +- .../rollback_downgraded_region.rs | 2 +- .../upgrade_candidate_region.rs | 14 +-- .../upgrade_candidate_region.rs | 2 +- .../src/procedure/region_migration/utils.rs | 5 +- src/meta-srv/src/procedure/test_util.rs | 2 +- src/meta-srv/src/procedure/utils.rs | 1 - src/meta-srv/src/region/lease_keeper.rs | 14 +-- src/meta-srv/src/region/supervisor.rs | 2 +- src/operator/src/insert.rs | 1 - .../src/req_convert/insert/row_to_region.rs | 28 +----- src/operator/src/statement/ddl.rs | 1 - src/operator/src/tests/partition_manager.rs | 3 +- src/partition/src/manager.rs | 3 +- src/query/src/dist_plan/analyzer/test.rs | 1 - src/query/src/dist_plan/planner.rs | 25 +++-- src/query/src/error.rs | 8 ++ src/query/src/sql/show_create_table.rs | 2 - src/table/src/metadata.rs | 26 +----- src/table/src/table/numbers.rs | 1 - src/table/src/test_util/memtable.rs | 13 +-- src/table/src/test_util/table_info.rs | 1 - 43 files changed, 172 insertions(+), 248 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index b9c40d293d..9c31e809fd 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -32,6 +32,7 @@ use crate::error::Result; pub mod error; pub mod information_extension; pub mod kvbackend; +#[cfg(any(test, feature = "testing"))] pub mod memory; mod metrics; pub mod system_schema; diff --git a/src/catalog/src/metrics.rs b/src/catalog/src/metrics.rs index 635d2bdf0d..5bcb18e84b 100644 --- a/src/catalog/src/metrics.rs +++ b/src/catalog/src/metrics.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) const METRIC_DB_LABEL: &str = "db"; - use lazy_static::lazy_static; use prometheus::*; @@ -25,7 +23,7 @@ lazy_static! { pub static ref METRIC_CATALOG_MANAGER_TABLE_COUNT: IntGaugeVec = register_int_gauge_vec!( "greptime_catalog_table_count", "catalog table count", - &[METRIC_DB_LABEL] + &["db"] ) .unwrap(); pub static ref METRIC_CATALOG_KV_REMOTE_GET: Histogram = diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 38a0cb1d61..248fb243dd 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::pin::pin; use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; @@ -31,15 +32,17 @@ use datatypes::value::Value; use datatypes::vectors::{ StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, }; -use futures::TryStreamExt; +use futures::StreamExt; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{RegionId, ScanRequest, TableId}; +use store_api::storage::{ScanRequest, TableId}; use table::metadata::{TableInfo, TableType}; use crate::CatalogManager; use crate::error::{ - CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, + CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result, + UpgradeWeakCatalogManagerRefSnafu, }; +use crate::kvbackend::KvBackendCatalogManager; use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES}; use crate::system_schema::utils; @@ -247,6 +250,10 @@ impl InformationSchemaTablesBuilder { .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let partition_manager = catalog_manager + .as_any() + .downcast_ref::() + .map(|catalog_manager| catalog_manager.partition_manager()); let predicates = Predicates::from_scan_request(&request); let information_extension = utils::information_extension(&self.catalog_manager)?; @@ -267,37 +274,59 @@ impl InformationSchemaTablesBuilder { }; for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { - let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); + let table_stream = catalog_manager.tables(&catalog_name, &schema_name, None); - while let Some(table) = stream.try_next().await? { - let table_info = table.table_info(); + const BATCH_SIZE: usize = 128; + // Split tables into chunks + let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE)); - // TODO(dennis): make it working for metric engine - let table_region_stats = - if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() { - table_info - .meta - .region_numbers - .iter() - .map(|n| RegionId::new(table_info.ident.table_id, *n)) - .flat_map(|region_id| { - region_stats - .binary_search_by_key(®ion_id, |x| x.id) - .map(|i| ®ion_stats[i]) - }) - .collect::>() - } else { - vec![] - }; + while let Some(tables) = table_chunks.next().await { + let tables = tables.into_iter().collect::>>()?; + let mito_or_physical_table_ids = tables + .iter() + .filter(|table| { + table.table_info().meta.engine == MITO_ENGINE + || table.table_info().is_physical_table() + }) + .map(|table| table.table_info().ident.table_id) + .collect::>(); - self.add_table( - &predicates, - &catalog_name, - &schema_name, - table_info, - table.table_type(), - &table_region_stats, - ); + let table_routes = if let Some(partition_manager) = &partition_manager { + partition_manager + .batch_find_region_routes(&mito_or_physical_table_ids) + .await + .context(FindRegionRoutesSnafu)? + } else { + mito_or_physical_table_ids + .into_iter() + .map(|id| (id, vec![])) + .collect() + }; + + for table in tables { + let table_region_stats = + match table_routes.get(&table.table_info().ident.table_id) { + Some(routes) => routes + .iter() + .flat_map(|route| { + let region_id = route.region.id; + region_stats + .binary_search_by_key(®ion_id, |x| x.id) + .map(|i| ®ion_stats[i]) + }) + .collect::>(), + None => vec![], + }; + + self.add_table( + &predicates, + &catalog_name, + &schema_name, + table.table_info(), + table.table_type(), + &table_region_stats, + ); + } } } diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index 9868ecd42a..fe69a1e72d 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -337,7 +337,7 @@ mod tests { .build(); let table_metadata_manager = TableMetadataManager::new(backend); - let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]); + let mut view_info = common_meta::key::test_utils::new_test_table_info(1024); view_info.table_type = TableType::View; let logical_plan = vec![1, 2, 3]; // Create view metadata diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index 4a5c676b14..82c460c32b 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -162,7 +162,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { next_column_id: columns as u32 + 1, value_indices: vec![], options: Default::default(), - region_numbers: (1..=100).collect(), partition_key_indices: vec![], column_ids: vec![], }; diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 5ab7ced1bb..0fcb5c8d62 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; -use store_api::storage::{RegionId, RegionNumber}; +use store_api::storage::RegionNumber; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; @@ -286,14 +286,7 @@ impl CreateTablesData { .flat_map(|(task, table_id)| { if table_id.is_none() { let table_info = task.table_info.clone(); - let region_ids = self - .physical_region_numbers - .iter() - .map(|region_number| { - RegionId::new(table_info.ident.table_id, *region_number) - }) - .collect(); - let table_route = TableRouteValue::logical(self.physical_table_id, region_ids); + let table_route = TableRouteValue::logical(self.physical_table_id); Some((table_info, table_route)) } else { None diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index a6cc1b4cbf..e407261e25 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -128,7 +128,6 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo { value_indices: vec![], engine: expr.engine.clone(), next_column_id: expr.column_defs.len() as u32, - region_numbers: vec![], options: TableOptions::try_from_iter(&expr.table_options).unwrap(), created_on: DateTime::default(), updated_on: DateTime::default(), diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 74ca72250f..08e4cad69b 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -166,7 +166,7 @@ async fn test_on_prepare_logical_table_exists_err() { .table_metadata_manager .create_logical_tables_metadata(vec![( task.table_info.clone(), - TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]), + TableRouteValue::logical(1024), )]) .await .unwrap(); @@ -208,7 +208,7 @@ async fn test_on_prepare_with_create_if_table_exists() { .table_metadata_manager .create_logical_tables_metadata(vec![( task.table_info.clone(), - TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + TableRouteValue::logical(1024), )]) .await .unwrap(); @@ -252,7 +252,7 @@ async fn test_on_prepare_part_logical_tables_exist() { .table_metadata_manager .create_logical_tables_metadata(vec![( task.table_info.clone(), - TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + TableRouteValue::logical(1024), )]) .await .unwrap(); @@ -392,7 +392,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() { .table_metadata_manager .create_logical_tables_metadata(vec![( task.table_info.clone(), - TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + TableRouteValue::logical(1024), )]) .await .unwrap(); @@ -496,10 +496,7 @@ async fn test_on_create_metadata_err() { task.table_info.ident.table_id = 1025; ddl_context .table_metadata_manager - .create_logical_tables_metadata(vec![( - task.table_info, - TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]), - )]) + .create_logical_tables_metadata(vec![(task.table_info, TableRouteValue::logical(512))]) .await .unwrap(); // Triggers procedure to create table metadata diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 896bf75790..e223c91349 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -747,12 +747,10 @@ impl TableMetadataManager { /// The caller MUST ensure it has the exclusive access to `TableNameKey`. pub async fn create_table_metadata( &self, - mut table_info: RawTableInfo, + table_info: RawTableInfo, table_route_value: TableRouteValue, region_wal_options: HashMap, ) -> Result<()> { - let region_numbers = table_route_value.region_numbers(); - table_info.meta.region_numbers = region_numbers; let table_id = table_info.ident.table_id; let engine = table_info.meta.engine.clone(); @@ -851,8 +849,7 @@ impl TableMetadataManager { on_create_table_route_failure: F2, } let mut on_failures = Vec::with_capacity(len); - for (mut table_info, table_route_value) in tables_data { - table_info.meta.region_numbers = table_route_value.region_numbers(); + for (table_info, table_route_value) in tables_data { let table_id = table_info.ident.table_id; // Creates table name. @@ -1543,8 +1540,8 @@ mod tests { } } - fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { - test_utils::new_test_table_info(10, region_numbers) + fn new_test_table_info() -> TableInfo { + test_utils::new_test_table_info(10) } fn new_test_table_names() -> HashSet { @@ -1602,8 +1599,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let wal_allocator = WalOptionsAllocator::RaftEngine; let regions = (0..16).collect(); let region_wal_options = @@ -1630,8 +1626,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let region_wal_options = create_mock_region_wal_options() .into_iter() .map(|(k, v)| (k, serde_json::to_string(&v).unwrap())) @@ -1713,8 +1708,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let table_id = table_info.ident.table_id; let table_route_value = TableRouteValue::physical(region_routes.clone()); @@ -1779,7 +1773,6 @@ mod tests { let table_info: RawTableInfo = test_utils::new_test_table_info_with_name( table_id, &format!("my_table_{}", table_id), - region_routes.iter().map(|r| r.region.id.region_number()), ) .into(); let table_route_value = TableRouteValue::physical(region_routes.clone()); @@ -1800,8 +1793,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let table_id = table_info.ident.table_id; let datanode_id = 2; let region_wal_options = create_mock_region_wal_options(); @@ -1907,8 +1899,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let table_id = table_info.ident.table_id; // creates metadata. create_physical_table_metadata( @@ -1984,8 +1975,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let table_id = table_info.ident.table_id; // creates metadata. create_physical_table_metadata( @@ -2070,8 +2060,7 @@ mod tests { leader_down_since: None, }, ]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let table_id = table_info.ident.table_id; let current_table_route_value = DeserializedValueWithBytes::from_inner( TableRouteValue::physical(region_routes.clone()), @@ -2153,8 +2142,7 @@ mod tests { let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_info: RawTableInfo = new_test_table_info().into(); let table_id = table_info.ident.table_id; let engine = table_info.meta.engine.as_str(); let region_storage_path = @@ -2408,7 +2396,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let view_info: RawTableInfo = new_test_table_info(Vec::::new().into_iter()).into(); + let view_info: RawTableInfo = new_test_table_info().into(); let view_id = view_info.ident.table_id; diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 637da1a32d..29a40d8222 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -338,7 +338,6 @@ mod tests { next_column_id: 3, value_indices: vec![2, 3], options: Default::default(), - region_numbers: vec![1], partition_key_indices: vec![], column_ids: vec![], }; diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index fe1f11bf15..fe07ce4084 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -71,7 +71,6 @@ pub struct PhysicalTableRouteValue { #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct LogicalTableRouteValue { physical_table_id: TableId, - region_ids: Vec, } impl TableRouteValue { @@ -85,14 +84,7 @@ impl TableRouteValue { if table_id == physical_table_id { TableRouteValue::physical(region_routes) } else { - let region_routes = region_routes - .into_iter() - .map(|region| { - debug_assert_eq!(region.region.id.table_id(), physical_table_id); - RegionId::new(table_id, region.region.id.region_number()) - }) - .collect(); - TableRouteValue::logical(physical_table_id, region_routes) + TableRouteValue::logical(physical_table_id) } } @@ -100,8 +92,8 @@ impl TableRouteValue { Self::Physical(PhysicalTableRouteValue::new(region_routes)) } - pub fn logical(physical_table_id: TableId, region_ids: Vec) -> Self { - Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids)) + pub fn logical(physical_table_id: TableId) -> Self { + Self::Logical(LogicalTableRouteValue::new(physical_table_id)) } /// Returns a new version [TableRouteValue] with `region_routes`. @@ -207,11 +199,9 @@ impl TableRouteValue { .iter() .map(|region_route| region_route.region.id.region_number()) .collect(), - TableRouteValue::Logical(x) => x - .region_ids() - .iter() - .map(|region_id| region_id.region_number()) - .collect(), + TableRouteValue::Logical(_) => { + vec![] + } } } } @@ -245,20 +235,13 @@ impl PhysicalTableRouteValue { } impl LogicalTableRouteValue { - pub fn new(physical_table_id: TableId, region_ids: Vec) -> Self { - Self { - physical_table_id, - region_ids, - } + pub fn new(physical_table_id: TableId) -> Self { + Self { physical_table_id } } pub fn physical_table_id(&self) -> TableId { self.physical_table_id } - - pub fn region_ids(&self) -> &Vec { - &self.region_ids - } } impl MetadataKey<'_, TableRouteKey> for TableRouteKey { @@ -900,7 +883,6 @@ mod tests { let table_route_manager = TableRouteManager::new(kv.clone()); let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue { physical_table_id: 1023, - region_ids: vec![RegionId::new(1023, 1)], }); let (txn, _) = table_route_manager .table_route_storage() @@ -930,14 +912,12 @@ mod tests { 1024, TableRouteValue::Logical(LogicalTableRouteValue { physical_table_id: 1023, - region_ids: vec![RegionId::new(1023, 1)], }), ), ( 1025, TableRouteValue::Logical(LogicalTableRouteValue { physical_table_id: 1023, - region_ids: vec![RegionId::new(1023, 2)], }), ), ]; diff --git a/src/common/meta/src/key/test_utils.rs b/src/common/meta/src/key/test_utils.rs index 463f55381f..1639172465 100644 --- a/src/common/meta/src/key/test_utils.rs +++ b/src/common/meta/src/key/test_utils.rs @@ -19,11 +19,7 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder}; use store_api::storage::TableId; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; -pub fn new_test_table_info_with_name>( - table_id: TableId, - table_name: &str, - region_numbers: I, -) -> TableInfo { +pub fn new_test_table_info_with_name(table_id: TableId, table_name: &str) -> TableInfo { let column_schemas = vec![ ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new( @@ -45,7 +41,6 @@ pub fn new_test_table_info_with_name>( .primary_key_indices(vec![0]) .engine("engine") .next_column_id(3) - .region_numbers(region_numbers.into_iter().collect::>()) .build() .unwrap(); TableInfoBuilder::default() @@ -56,9 +51,6 @@ pub fn new_test_table_info_with_name>( .build() .unwrap() } -pub fn new_test_table_info>( - table_id: TableId, - region_numbers: I, -) -> TableInfo { - new_test_table_info_with_name(table_id, "mytable", region_numbers) +pub fn new_test_table_info(table_id: TableId) -> TableInfo { + new_test_table_info_with_name(table_id, "mytable") } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index ba36909c7c..9d37bf1f3c 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1639,7 +1639,6 @@ mod tests { value_indices: vec![2], engine: METRIC_ENGINE_NAME.to_string(), next_column_id: 0, - region_numbers: vec![0], options: Default::default(), created_on: Default::default(), updated_on: Default::default(), diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 689a092b90..9223ffc026 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -79,7 +79,7 @@ tokio.workspace = true tonic.workspace = true [dev-dependencies] -catalog.workspace = true +catalog = { workspace = true, features = ["testing"] } common-catalog.workspace = true pretty_assertions.workspace = true prost.workspace = true diff --git a/src/flow/src/adapter/tests.rs b/src/flow/src/adapter/tests.rs index 7646b56e9b..9ed7e676c2 100644 --- a/src/flow/src/adapter/tests.rs +++ b/src/flow/src/adapter/tests.rs @@ -24,7 +24,7 @@ use super::*; pub fn new_test_table_info_with_name>( table_id: TableId, table_name: &str, - region_numbers: I, + _region_numbers: I, ) -> TableInfo { let column_schemas = vec![ ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), @@ -46,7 +46,6 @@ pub fn new_test_table_info_with_name>( .primary_key_indices(vec![0]) .engine("engine") .next_column_id(3) - .region_numbers(region_numbers.into_iter().collect::>()) .build() .unwrap(); TableInfoBuilder::default() diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index be46a9db1b..34011d9fee 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -192,7 +192,7 @@ mod test { let another_region_id = RegionId::new(table_id, region_number + 1); let peer = Peer::empty(datanode_id); let follower_peer = Peer::empty(datanode_id + 1); - let table_info = new_test_table_info(table_id, vec![region_number]).into(); + let table_info = new_test_table_info(table_id).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), @@ -328,7 +328,7 @@ mod test { let no_exist_region_id = RegionId::new(table_id, region_number + 2); let peer = Peer::empty(datanode_id); let follower_peer = Peer::empty(datanode_id + 1); - let table_info = new_test_table_info(table_id, vec![region_number]).into(); + let table_info = new_test_table_info(table_id).into(); let region_routes = vec![ RegionRoute { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 6d27cbf4e7..6f28c0a927 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -1172,7 +1172,7 @@ mod tests { let from_peer = persistent_context.from_peer.clone(); let to_peer = persistent_context.to_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), @@ -1211,7 +1211,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), @@ -1299,7 +1299,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), @@ -1419,7 +1419,7 @@ mod tests { let from_peer_id = persistent_context.from_peer.id; let from_peer = persistent_context.from_peer.clone(); let region_id = persistent_context.region_ids[0]; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 2a9cb2b187..2c60664663 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -401,7 +401,7 @@ mod tests { async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { let region_id = ctx.persistent_ctx.region_ids[0]; - let table_info = new_test_table_info(region_id.table_id(), vec![1]).into(); + let table_info = new_test_table_info(region_id.table_id()).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index fcd8f7a6e6..fbec7ca14e 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -698,7 +698,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(3)), @@ -726,7 +726,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(3)), @@ -758,7 +758,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(3)), @@ -792,7 +792,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(2)), @@ -822,7 +822,7 @@ mod test { let err = manager .verify_table_route( - &TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![])), + &TableRouteValue::Logical(LogicalTableRouteValue::new(0)), &task, ) .unwrap_err(); @@ -864,7 +864,7 @@ mod test { timeout: Duration::from_millis(1000), trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(2)), @@ -897,7 +897,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(3)), @@ -930,7 +930,7 @@ mod test { trigger_reason: RegionMigrationTriggerReason::Manual, }; - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(3)), @@ -974,7 +974,7 @@ mod test { task.trigger_reason, ), ); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(1)), diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 99d2972aa8..589f86b3a4 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -223,7 +223,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![3]).into(); + let table_info = new_test_table_info(1024).into(); let region_route = RegionRoute { region: Region::new_test(RegionId::new(1024, 3)), leader_peer: Some(from_peer.clone()), @@ -250,7 +250,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(to_peer), @@ -277,7 +277,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(from_peer_id)), @@ -302,7 +302,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes: Vec = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(1024)), diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 8a0b625b97..e3d8c027fc 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -425,7 +425,7 @@ mod tests { let mut env = TestingEnv::new(); // Prepares table - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(from_peer_id)), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 05e29c9b08..f2415d9d90 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -142,7 +142,7 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let table_id = ctx.persistent_ctx.region_ids[0].table_id(); - let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(1024)), @@ -185,7 +185,7 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let table_id = ctx.persistent_ctx.region_ids[0].table_id(); - let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(from_peer.clone()), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index fc32e37672..eafcaf4677 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -120,7 +120,7 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let table_id = ctx.persistent_ctx.region_ids[0].table_id(); - let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![ RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 0e545f5d92..bdad23c0b1 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -262,7 +262,7 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![2]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), leader_peer: Some(Peer::empty(4)), @@ -295,7 +295,7 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(3)), @@ -330,7 +330,7 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(Peer::empty(1)), @@ -369,7 +369,7 @@ mod tests { let leader_peer = persistent_context.from_peer.clone(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(leader_peer), @@ -396,7 +396,7 @@ mod tests { let candidate_peer = persistent_context.to_peer.clone(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(candidate_peer), @@ -424,7 +424,7 @@ mod tests { let candidate_peer = persistent_context.to_peer.clone(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), leader_peer: Some(candidate_peer), @@ -454,7 +454,7 @@ mod tests { let opening_keeper = MemoryRegionKeeper::default(); let table_id = 1024; - let table_info = new_test_table_info(table_id, vec![1]).into(); + let table_info = new_test_table_info(table_id).into(); let region_routes = vec![RegionRoute { region: Region::new_test(RegionId::new(table_id, 1)), leader_peer: Some(Peer::empty(1)), diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 0390ddf0da..9d0cb0d65a 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -381,7 +381,7 @@ mod tests { async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { let region_id = ctx.persistent_ctx.region_ids[0]; - let table_info = new_test_table_info(region_id.table_id(), vec![1]).into(); + let table_info = new_test_table_info(region_id.table_id()).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), diff --git a/src/meta-srv/src/procedure/region_migration/utils.rs b/src/meta-srv/src/procedure/region_migration/utils.rs index 09921ee0d6..e6243ec5b0 100644 --- a/src/meta-srv/src/procedure/region_migration/utils.rs +++ b/src/meta-srv/src/procedure/region_migration/utils.rs @@ -390,10 +390,7 @@ mod tests { .table_route_storage() .build_create_txn( 1024, - &TableRouteValue::Logical(LogicalTableRouteValue::new( - 1024, - vec![RegionId::new(1023, 1)], - )), + &TableRouteValue::Logical(LogicalTableRouteValue::new(1024)), ) .unwrap(); kv_backend.txn(txn).await.unwrap(); diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 04cff16ea4..37a339e061 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -271,7 +271,7 @@ pub async fn new_wal_prune_metadata( let region_ids = (0..n_region) .map(|i| RegionId::new(table_id, i)) .collect::>(); - let table_info = new_test_table_info(table_id, 0..n_region).into(); + let table_info = new_test_table_info(table_id).into(); let region_routes = region_ids .iter() .map(|region_id| RegionRoute { diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 8f0aa9b7cb..ae4c7c3e43 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -192,7 +192,6 @@ pub mod test_data { value_indices: vec![2], engine: MITO2_ENGINE.to_string(), next_column_id: 3, - region_numbers: vec![1, 2, 3], options: TableOptions::default(), created_on: DateTime::default(), updated_on: DateTime::default(), diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 324396fa64..c4b105345c 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -342,9 +342,8 @@ mod tests { #[tokio::test] async fn test_collect_metadata() { - let region_number = 1u32; let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + let table_info: RawTableInfo = new_test_table_info(table_id).into(); let region_id = RegionId::new(table_id, 1); let leader_peer_id = 1024; @@ -394,9 +393,8 @@ mod tests { #[tokio::test] async fn test_renew_region_leases_basic() { - let region_number = 1u32; let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + let table_info: RawTableInfo = new_test_table_info(table_id).into(); let region_id = RegionId::new(table_id, 1); let leader_peer_id = 1024; @@ -502,9 +500,8 @@ mod tests { #[tokio::test] async fn test_renew_unexpected_logic_table() { - let region_number = 1u32; let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + let table_info: RawTableInfo = new_test_table_info(table_id).into(); let region_id = RegionId::new(table_id, 1); let keeper = new_test_keeper(); @@ -512,7 +509,7 @@ mod tests { table_metadata_manager .create_table_metadata( table_info, - TableRouteValue::Logical(LogicalTableRouteValue::new(table_id, vec![region_id])), + TableRouteValue::Logical(LogicalTableRouteValue::new(table_id)), HashMap::default(), ) .await @@ -539,9 +536,8 @@ mod tests { #[tokio::test] async fn test_renew_region_leases_with_downgrade_leader() { - let region_number = 1u32; let table_id = 1024; - let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + let table_info: RawTableInfo = new_test_table_info(table_id).into(); let region_id = RegionId::new(table_id, 1); let leader_peer_id = 1024; diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 866431dec1..f35d0b898f 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -1070,7 +1070,7 @@ pub(crate) mod tests { let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table"); test_create_logical_table_task.set_table_id(logical_table_id); let table_info = test_create_logical_table_task.table_info; - let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]); + let table_route = LogicalTableRouteValue::new(1024); let table_route_value = TableRouteValue::Logical(table_route); table_metadata_manager .create_table_metadata(table_info, table_route_value, HashMap::new()) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 201d5d99f4..0276e549d6 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -1220,7 +1220,6 @@ mod tests { .next_column_id(0) .options(Default::default()) .created_on(Default::default()) - .region_numbers(vec![0]) .build() .unwrap(); let info = Arc::new( diff --git a/src/operator/src/req_convert/insert/row_to_region.rs b/src/operator/src/req_convert/insert/row_to_region.rs index f08a7cac85..9a03cd7121 100644 --- a/src/operator/src/req_convert/insert/row_to_region.rs +++ b/src/operator/src/req_convert/insert/row_to_region.rs @@ -14,10 +14,9 @@ use ahash::{HashMap, HashSet}; use api::v1::RowInsertRequests; -use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests}; +use api::v1::region::InsertRequests as RegionInsertRequests; use partition::manager::PartitionRuleManager; use snafu::OptionExt; -use store_api::storage::{RegionId, RegionNumber}; use table::metadata::{TableId, TableInfoRef}; use crate::error::{Result, TableNotFoundSnafu}; @@ -54,20 +53,10 @@ impl<'a> RowToRegion<'a> { let table_info = self.get_table_info(&request.table_name)?; let table_id = table_info.table_id(); - let region_numbers = self.region_numbers(&request.table_name)?; - let requests = if let Some(region_id) = match region_numbers[..] { - [singular] => Some(RegionId::new(table_id, singular)), - _ => None, - } { - vec![InsertRequest { - region_id: region_id.as_u64(), - rows: Some(rows), - }] - } else { - Partitioner::new(self.partition_manager) - .partition_insert_requests(table_info, rows) - .await? - }; + + let requests = Partitioner::new(self.partition_manager) + .partition_insert_requests(table_info, rows) + .await?; if self.instant_table_ids.contains(&table_id) { instant_request.extend(requests); @@ -91,11 +80,4 @@ impl<'a> RowToRegion<'a> { .get(table_name) .context(TableNotFoundSnafu { table_name }) } - - fn region_numbers(&self, table_name: &str) -> Result<&Vec> { - self.tables_info - .get(table_name) - .map(|x| &x.meta.region_numbers) - .context(TableNotFoundSnafu { table_name }) - } } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 536147bcac..ed7b56f976 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -1837,7 +1837,6 @@ pub fn create_table_info( value_indices: vec![], engine: create_table.engine.clone(), next_column_id: column_schemas.len() as u32, - region_numbers: vec![], options: table_options, created_on: Utc::now(), updated_on: Utc::now(), diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 2ecf338bb7..1c8c59ed91 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -32,7 +32,7 @@ use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; pub fn new_test_table_info( table_id: u32, table_name: &str, - region_numbers: impl Iterator, + _region_numbers: impl Iterator, ) -> TableInfo { let column_schemas = vec![ ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), @@ -55,7 +55,6 @@ pub fn new_test_table_info( .primary_key_indices(vec![0]) .engine("engine") .next_column_id(3) - .region_numbers(region_numbers.collect::>()) .partition_key_indices(vec![0]) .build() .unwrap(); diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index b8cf6cdb6b..4e068af20b 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -199,7 +199,8 @@ impl PartitionRuleManager { } } -fn create_partitions_from_region_routes( +/// Creates partitions from region routes. +pub fn create_partitions_from_region_routes( table_id: TableId, region_routes: &[RegionRoute], ) -> Result> { diff --git a/src/query/src/dist_plan/analyzer/test.rs b/src/query/src/dist_plan/analyzer/test.rs index bedcafc703..f2b39f5a9c 100644 --- a/src/query/src/dist_plan/analyzer/test.rs +++ b/src/query/src/dist_plan/analyzer/test.rs @@ -88,7 +88,6 @@ impl TestTable { primary_key_indices: vec![0, 1, 2], value_indices: vec![4], engine, - region_numbers: vec![0, 1], next_column_id: 5, options: Default::default(), created_on: Default::default(), diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index cea8e54045..5864ea7bd0 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -28,7 +28,7 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{DataFusionError, TableReference}; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; -use partition::manager::PartitionRuleManagerRef; +use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes}; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -40,7 +40,7 @@ use crate::dist_plan::PredicateExtractor; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; use crate::dist_plan::merge_sort::MergeSortLogicalPlan; use crate::dist_plan::region_pruner::ConstraintPruner; -use crate::error::{CatalogSnafu, TableNotFoundSnafu}; +use crate::error::{CatalogSnafu, PartitionRuleManagerSnafu, TableNotFoundSnafu}; use crate::region_query::RegionQueryHandlerRef; /// Planner for convert merge sort logical plan to physical plan @@ -211,8 +211,16 @@ impl DistExtensionPlanner { })?; let table_info = table.table_info(); - let all_regions = table_info.region_ids(); - + let physical_table_route = self + .partition_rule_manager + .find_physical_table_route(table_info.table_id()) + .await + .context(PartitionRuleManagerSnafu)?; + let all_regions = physical_table_route + .region_routes + .iter() + .map(|r| RegionId::new(table_info.table_id(), r.region.id.region_number())) + .collect::>(); // Extract partition columns let partition_columns: Vec = table_info.meta.partition_column_names().cloned().collect(); @@ -256,11 +264,10 @@ impl DistExtensionPlanner { } // Get partition information for the table if partition rule manager is available - let partitions = match self - .partition_rule_manager - .find_table_partitions(table.table_info().table_id()) - .await - { + let partitions = match create_partitions_from_region_routes( + table_info.table_id(), + &physical_table_route.region_routes, + ) { Ok(partitions) => partitions, Err(err) => { common_telemetry::debug!( diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 4649b7fe49..3f8f9332c8 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -57,6 +57,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Partition rule manager error"))] + PartitionRuleManager { + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Table not found: {}", table))] TableNotFound { table: String, @@ -396,6 +403,7 @@ impl ErrorExt for Error { QueryAccessDenied { .. } => StatusCode::AccessDenied, Catalog { source, .. } => source.status_code(), CreateRecordBatch { source, .. } => source.status_code(), + PartitionRuleManager { source, .. } => source.status_code(), QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(), PlanSql { error, .. } => { datafusion_status_code::(error, Some(StatusCode::PlanQuery)) diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index d07c9dcb4e..95e8ae175b 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -322,7 +322,6 @@ mod tests { let table_name = "system_metrics"; let schema_name = "public".to_string(); let catalog_name = "greptime".to_string(); - let regions = vec![0, 1, 2]; let mut options = table::requests::TableOptions { ttl: Some(Duration::from_secs(30).into()), @@ -341,7 +340,6 @@ mod tests { .next_column_id(0) .options(options) .created_on(Default::default()) - .region_numbers(regions) .build() .unwrap(); diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index f86a1edbb5..145f9308df 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -31,7 +31,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY}; use store_api::region_request::{SetRegionOption, UnsetRegionOption}; -use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId}; +use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId}; use crate::error::{self, Result}; use crate::requests::{ @@ -135,8 +135,6 @@ pub struct TableMeta { pub value_indices: Vec, #[builder(default, setter(into))] pub engine: String, - #[builder(default, setter(into))] - pub region_numbers: Vec, pub next_column_id: ColumnId, /// Table options. #[builder(default)] @@ -160,7 +158,6 @@ impl TableMetaBuilder { primary_key_indices: None, value_indices: None, engine: None, - region_numbers: None, next_column_id: None, options: None, created_on: None, @@ -194,7 +191,6 @@ impl TableMetaBuilder { primary_key_indices: Some(Vec::new()), value_indices: Some(Vec::new()), engine: None, - region_numbers: Some(Vec::new()), next_column_id: Some(0), options: None, created_on: None, @@ -1088,13 +1084,6 @@ impl TableInfo { self.ident.table_id } - pub fn region_ids(&self) -> Vec { - self.meta - .region_numbers - .iter() - .map(|id| RegionId::new(self.table_id(), *id)) - .collect() - } /// Returns the full table name in the form of `{catalog}.{schema}.{table}`. pub fn full_table_name(&self) -> String { common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name) @@ -1174,7 +1163,6 @@ pub struct RawTableMeta { /// Next column id of a new column. /// It's used to ensure all columns with the same name across all regions have the same column id. pub next_column_id: ColumnId, - pub region_numbers: Vec, pub options: TableOptions, pub created_on: DateTime, pub updated_on: DateTime, @@ -1201,7 +1189,6 @@ impl<'de> Deserialize<'de> for RawTableMeta { value_indices: Vec, engine: String, next_column_id: u32, - region_numbers: Vec, options: TableOptions, created_on: DateTime, updated_on: Option>, @@ -1218,7 +1205,6 @@ impl<'de> Deserialize<'de> for RawTableMeta { value_indices: h.value_indices, engine: h.engine, next_column_id: h.next_column_id, - region_numbers: h.region_numbers, options: h.options, created_on: h.created_on, updated_on: h.updated_on.unwrap_or(h.created_on), @@ -1236,7 +1222,6 @@ impl From for RawTableMeta { value_indices: meta.value_indices, engine: meta.engine, next_column_id: meta.next_column_id, - region_numbers: meta.region_numbers, options: meta.options, created_on: meta.created_on, updated_on: meta.updated_on, @@ -1255,7 +1240,6 @@ impl TryFrom for TableMeta { primary_key_indices: raw.primary_key_indices, value_indices: raw.value_indices, engine: raw.engine, - region_numbers: raw.region_numbers, next_column_id: raw.next_column_id, options: raw.options, created_on: raw.created_on, @@ -1608,8 +1592,6 @@ mod tests { .unwrap(); let new_meta = add_columns_to_meta(&meta); - assert_eq!(meta.region_numbers, new_meta.region_numbers); - let names: Vec = new_meta .schema .column_schemas() @@ -1685,8 +1667,6 @@ mod tests { .build() .unwrap(); - assert_eq!(meta.region_numbers, new_meta.region_numbers); - let names: Vec = new_meta .schema .column_schemas() @@ -2038,8 +2018,6 @@ mod tests { .unwrap(); let new_meta = add_columns_to_meta_with_location(&meta); - assert_eq!(meta.region_numbers, new_meta.region_numbers); - let names: Vec = new_meta .schema .column_schemas() @@ -2090,8 +2068,6 @@ mod tests { // Add a string column and make it fulltext indexed let new_meta = add_columns_to_meta_with_location(&meta); - assert_eq!(meta.region_numbers, new_meta.region_numbers); - let alter_kind = AlterKind::SetIndexes { options: vec![SetIndexOption::Fulltext { column_name: "my_tag_first".to_string(), diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index eb67dcfaef..6dd40fe747 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -76,7 +76,6 @@ impl NumbersTable { primary_key_indices: vec![0], value_indices: vec![], engine, - region_numbers: vec![0], next_column_id: 1, options: Default::default(), created_on: Default::default(), diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 812b5edaf2..f1108bc533 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -27,7 +27,7 @@ use futures::Stream; use futures::task::{Context, Poll}; use snafu::prelude::*; use store_api::data_source::DataSource; -use store_api::storage::{RegionNumber, ScanRequest}; +use store_api::storage::ScanRequest; use crate::error::{SchemaConversionSnafu, TableProjectionSnafu}; use crate::metadata::{ @@ -39,21 +39,16 @@ pub struct MemTable; impl MemTable { pub fn table(table_name: impl Into, recordbatch: RecordBatch) -> TableRef { - Self::new_with_region(table_name, recordbatch, vec![0]) + Self::new_with_region(table_name, recordbatch) } - pub fn new_with_region( - table_name: impl Into, - recordbatch: RecordBatch, - regions: Vec, - ) -> TableRef { + pub fn new_with_region(table_name: impl Into, recordbatch: RecordBatch) -> TableRef { Self::new_with_catalog( table_name, recordbatch, 1, DEFAULT_CATALOG_NAME.to_string(), DEFAULT_SCHEMA_NAME.to_string(), - regions, ) } @@ -63,7 +58,6 @@ impl MemTable { table_id: TableId, catalog_name: String, schema_name: String, - regions: Vec, ) -> TableRef { let schema = recordbatch.schema.clone(); @@ -75,7 +69,6 @@ impl MemTable { .next_column_id(0) .options(Default::default()) .created_on(Default::default()) - .region_numbers(regions) .build() .unwrap(); diff --git a/src/table/src/test_util/table_info.rs b/src/table/src/test_util/table_info.rs index cd1acda673..f62c299625 100644 --- a/src/table/src/test_util/table_info.rs +++ b/src/table/src/test_util/table_info.rs @@ -31,7 +31,6 @@ pub fn test_table_info( .next_column_id(0) .options(Default::default()) .created_on(Default::default()) - .region_numbers(vec![1]) .build() .unwrap();