mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 14:30:43 +00:00
refactor: remove region_numbers from TableMeta and TableInfo (#7519)
* refactor: remove `region_numbers` from `TableMeta` and `TableInfo` Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: create partitions from region route Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: fix build Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -32,6 +32,7 @@ use crate::error::Result;
|
||||
pub mod error;
|
||||
pub mod information_extension;
|
||||
pub mod kvbackend;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod memory;
|
||||
mod metrics;
|
||||
pub mod system_schema;
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) const METRIC_DB_LABEL: &str = "db";
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use prometheus::*;
|
||||
|
||||
@@ -25,7 +23,7 @@ lazy_static! {
|
||||
pub static ref METRIC_CATALOG_MANAGER_TABLE_COUNT: IntGaugeVec = register_int_gauge_vec!(
|
||||
"greptime_catalog_table_count",
|
||||
"catalog table count",
|
||||
&[METRIC_DB_LABEL]
|
||||
&["db"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_CATALOG_KV_REMOTE_GET: Histogram =
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use core::pin::pin;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
@@ -31,15 +32,17 @@ use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
|
||||
};
|
||||
use futures::TryStreamExt;
|
||||
use futures::StreamExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, ScanRequest, TableId};
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
use table::metadata::{TableInfo, TableType};
|
||||
|
||||
use crate::CatalogManager;
|
||||
use crate::error::{
|
||||
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
|
||||
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
|
||||
UpgradeWeakCatalogManagerRefSnafu,
|
||||
};
|
||||
use crate::kvbackend::KvBackendCatalogManager;
|
||||
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
|
||||
use crate::system_schema::utils;
|
||||
|
||||
@@ -247,6 +250,10 @@ impl InformationSchemaTablesBuilder {
|
||||
.catalog_manager
|
||||
.upgrade()
|
||||
.context(UpgradeWeakCatalogManagerRefSnafu)?;
|
||||
let partition_manager = catalog_manager
|
||||
.as_any()
|
||||
.downcast_ref::<KvBackendCatalogManager>()
|
||||
.map(|catalog_manager| catalog_manager.partition_manager());
|
||||
let predicates = Predicates::from_scan_request(&request);
|
||||
|
||||
let information_extension = utils::information_extension(&self.catalog_manager)?;
|
||||
@@ -267,37 +274,59 @@ impl InformationSchemaTablesBuilder {
|
||||
};
|
||||
|
||||
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
|
||||
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
|
||||
let table_stream = catalog_manager.tables(&catalog_name, &schema_name, None);
|
||||
|
||||
while let Some(table) = stream.try_next().await? {
|
||||
let table_info = table.table_info();
|
||||
const BATCH_SIZE: usize = 128;
|
||||
// Split tables into chunks
|
||||
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
|
||||
|
||||
// TODO(dennis): make it working for metric engine
|
||||
let table_region_stats =
|
||||
if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
|
||||
table_info
|
||||
.meta
|
||||
.region_numbers
|
||||
.iter()
|
||||
.map(|n| RegionId::new(table_info.ident.table_id, *n))
|
||||
.flat_map(|region_id| {
|
||||
region_stats
|
||||
.binary_search_by_key(®ion_id, |x| x.id)
|
||||
.map(|i| ®ion_stats[i])
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
while let Some(tables) = table_chunks.next().await {
|
||||
let tables = tables.into_iter().collect::<Result<Vec<_>>>()?;
|
||||
let mito_or_physical_table_ids = tables
|
||||
.iter()
|
||||
.filter(|table| {
|
||||
table.table_info().meta.engine == MITO_ENGINE
|
||||
|| table.table_info().is_physical_table()
|
||||
})
|
||||
.map(|table| table.table_info().ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.add_table(
|
||||
&predicates,
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
table_info,
|
||||
table.table_type(),
|
||||
&table_region_stats,
|
||||
);
|
||||
let table_routes = if let Some(partition_manager) = &partition_manager {
|
||||
partition_manager
|
||||
.batch_find_region_routes(&mito_or_physical_table_ids)
|
||||
.await
|
||||
.context(FindRegionRoutesSnafu)?
|
||||
} else {
|
||||
mito_or_physical_table_ids
|
||||
.into_iter()
|
||||
.map(|id| (id, vec![]))
|
||||
.collect()
|
||||
};
|
||||
|
||||
for table in tables {
|
||||
let table_region_stats =
|
||||
match table_routes.get(&table.table_info().ident.table_id) {
|
||||
Some(routes) => routes
|
||||
.iter()
|
||||
.flat_map(|route| {
|
||||
let region_id = route.region.id;
|
||||
region_stats
|
||||
.binary_search_by_key(®ion_id, |x| x.id)
|
||||
.map(|i| ®ion_stats[i])
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
self.add_table(
|
||||
&predicates,
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
table.table_info(),
|
||||
table.table_type(),
|
||||
&table_region_stats,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -337,7 +337,7 @@ mod tests {
|
||||
.build();
|
||||
|
||||
let table_metadata_manager = TableMetadataManager::new(backend);
|
||||
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
|
||||
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024);
|
||||
view_info.table_type = TableType::View;
|
||||
let logical_plan = vec![1, 2, 3];
|
||||
// Create view metadata
|
||||
|
||||
@@ -162,7 +162,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
|
||||
next_column_id: columns as u32 + 1,
|
||||
value_indices: vec![],
|
||||
options: Default::default(),
|
||||
region_numbers: (1..=100).collect(),
|
||||
partition_key_indices: vec![],
|
||||
column_ids: vec![],
|
||||
};
|
||||
|
||||
@@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use store_api::storage::RegionNumber;
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
@@ -286,14 +286,7 @@ impl CreateTablesData {
|
||||
.flat_map(|(task, table_id)| {
|
||||
if table_id.is_none() {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| {
|
||||
RegionId::new(table_info.ident.table_id, *region_number)
|
||||
})
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id);
|
||||
Some((table_info, table_route))
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -128,7 +128,6 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
|
||||
value_indices: vec![],
|
||||
engine: expr.engine.clone(),
|
||||
next_column_id: expr.column_defs.len() as u32,
|
||||
region_numbers: vec![],
|
||||
options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
|
||||
created_on: DateTime::default(),
|
||||
updated_on: DateTime::default(),
|
||||
|
||||
@@ -166,7 +166,7 @@ async fn test_on_prepare_logical_table_exists_err() {
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
|
||||
TableRouteValue::logical(1024),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -208,7 +208,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
TableRouteValue::logical(1024),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -252,7 +252,7 @@ async fn test_on_prepare_part_logical_tables_exist() {
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
TableRouteValue::logical(1024),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -392,7 +392,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() {
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
TableRouteValue::logical(1024),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -496,10 +496,7 @@ async fn test_on_create_metadata_err() {
|
||||
task.table_info.ident.table_id = 1025;
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info,
|
||||
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
|
||||
)])
|
||||
.create_logical_tables_metadata(vec![(task.table_info, TableRouteValue::logical(512))])
|
||||
.await
|
||||
.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
|
||||
@@ -747,12 +747,10 @@ impl TableMetadataManager {
|
||||
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
|
||||
pub async fn create_table_metadata(
|
||||
&self,
|
||||
mut table_info: RawTableInfo,
|
||||
table_info: RawTableInfo,
|
||||
table_route_value: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) -> Result<()> {
|
||||
let region_numbers = table_route_value.region_numbers();
|
||||
table_info.meta.region_numbers = region_numbers;
|
||||
let table_id = table_info.ident.table_id;
|
||||
let engine = table_info.meta.engine.clone();
|
||||
|
||||
@@ -851,8 +849,7 @@ impl TableMetadataManager {
|
||||
on_create_table_route_failure: F2,
|
||||
}
|
||||
let mut on_failures = Vec::with_capacity(len);
|
||||
for (mut table_info, table_route_value) in tables_data {
|
||||
table_info.meta.region_numbers = table_route_value.region_numbers();
|
||||
for (table_info, table_route_value) in tables_data {
|
||||
let table_id = table_info.ident.table_id;
|
||||
|
||||
// Creates table name.
|
||||
@@ -1543,8 +1540,8 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
|
||||
test_utils::new_test_table_info(10, region_numbers)
|
||||
fn new_test_table_info() -> TableInfo {
|
||||
test_utils::new_test_table_info(10)
|
||||
}
|
||||
|
||||
fn new_test_table_names() -> HashSet<TableName> {
|
||||
@@ -1602,8 +1599,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = &vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let wal_allocator = WalOptionsAllocator::RaftEngine;
|
||||
let regions = (0..16).collect();
|
||||
let region_wal_options =
|
||||
@@ -1630,8 +1626,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = &vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let region_wal_options = create_mock_region_wal_options()
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
|
||||
@@ -1713,8 +1708,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let table_id = table_info.ident.table_id;
|
||||
let table_route_value = TableRouteValue::physical(region_routes.clone());
|
||||
|
||||
@@ -1779,7 +1773,6 @@ mod tests {
|
||||
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
|
||||
table_id,
|
||||
&format!("my_table_{}", table_id),
|
||||
region_routes.iter().map(|r| r.region.id.region_number()),
|
||||
)
|
||||
.into();
|
||||
let table_route_value = TableRouteValue::physical(region_routes.clone());
|
||||
@@ -1800,8 +1793,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = &vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let table_id = table_info.ident.table_id;
|
||||
let datanode_id = 2;
|
||||
let region_wal_options = create_mock_region_wal_options();
|
||||
@@ -1907,8 +1899,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let table_id = table_info.ident.table_id;
|
||||
// creates metadata.
|
||||
create_physical_table_metadata(
|
||||
@@ -1984,8 +1975,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let table_id = table_info.ident.table_id;
|
||||
// creates metadata.
|
||||
create_physical_table_metadata(
|
||||
@@ -2070,8 +2060,7 @@ mod tests {
|
||||
leader_down_since: None,
|
||||
},
|
||||
];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let table_id = table_info.ident.table_id;
|
||||
let current_table_route_value = DeserializedValueWithBytes::from_inner(
|
||||
TableRouteValue::physical(region_routes.clone()),
|
||||
@@ -2153,8 +2142,7 @@ mod tests {
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
let region_route = new_test_region_route();
|
||||
let region_routes = vec![region_route.clone()];
|
||||
let table_info: RawTableInfo =
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let table_info: RawTableInfo = new_test_table_info().into();
|
||||
let table_id = table_info.ident.table_id;
|
||||
let engine = table_info.meta.engine.as_str();
|
||||
let region_storage_path =
|
||||
@@ -2408,7 +2396,7 @@ mod tests {
|
||||
let mem_kv = Arc::new(MemoryKvBackend::default());
|
||||
let table_metadata_manager = TableMetadataManager::new(mem_kv);
|
||||
|
||||
let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
|
||||
let view_info: RawTableInfo = new_test_table_info().into();
|
||||
|
||||
let view_id = view_info.ident.table_id;
|
||||
|
||||
|
||||
@@ -338,7 +338,6 @@ mod tests {
|
||||
next_column_id: 3,
|
||||
value_indices: vec![2, 3],
|
||||
options: Default::default(),
|
||||
region_numbers: vec![1],
|
||||
partition_key_indices: vec![],
|
||||
column_ids: vec![],
|
||||
};
|
||||
|
||||
@@ -71,7 +71,6 @@ pub struct PhysicalTableRouteValue {
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
|
||||
pub struct LogicalTableRouteValue {
|
||||
physical_table_id: TableId,
|
||||
region_ids: Vec<RegionId>,
|
||||
}
|
||||
|
||||
impl TableRouteValue {
|
||||
@@ -85,14 +84,7 @@ impl TableRouteValue {
|
||||
if table_id == physical_table_id {
|
||||
TableRouteValue::physical(region_routes)
|
||||
} else {
|
||||
let region_routes = region_routes
|
||||
.into_iter()
|
||||
.map(|region| {
|
||||
debug_assert_eq!(region.region.id.table_id(), physical_table_id);
|
||||
RegionId::new(table_id, region.region.id.region_number())
|
||||
})
|
||||
.collect();
|
||||
TableRouteValue::logical(physical_table_id, region_routes)
|
||||
TableRouteValue::logical(physical_table_id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,8 +92,8 @@ impl TableRouteValue {
|
||||
Self::Physical(PhysicalTableRouteValue::new(region_routes))
|
||||
}
|
||||
|
||||
pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
|
||||
Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
|
||||
pub fn logical(physical_table_id: TableId) -> Self {
|
||||
Self::Logical(LogicalTableRouteValue::new(physical_table_id))
|
||||
}
|
||||
|
||||
/// Returns a new version [TableRouteValue] with `region_routes`.
|
||||
@@ -207,11 +199,9 @@ impl TableRouteValue {
|
||||
.iter()
|
||||
.map(|region_route| region_route.region.id.region_number())
|
||||
.collect(),
|
||||
TableRouteValue::Logical(x) => x
|
||||
.region_ids()
|
||||
.iter()
|
||||
.map(|region_id| region_id.region_number())
|
||||
.collect(),
|
||||
TableRouteValue::Logical(_) => {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -245,20 +235,13 @@ impl PhysicalTableRouteValue {
|
||||
}
|
||||
|
||||
impl LogicalTableRouteValue {
|
||||
pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
|
||||
Self {
|
||||
physical_table_id,
|
||||
region_ids,
|
||||
}
|
||||
pub fn new(physical_table_id: TableId) -> Self {
|
||||
Self { physical_table_id }
|
||||
}
|
||||
|
||||
pub fn physical_table_id(&self) -> TableId {
|
||||
self.physical_table_id
|
||||
}
|
||||
|
||||
pub fn region_ids(&self) -> &Vec<RegionId> {
|
||||
&self.region_ids
|
||||
}
|
||||
}
|
||||
|
||||
impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
|
||||
@@ -900,7 +883,6 @@ mod tests {
|
||||
let table_route_manager = TableRouteManager::new(kv.clone());
|
||||
let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
|
||||
physical_table_id: 1023,
|
||||
region_ids: vec![RegionId::new(1023, 1)],
|
||||
});
|
||||
let (txn, _) = table_route_manager
|
||||
.table_route_storage()
|
||||
@@ -930,14 +912,12 @@ mod tests {
|
||||
1024,
|
||||
TableRouteValue::Logical(LogicalTableRouteValue {
|
||||
physical_table_id: 1023,
|
||||
region_ids: vec![RegionId::new(1023, 1)],
|
||||
}),
|
||||
),
|
||||
(
|
||||
1025,
|
||||
TableRouteValue::Logical(LogicalTableRouteValue {
|
||||
physical_table_id: 1023,
|
||||
region_ids: vec![RegionId::new(1023, 2)],
|
||||
}),
|
||||
),
|
||||
];
|
||||
|
||||
@@ -19,11 +19,7 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use store_api::storage::TableId;
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
|
||||
|
||||
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
table_id: TableId,
|
||||
table_name: &str,
|
||||
region_numbers: I,
|
||||
) -> TableInfo {
|
||||
pub fn new_test_table_info_with_name(table_id: TableId, table_name: &str) -> TableInfo {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new(
|
||||
@@ -45,7 +41,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
|
||||
.build()
|
||||
.unwrap();
|
||||
TableInfoBuilder::default()
|
||||
@@ -56,9 +51,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
|
||||
table_id: TableId,
|
||||
region_numbers: I,
|
||||
) -> TableInfo {
|
||||
new_test_table_info_with_name(table_id, "mytable", region_numbers)
|
||||
pub fn new_test_table_info(table_id: TableId) -> TableInfo {
|
||||
new_test_table_info_with_name(table_id, "mytable")
|
||||
}
|
||||
|
||||
@@ -1639,7 +1639,6 @@ mod tests {
|
||||
value_indices: vec![2],
|
||||
engine: METRIC_ENGINE_NAME.to_string(),
|
||||
next_column_id: 0,
|
||||
region_numbers: vec![0],
|
||||
options: Default::default(),
|
||||
created_on: Default::default(),
|
||||
updated_on: Default::default(),
|
||||
|
||||
@@ -79,7 +79,7 @@ tokio.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
catalog.workspace = true
|
||||
catalog = { workspace = true, features = ["testing"] }
|
||||
common-catalog.workspace = true
|
||||
pretty_assertions.workspace = true
|
||||
prost.workspace = true
|
||||
|
||||
@@ -24,7 +24,7 @@ use super::*;
|
||||
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
table_id: TableId,
|
||||
table_name: &str,
|
||||
region_numbers: I,
|
||||
_region_numbers: I,
|
||||
) -> TableInfo {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||
@@ -46,7 +46,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
|
||||
.build()
|
||||
.unwrap();
|
||||
TableInfoBuilder::default()
|
||||
|
||||
@@ -192,7 +192,7 @@ mod test {
|
||||
let another_region_id = RegionId::new(table_id, region_number + 1);
|
||||
let peer = Peer::empty(datanode_id);
|
||||
let follower_peer = Peer::empty(datanode_id + 1);
|
||||
let table_info = new_test_table_info(table_id, vec![region_number]).into();
|
||||
let table_info = new_test_table_info(table_id).into();
|
||||
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
@@ -328,7 +328,7 @@ mod test {
|
||||
let no_exist_region_id = RegionId::new(table_id, region_number + 2);
|
||||
let peer = Peer::empty(datanode_id);
|
||||
let follower_peer = Peer::empty(datanode_id + 1);
|
||||
let table_info = new_test_table_info(table_id, vec![region_number]).into();
|
||||
let table_info = new_test_table_info(table_id).into();
|
||||
|
||||
let region_routes = vec![
|
||||
RegionRoute {
|
||||
|
||||
@@ -1172,7 +1172,7 @@ mod tests {
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
let to_peer = persistent_context.to_peer.clone();
|
||||
let region_id = persistent_context.region_ids[0];
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(from_peer),
|
||||
@@ -1211,7 +1211,7 @@ mod tests {
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
let region_id = persistent_context.region_ids[0];
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(from_peer),
|
||||
@@ -1299,7 +1299,7 @@ mod tests {
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
let region_id = persistent_context.region_ids[0];
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(from_peer),
|
||||
@@ -1419,7 +1419,7 @@ mod tests {
|
||||
let from_peer_id = persistent_context.from_peer.id;
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
let region_id = persistent_context.region_ids[0];
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(from_peer),
|
||||
|
||||
@@ -401,7 +401,7 @@ mod tests {
|
||||
|
||||
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
|
||||
let region_id = ctx.persistent_ctx.region_ids[0];
|
||||
let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
|
||||
let table_info = new_test_table_info(region_id.table_id()).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
|
||||
|
||||
@@ -698,7 +698,7 @@ mod test {
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 2)),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
@@ -726,7 +726,7 @@ mod test {
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
@@ -758,7 +758,7 @@ mod test {
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
@@ -792,7 +792,7 @@ mod test {
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(Peer::empty(2)),
|
||||
@@ -822,7 +822,7 @@ mod test {
|
||||
|
||||
let err = manager
|
||||
.verify_table_route(
|
||||
&TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![])),
|
||||
&TableRouteValue::Logical(LogicalTableRouteValue::new(0)),
|
||||
&task,
|
||||
)
|
||||
.unwrap_err();
|
||||
@@ -864,7 +864,7 @@ mod test {
|
||||
timeout: Duration::from_millis(1000),
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(2)),
|
||||
@@ -897,7 +897,7 @@ mod test {
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
@@ -930,7 +930,7 @@ mod test {
|
||||
trigger_reason: RegionMigrationTriggerReason::Manual,
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
@@ -974,7 +974,7 @@ mod test {
|
||||
task.trigger_reason,
|
||||
),
|
||||
);
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 2)),
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
|
||||
@@ -223,7 +223,7 @@ mod tests {
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![3]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_route = RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 3)),
|
||||
leader_peer: Some(from_peer.clone()),
|
||||
@@ -250,7 +250,7 @@ mod tests {
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(to_peer),
|
||||
@@ -277,7 +277,7 @@ mod tests {
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(from_peer_id)),
|
||||
@@ -302,7 +302,7 @@ mod tests {
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes: Vec<RegionRoute> = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(1024)),
|
||||
|
||||
@@ -425,7 +425,7 @@ mod tests {
|
||||
let mut env = TestingEnv::new();
|
||||
|
||||
// Prepares table
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(from_peer_id)),
|
||||
|
||||
@@ -142,7 +142,7 @@ mod tests {
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1, 2]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(Peer::empty(1024)),
|
||||
@@ -185,7 +185,7 @@ mod tests {
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1, 2]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(from_peer.clone()),
|
||||
|
||||
@@ -120,7 +120,7 @@ mod tests {
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![
|
||||
RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
|
||||
@@ -262,7 +262,7 @@ mod tests {
|
||||
let persistent_context = new_persistent_context();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![2]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 2)),
|
||||
leader_peer: Some(Peer::empty(4)),
|
||||
@@ -295,7 +295,7 @@ mod tests {
|
||||
let persistent_context = new_persistent_context();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
@@ -330,7 +330,7 @@ mod tests {
|
||||
let persistent_context = new_persistent_context();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
@@ -369,7 +369,7 @@ mod tests {
|
||||
let leader_peer = persistent_context.from_peer.clone();
|
||||
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(leader_peer),
|
||||
@@ -396,7 +396,7 @@ mod tests {
|
||||
let candidate_peer = persistent_context.to_peer.clone();
|
||||
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(candidate_peer),
|
||||
@@ -424,7 +424,7 @@ mod tests {
|
||||
let candidate_peer = persistent_context.to_peer.clone();
|
||||
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let table_info = new_test_table_info(1024).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(candidate_peer),
|
||||
@@ -454,7 +454,7 @@ mod tests {
|
||||
let opening_keeper = MemoryRegionKeeper::default();
|
||||
|
||||
let table_id = 1024;
|
||||
let table_info = new_test_table_info(table_id, vec![1]).into();
|
||||
let table_info = new_test_table_info(table_id).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(table_id, 1)),
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
|
||||
@@ -381,7 +381,7 @@ mod tests {
|
||||
|
||||
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
|
||||
let region_id = ctx.persistent_ctx.region_ids[0];
|
||||
let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
|
||||
let table_info = new_test_table_info(region_id.table_id()).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
|
||||
|
||||
@@ -390,10 +390,7 @@ mod tests {
|
||||
.table_route_storage()
|
||||
.build_create_txn(
|
||||
1024,
|
||||
&TableRouteValue::Logical(LogicalTableRouteValue::new(
|
||||
1024,
|
||||
vec![RegionId::new(1023, 1)],
|
||||
)),
|
||||
&TableRouteValue::Logical(LogicalTableRouteValue::new(1024)),
|
||||
)
|
||||
.unwrap();
|
||||
kv_backend.txn(txn).await.unwrap();
|
||||
|
||||
@@ -271,7 +271,7 @@ pub async fn new_wal_prune_metadata(
|
||||
let region_ids = (0..n_region)
|
||||
.map(|i| RegionId::new(table_id, i))
|
||||
.collect::<Vec<_>>();
|
||||
let table_info = new_test_table_info(table_id, 0..n_region).into();
|
||||
let table_info = new_test_table_info(table_id).into();
|
||||
let region_routes = region_ids
|
||||
.iter()
|
||||
.map(|region_id| RegionRoute {
|
||||
|
||||
@@ -192,7 +192,6 @@ pub mod test_data {
|
||||
value_indices: vec![2],
|
||||
engine: MITO2_ENGINE.to_string(),
|
||||
next_column_id: 3,
|
||||
region_numbers: vec![1, 2, 3],
|
||||
options: TableOptions::default(),
|
||||
created_on: DateTime::default(),
|
||||
updated_on: DateTime::default(),
|
||||
|
||||
@@ -342,9 +342,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_collect_metadata() {
|
||||
let region_number = 1u32;
|
||||
let table_id = 1024;
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id).into();
|
||||
|
||||
let region_id = RegionId::new(table_id, 1);
|
||||
let leader_peer_id = 1024;
|
||||
@@ -394,9 +393,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_renew_region_leases_basic() {
|
||||
let region_number = 1u32;
|
||||
let table_id = 1024;
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id).into();
|
||||
|
||||
let region_id = RegionId::new(table_id, 1);
|
||||
let leader_peer_id = 1024;
|
||||
@@ -502,9 +500,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_renew_unexpected_logic_table() {
|
||||
let region_number = 1u32;
|
||||
let table_id = 1024;
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id).into();
|
||||
|
||||
let region_id = RegionId::new(table_id, 1);
|
||||
let keeper = new_test_keeper();
|
||||
@@ -512,7 +509,7 @@ mod tests {
|
||||
table_metadata_manager
|
||||
.create_table_metadata(
|
||||
table_info,
|
||||
TableRouteValue::Logical(LogicalTableRouteValue::new(table_id, vec![region_id])),
|
||||
TableRouteValue::Logical(LogicalTableRouteValue::new(table_id)),
|
||||
HashMap::default(),
|
||||
)
|
||||
.await
|
||||
@@ -539,9 +536,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_renew_region_leases_with_downgrade_leader() {
|
||||
let region_number = 1u32;
|
||||
let table_id = 1024;
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
|
||||
let table_info: RawTableInfo = new_test_table_info(table_id).into();
|
||||
|
||||
let region_id = RegionId::new(table_id, 1);
|
||||
let leader_peer_id = 1024;
|
||||
|
||||
@@ -1070,7 +1070,7 @@ pub(crate) mod tests {
|
||||
let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
|
||||
test_create_logical_table_task.set_table_id(logical_table_id);
|
||||
let table_info = test_create_logical_table_task.table_info;
|
||||
let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
|
||||
let table_route = LogicalTableRouteValue::new(1024);
|
||||
let table_route_value = TableRouteValue::Logical(table_route);
|
||||
table_metadata_manager
|
||||
.create_table_metadata(table_info, table_route_value, HashMap::new())
|
||||
|
||||
@@ -1220,7 +1220,6 @@ mod tests {
|
||||
.next_column_id(0)
|
||||
.options(Default::default())
|
||||
.created_on(Default::default())
|
||||
.region_numbers(vec![0])
|
||||
.build()
|
||||
.unwrap();
|
||||
let info = Arc::new(
|
||||
|
||||
@@ -14,10 +14,9 @@
|
||||
|
||||
use ahash::{HashMap, HashSet};
|
||||
use api::v1::RowInsertRequests;
|
||||
use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
|
||||
use api::v1::region::InsertRequests as RegionInsertRequests;
|
||||
use partition::manager::PartitionRuleManager;
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
|
||||
use crate::error::{Result, TableNotFoundSnafu};
|
||||
@@ -54,20 +53,10 @@ impl<'a> RowToRegion<'a> {
|
||||
|
||||
let table_info = self.get_table_info(&request.table_name)?;
|
||||
let table_id = table_info.table_id();
|
||||
let region_numbers = self.region_numbers(&request.table_name)?;
|
||||
let requests = if let Some(region_id) = match region_numbers[..] {
|
||||
[singular] => Some(RegionId::new(table_id, singular)),
|
||||
_ => None,
|
||||
} {
|
||||
vec![InsertRequest {
|
||||
region_id: region_id.as_u64(),
|
||||
rows: Some(rows),
|
||||
}]
|
||||
} else {
|
||||
Partitioner::new(self.partition_manager)
|
||||
.partition_insert_requests(table_info, rows)
|
||||
.await?
|
||||
};
|
||||
|
||||
let requests = Partitioner::new(self.partition_manager)
|
||||
.partition_insert_requests(table_info, rows)
|
||||
.await?;
|
||||
|
||||
if self.instant_table_ids.contains(&table_id) {
|
||||
instant_request.extend(requests);
|
||||
@@ -91,11 +80,4 @@ impl<'a> RowToRegion<'a> {
|
||||
.get(table_name)
|
||||
.context(TableNotFoundSnafu { table_name })
|
||||
}
|
||||
|
||||
fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
|
||||
self.tables_info
|
||||
.get(table_name)
|
||||
.map(|x| &x.meta.region_numbers)
|
||||
.context(TableNotFoundSnafu { table_name })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1837,7 +1837,6 @@ pub fn create_table_info(
|
||||
value_indices: vec![],
|
||||
engine: create_table.engine.clone(),
|
||||
next_column_id: column_schemas.len() as u32,
|
||||
region_numbers: vec![],
|
||||
options: table_options,
|
||||
created_on: Utc::now(),
|
||||
updated_on: Utc::now(),
|
||||
|
||||
@@ -32,7 +32,7 @@ use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
|
||||
pub fn new_test_table_info(
|
||||
table_id: u32,
|
||||
table_name: &str,
|
||||
region_numbers: impl Iterator<Item = u32>,
|
||||
_region_numbers: impl Iterator<Item = u32>,
|
||||
) -> TableInfo {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
|
||||
@@ -55,7 +55,6 @@ pub fn new_test_table_info(
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.region_numbers(region_numbers.collect::<Vec<_>>())
|
||||
.partition_key_indices(vec![0])
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
@@ -199,7 +199,8 @@ impl PartitionRuleManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_partitions_from_region_routes(
|
||||
/// Creates partitions from region routes.
|
||||
pub fn create_partitions_from_region_routes(
|
||||
table_id: TableId,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<Vec<PartitionInfo>> {
|
||||
|
||||
@@ -88,7 +88,6 @@ impl TestTable {
|
||||
primary_key_indices: vec![0, 1, 2],
|
||||
value_indices: vec![4],
|
||||
engine,
|
||||
region_numbers: vec![0, 1],
|
||||
next_column_id: 5,
|
||||
options: Default::default(),
|
||||
created_on: Default::default(),
|
||||
|
||||
@@ -28,7 +28,7 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
|
||||
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
|
||||
use datafusion_common::{DataFusionError, TableReference};
|
||||
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
|
||||
use partition::manager::PartitionRuleManagerRef;
|
||||
use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes};
|
||||
use session::context::QueryContext;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
@@ -40,7 +40,7 @@ use crate::dist_plan::PredicateExtractor;
|
||||
use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
|
||||
use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
|
||||
use crate::dist_plan::region_pruner::ConstraintPruner;
|
||||
use crate::error::{CatalogSnafu, TableNotFoundSnafu};
|
||||
use crate::error::{CatalogSnafu, PartitionRuleManagerSnafu, TableNotFoundSnafu};
|
||||
use crate::region_query::RegionQueryHandlerRef;
|
||||
|
||||
/// Planner for convert merge sort logical plan to physical plan
|
||||
@@ -211,8 +211,16 @@ impl DistExtensionPlanner {
|
||||
})?;
|
||||
|
||||
let table_info = table.table_info();
|
||||
let all_regions = table_info.region_ids();
|
||||
|
||||
let physical_table_route = self
|
||||
.partition_rule_manager
|
||||
.find_physical_table_route(table_info.table_id())
|
||||
.await
|
||||
.context(PartitionRuleManagerSnafu)?;
|
||||
let all_regions = physical_table_route
|
||||
.region_routes
|
||||
.iter()
|
||||
.map(|r| RegionId::new(table_info.table_id(), r.region.id.region_number()))
|
||||
.collect::<Vec<_>>();
|
||||
// Extract partition columns
|
||||
let partition_columns: Vec<String> =
|
||||
table_info.meta.partition_column_names().cloned().collect();
|
||||
@@ -256,11 +264,10 @@ impl DistExtensionPlanner {
|
||||
}
|
||||
|
||||
// Get partition information for the table if partition rule manager is available
|
||||
let partitions = match self
|
||||
.partition_rule_manager
|
||||
.find_table_partitions(table.table_info().table_id())
|
||||
.await
|
||||
{
|
||||
let partitions = match create_partitions_from_region_routes(
|
||||
table_info.table_id(),
|
||||
&physical_table_route.region_routes,
|
||||
) {
|
||||
Ok(partitions) => partitions,
|
||||
Err(err) => {
|
||||
common_telemetry::debug!(
|
||||
|
||||
@@ -57,6 +57,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Partition rule manager error"))]
|
||||
PartitionRuleManager {
|
||||
source: partition::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Table not found: {}", table))]
|
||||
TableNotFound {
|
||||
table: String,
|
||||
@@ -396,6 +403,7 @@ impl ErrorExt for Error {
|
||||
QueryAccessDenied { .. } => StatusCode::AccessDenied,
|
||||
Catalog { source, .. } => source.status_code(),
|
||||
CreateRecordBatch { source, .. } => source.status_code(),
|
||||
PartitionRuleManager { source, .. } => source.status_code(),
|
||||
QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
|
||||
PlanSql { error, .. } => {
|
||||
datafusion_status_code::<Self>(error, Some(StatusCode::PlanQuery))
|
||||
|
||||
@@ -322,7 +322,6 @@ mod tests {
|
||||
let table_name = "system_metrics";
|
||||
let schema_name = "public".to_string();
|
||||
let catalog_name = "greptime".to_string();
|
||||
let regions = vec![0, 1, 2];
|
||||
|
||||
let mut options = table::requests::TableOptions {
|
||||
ttl: Some(Duration::from_secs(30).into()),
|
||||
@@ -341,7 +340,6 @@ mod tests {
|
||||
.next_column_id(0)
|
||||
.options(options)
|
||||
.created_on(Default::default())
|
||||
.region_numbers(regions)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
|
||||
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
|
||||
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
|
||||
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
|
||||
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::requests::{
|
||||
@@ -135,8 +135,6 @@ pub struct TableMeta {
|
||||
pub value_indices: Vec<usize>,
|
||||
#[builder(default, setter(into))]
|
||||
pub engine: String,
|
||||
#[builder(default, setter(into))]
|
||||
pub region_numbers: Vec<u32>,
|
||||
pub next_column_id: ColumnId,
|
||||
/// Table options.
|
||||
#[builder(default)]
|
||||
@@ -160,7 +158,6 @@ impl TableMetaBuilder {
|
||||
primary_key_indices: None,
|
||||
value_indices: None,
|
||||
engine: None,
|
||||
region_numbers: None,
|
||||
next_column_id: None,
|
||||
options: None,
|
||||
created_on: None,
|
||||
@@ -194,7 +191,6 @@ impl TableMetaBuilder {
|
||||
primary_key_indices: Some(Vec::new()),
|
||||
value_indices: Some(Vec::new()),
|
||||
engine: None,
|
||||
region_numbers: Some(Vec::new()),
|
||||
next_column_id: Some(0),
|
||||
options: None,
|
||||
created_on: None,
|
||||
@@ -1088,13 +1084,6 @@ impl TableInfo {
|
||||
self.ident.table_id
|
||||
}
|
||||
|
||||
pub fn region_ids(&self) -> Vec<RegionId> {
|
||||
self.meta
|
||||
.region_numbers
|
||||
.iter()
|
||||
.map(|id| RegionId::new(self.table_id(), *id))
|
||||
.collect()
|
||||
}
|
||||
/// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
|
||||
pub fn full_table_name(&self) -> String {
|
||||
common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
|
||||
@@ -1174,7 +1163,6 @@ pub struct RawTableMeta {
|
||||
/// Next column id of a new column.
|
||||
/// It's used to ensure all columns with the same name across all regions have the same column id.
|
||||
pub next_column_id: ColumnId,
|
||||
pub region_numbers: Vec<u32>,
|
||||
pub options: TableOptions,
|
||||
pub created_on: DateTime<Utc>,
|
||||
pub updated_on: DateTime<Utc>,
|
||||
@@ -1201,7 +1189,6 @@ impl<'de> Deserialize<'de> for RawTableMeta {
|
||||
value_indices: Vec<usize>,
|
||||
engine: String,
|
||||
next_column_id: u32,
|
||||
region_numbers: Vec<u32>,
|
||||
options: TableOptions,
|
||||
created_on: DateTime<Utc>,
|
||||
updated_on: Option<DateTime<Utc>>,
|
||||
@@ -1218,7 +1205,6 @@ impl<'de> Deserialize<'de> for RawTableMeta {
|
||||
value_indices: h.value_indices,
|
||||
engine: h.engine,
|
||||
next_column_id: h.next_column_id,
|
||||
region_numbers: h.region_numbers,
|
||||
options: h.options,
|
||||
created_on: h.created_on,
|
||||
updated_on: h.updated_on.unwrap_or(h.created_on),
|
||||
@@ -1236,7 +1222,6 @@ impl From<TableMeta> for RawTableMeta {
|
||||
value_indices: meta.value_indices,
|
||||
engine: meta.engine,
|
||||
next_column_id: meta.next_column_id,
|
||||
region_numbers: meta.region_numbers,
|
||||
options: meta.options,
|
||||
created_on: meta.created_on,
|
||||
updated_on: meta.updated_on,
|
||||
@@ -1255,7 +1240,6 @@ impl TryFrom<RawTableMeta> for TableMeta {
|
||||
primary_key_indices: raw.primary_key_indices,
|
||||
value_indices: raw.value_indices,
|
||||
engine: raw.engine,
|
||||
region_numbers: raw.region_numbers,
|
||||
next_column_id: raw.next_column_id,
|
||||
options: raw.options,
|
||||
created_on: raw.created_on,
|
||||
@@ -1608,8 +1592,6 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let new_meta = add_columns_to_meta(&meta);
|
||||
assert_eq!(meta.region_numbers, new_meta.region_numbers);
|
||||
|
||||
let names: Vec<String> = new_meta
|
||||
.schema
|
||||
.column_schemas()
|
||||
@@ -1685,8 +1667,6 @@ mod tests {
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(meta.region_numbers, new_meta.region_numbers);
|
||||
|
||||
let names: Vec<String> = new_meta
|
||||
.schema
|
||||
.column_schemas()
|
||||
@@ -2038,8 +2018,6 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let new_meta = add_columns_to_meta_with_location(&meta);
|
||||
assert_eq!(meta.region_numbers, new_meta.region_numbers);
|
||||
|
||||
let names: Vec<String> = new_meta
|
||||
.schema
|
||||
.column_schemas()
|
||||
@@ -2090,8 +2068,6 @@ mod tests {
|
||||
|
||||
// Add a string column and make it fulltext indexed
|
||||
let new_meta = add_columns_to_meta_with_location(&meta);
|
||||
assert_eq!(meta.region_numbers, new_meta.region_numbers);
|
||||
|
||||
let alter_kind = AlterKind::SetIndexes {
|
||||
options: vec![SetIndexOption::Fulltext {
|
||||
column_name: "my_tag_first".to_string(),
|
||||
|
||||
@@ -76,7 +76,6 @@ impl NumbersTable {
|
||||
primary_key_indices: vec![0],
|
||||
value_indices: vec![],
|
||||
engine,
|
||||
region_numbers: vec![0],
|
||||
next_column_id: 1,
|
||||
options: Default::default(),
|
||||
created_on: Default::default(),
|
||||
|
||||
@@ -27,7 +27,7 @@ use futures::Stream;
|
||||
use futures::task::{Context, Poll};
|
||||
use snafu::prelude::*;
|
||||
use store_api::data_source::DataSource;
|
||||
use store_api::storage::{RegionNumber, ScanRequest};
|
||||
use store_api::storage::ScanRequest;
|
||||
|
||||
use crate::error::{SchemaConversionSnafu, TableProjectionSnafu};
|
||||
use crate::metadata::{
|
||||
@@ -39,21 +39,16 @@ pub struct MemTable;
|
||||
|
||||
impl MemTable {
|
||||
pub fn table(table_name: impl Into<String>, recordbatch: RecordBatch) -> TableRef {
|
||||
Self::new_with_region(table_name, recordbatch, vec![0])
|
||||
Self::new_with_region(table_name, recordbatch)
|
||||
}
|
||||
|
||||
pub fn new_with_region(
|
||||
table_name: impl Into<String>,
|
||||
recordbatch: RecordBatch,
|
||||
regions: Vec<RegionNumber>,
|
||||
) -> TableRef {
|
||||
pub fn new_with_region(table_name: impl Into<String>, recordbatch: RecordBatch) -> TableRef {
|
||||
Self::new_with_catalog(
|
||||
table_name,
|
||||
recordbatch,
|
||||
1,
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
regions,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -63,7 +58,6 @@ impl MemTable {
|
||||
table_id: TableId,
|
||||
catalog_name: String,
|
||||
schema_name: String,
|
||||
regions: Vec<RegionNumber>,
|
||||
) -> TableRef {
|
||||
let schema = recordbatch.schema.clone();
|
||||
|
||||
@@ -75,7 +69,6 @@ impl MemTable {
|
||||
.next_column_id(0)
|
||||
.options(Default::default())
|
||||
.created_on(Default::default())
|
||||
.region_numbers(regions)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -31,7 +31,6 @@ pub fn test_table_info(
|
||||
.next_column_id(0)
|
||||
.options(Default::default())
|
||||
.created_on(Default::default())
|
||||
.region_numbers(vec![1])
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user