From 2298227e0c441a4d47383b5cd199a54da79e74c2 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 23 Jul 2025 11:51:28 +0800 Subject: [PATCH] refactor: refactor partition mod to use PartitionExpr instead of PartitionDef (#6554) * refactor: refactor partition mod to use PartitionExpr instead of PartitionDef Signed-off-by: Zhenchi * fix snafu Signed-off-by: Zhenchi * Puts expression into PbPartition Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi * fix compile Signed-off-by: Zhenchi * update proto Signed-off-by: Zhenchi * add serde test Signed-off-by: Zhenchi * add serde test Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 10 +- Cargo.toml | 2 +- .../information_schema/partitions.rs | 9 +- src/cli/src/bench.rs | 1 + src/common/meta/src/ddl/table_meta.rs | 27 +-- src/common/meta/src/ddl/test_util.rs | 10 +- .../meta/src/ddl/test_util/create_table.rs | 5 +- src/common/meta/src/ddl/tests/create_table.rs | 20 +- src/common/meta/src/key.rs | 3 + src/common/meta/src/key/table_route.rs | 2 + src/common/meta/src/rpc/ddl.rs | 1 + src/common/meta/src/rpc/router.rs | 178 ++++++++++++++---- src/meta-srv/src/procedure/tests.rs | 9 +- src/operator/src/bulk_insert.rs | 4 +- .../src/req_convert/common/partitioner.rs | 13 +- .../src/req_convert/delete/row_to_region.rs | 4 +- .../src/req_convert/delete/table_to_region.rs | 2 +- .../src/req_convert/insert/row_to_region.rs | 8 +- .../src/req_convert/insert/stmt_to_region.rs | 2 +- .../src/req_convert/insert/table_to_region.rs | 2 +- src/operator/src/statement/ddl.rs | 113 +++++------ src/operator/src/statement/show.rs | 33 ++-- src/operator/src/tests/partition_manager.rs | 62 +++--- src/partition/src/error.rs | 9 - src/partition/src/expr.rs | 61 ++++++ src/partition/src/manager.rs | 64 ++----- src/partition/src/partition.rs | 6 +- src/table/src/metadata.rs | 8 + tests-integration/src/tests/instance_test.rs | 4 +- tests-integration/tests/http.rs | 2 +- .../create/metric_engine_partition.result | 40 
++-- .../cases/standalone/common/partition.result | 6 +- .../standalone/common/show/show_create.result | 4 +- 33 files changed, 409 insertions(+), 315 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0de71dc815..db87777827 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5223,7 +5223,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=36116a5c24fded2ed4980f5ba6feaca95346adc6#36116a5c24fded2ed4980f5ba6feaca95346adc6" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7fcaa3e413947a7a28d9af95812af26c1939ce78#7fcaa3e413947a7a28d9af95812af26c1939ce78" dependencies = [ "prost 0.13.5", "serde", @@ -6774,7 +6774,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -9669,7 +9669,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -9715,7 +9715,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.100", @@ -14353,7 +14353,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f6ef4432da..4d108f8aff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,7 +140,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto 
= { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "36116a5c24fded2ed4980f5ba6feaca95346adc6" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7fcaa3e413947a7a28d9af95812af26c1939ce78" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/catalog/src/system_schema/information_schema/partitions.rs b/src/catalog/src/system_schema/information_schema/partitions.rs index 9b9634cb44..7fed20ec3c 100644 --- a/src/catalog/src/system_schema/information_schema/partitions.rs +++ b/src/catalog/src/system_schema/information_schema/partitions.rs @@ -329,13 +329,8 @@ impl InformationSchemaPartitionsBuilder { self.partition_names.push(Some(&partition_name)); self.partition_ordinal_positions .push(Some((index + 1) as i64)); - let expressions = if partition.partition.partition_columns().is_empty() { - None - } else { - Some(partition.partition.to_string()) - }; - - self.partition_expressions.push(expressions.as_deref()); + let expression = partition.partition_expr.as_ref().map(|e| e.to_string()); + self.partition_expressions.push(expression.as_deref()); self.create_times.push(Some(TimestampMicrosecond::from( table_info.meta.created_on.timestamp_millis(), ))); diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index d08bb06948..bbd1895f36 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -188,6 +188,7 @@ fn create_region_routes(regions: Vec) -> Vec { name: String::new(), partition: None, attrs: BTreeMap::new(), + partition_expr: Default::default(), }, leader_peer: Some(Peer { id: rng.random_range(0..10), diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index f07e2c2f6f..1103dc2bda 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -21,7 +21,7 @@ use snafu::ensure; use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::ddl::TableMetadata; -use crate::error::{self, Result, UnsupportedSnafu}; +use 
crate::error::{Result, UnsupportedSnafu}; use crate::key::table_route::PhysicalTableRouteValue; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; @@ -113,24 +113,18 @@ impl TableMetadataAllocator { table_id: TableId, task: &CreateTableTask, ) -> Result { - let regions = task.partitions.len(); - ensure!( - regions > 0, - error::UnexpectedSnafu { - err_msg: "The number of partitions must be greater than 0" - } - ); - + let regions = task.partitions.len().max(1); let peers = self.peer_allocator.alloc(regions).await?; debug!("Allocated peers {:?} for table {}", peers, table_id); - let region_routes = task + + let mut region_routes = task .partitions .iter() .enumerate() .map(|(i, partition)| { let region = Region { id: RegionId::new(table_id, i as u32), - partition: Some(partition.clone().into()), + partition_expr: partition.expression.clone(), ..Default::default() }; @@ -144,6 +138,17 @@ impl TableMetadataAllocator { }) .collect::>(); + if region_routes.is_empty() { + region_routes.push(RegionRoute { + region: Region { + id: RegionId::new(table_id, 0), + ..Default::default() + }, + leader_peer: Some(peers[0].clone()), + ..Default::default() + }); + } + Ok(PhysicalTableRouteValue::new(region_routes)) } diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 505abf870b..7d90d60953 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -146,10 +146,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask { CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: vec![Partition::default()], table_info, } } @@ -184,10 +181,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask { CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: vec![Partition::default()], table_info, 
} } diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index b0b7b3a5c9..b6e785804a 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -175,10 +175,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: vec![Partition::default()], table_info, } } diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 8e8d70957d..307655016c 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -141,10 +141,7 @@ pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask { CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: vec![Partition::default()], table_info, } } @@ -213,21 +210,6 @@ async fn test_on_prepare_without_create_if_table_exists() { assert_eq!(procedure.table_id(), 1024); } -#[tokio::test] -async fn test_on_prepare_with_no_partition_err() { - let node_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(node_manager); - let mut task = test_create_table_task("foo"); - task.partitions = vec![]; - task.create_table.create_if_not_exists = true; - let mut procedure = CreateTableProcedure::new(task, ddl_context); - let err = procedure.on_prepare().await.unwrap_err(); - assert_matches!(err, Error::Unexpected { .. 
}); - assert!(err - .to_string() - .contains("The number of partitions must be greater than 0"),); -} - #[tokio::test] async fn test_on_datanode_create_regions_should_retry() { common_telemetry::init_default_ut_logging(); diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 36becc09e2..e48beaf834 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1509,6 +1509,7 @@ mod tests { name: "r1".to_string(), partition: None, attrs: BTreeMap::new(), + partition_expr: Default::default(), }, leader_peer: Some(Peer::new(datanode, "a2")), follower_peers: vec![], @@ -2001,6 +2002,7 @@ mod tests { name: "r1".to_string(), partition: None, attrs: BTreeMap::new(), + partition_expr: Default::default(), }, leader_peer: Some(Peer::new(datanode, "a2")), leader_state: Some(LeaderState::Downgrading), @@ -2013,6 +2015,7 @@ mod tests { name: "r2".to_string(), partition: None, attrs: BTreeMap::new(), + partition_expr: Default::default(), }, leader_peer: Some(Peer::new(datanode, "a1")), leader_state: None, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index dbf87adf2f..8b744623d4 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -711,6 +711,7 @@ mod tests { name: "r1".to_string(), partition: None, attrs: Default::default(), + partition_expr: Default::default(), }, leader_peer: Some(Peer { id: 2, @@ -726,6 +727,7 @@ mod tests { name: "r1".to_string(), partition: None, attrs: Default::default(), + partition_expr: Default::default(), }, leader_peer: Some(Peer { id: 2, diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 13d6736390..74ca629a80 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -606,6 +606,7 @@ impl From for PbDropTableTask { #[derive(Debug, PartialEq, Clone)] pub struct CreateTableTask { pub create_table: CreateTableExpr, + // TODO(zhongzc): change to `Vec` pub partitions: 
Vec, pub table_info: RawTableInfo, } diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index b7d14202f9..efe2af2c7a 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -355,8 +355,13 @@ impl RegionRoutes { pub struct Region { pub id: RegionId, pub name: String, - pub partition: Option, pub attrs: BTreeMap, + + /// **Deprecated:** Use `partition_expr` instead. + pub partition: Option, + /// The partition expression of the region. + #[serde(default)] + pub partition_expr: String, } impl Region { @@ -367,14 +372,47 @@ impl Region { ..Default::default() } } + + /// Gets the partition expression of the region in compatible mode. + pub fn partition_expr(&self) -> String { + if !self.partition_expr.is_empty() { + self.partition_expr.clone() + } else if let Some(LegacyPartition { value_list, .. }) = &self.partition { + if !value_list.is_empty() { + String::from_utf8_lossy(&value_list[0]).to_string() + } else { + "".to_string() + } + } else { + "".to_string() + } + } +} + +/// Gets the partition expression of the `PbRegion` in compatible mode. 
+#[allow(deprecated)] +pub fn pb_region_partition_expr(r: &PbRegion) -> String { + if let Some(partition) = &r.partition { + if !partition.expression.is_empty() { + partition.expression.clone() + } else if !partition.value_list.is_empty() { + String::from_utf8_lossy(&partition.value_list[0]).to_string() + } else { + "".to_string() + } + } else { + "".to_string() + } } impl From for Region { fn from(r: PbRegion) -> Self { + let partition_expr = pb_region_partition_expr(&r); Self { id: r.id.into(), name: r.name, - partition: r.partition.map(Into::into), + partition: None, + partition_expr, attrs: r.attrs.into_iter().collect::>(), } } @@ -382,17 +420,21 @@ impl From for Region { impl From for PbRegion { fn from(region: Region) -> Self { + let partition_expr = region.partition_expr(); Self { id: region.id.into(), name: region.name, - partition: region.partition.map(Into::into), + partition: Some(PbPartition { + expression: partition_expr, + ..Default::default() + }), attrs: region.attrs.into_iter().collect::>(), } } } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] -pub struct Partition { +pub struct LegacyPartition { #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")] pub column_list: Vec>, #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")] @@ -440,20 +482,17 @@ where Ok(values) } -impl From for PbPartition { - fn from(p: Partition) -> Self { - Self { - column_list: p.column_list, - value_list: p.value_list, - } - } -} +impl From for PbPartition { + fn from(p: LegacyPartition) -> Self { + let expression = if !p.value_list.is_empty() { + String::from_utf8_lossy(&p.value_list[0]).to_string() + } else { + "".to_string() + }; -impl From for Partition { - fn from(p: PbPartition) -> Self { Self { - column_list: p.column_list, - value_list: p.value_list, + expression, + ..Default::default() } } } @@ -469,8 +508,9 @@ mod tests { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: 
BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -491,8 +531,9 @@ mod tests { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -513,8 +554,9 @@ mod tests { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -529,8 +571,9 @@ mod tests { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -545,8 +588,9 @@ mod tests { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -561,8 +605,9 @@ mod tests { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -574,19 +619,6 @@ mod tests { assert_eq!(decoded, region_route); } - #[test] - fn test_de_serialize_partition() { - let p = Partition { - column_list: vec![b"a".to_vec(), b"b".to_vec()], - value_list: vec![b"hi".to_vec(), b",".to_vec()], - }; - - let output = serde_json::to_string(&p).unwrap(); - let got: Partition = serde_json::from_str(&output).unwrap(); - - assert_eq!(got, p); - } 
- #[test] fn test_region_distribution() { let region_routes = vec![ @@ -594,8 +626,9 @@ mod tests { region: Region { id: RegionId::new(1, 1), name: "r1".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], @@ -606,8 +639,9 @@ mod tests { region: Region { id: RegionId::new(1, 2), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), + partition: None, + partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(2, "a2")), follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")], @@ -622,4 +656,74 @@ mod tests { assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1])); assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2])); } + + #[test] + fn test_de_serialize_partition() { + let p = LegacyPartition { + column_list: vec![b"a".to_vec(), b"b".to_vec()], + value_list: vec![b"hi".to_vec(), b",".to_vec()], + }; + + let output = serde_json::to_string(&p).unwrap(); + let got: LegacyPartition = serde_json::from_str(&output).unwrap(); + + assert_eq!(got, p); + } + + #[test] + #[allow(deprecated)] + fn test_region_partition_expr() { + let r = PbRegion { + id: 1, + name: "r1".to_string(), + partition: None, + attrs: Default::default(), + }; + assert_eq!(pb_region_partition_expr(&r), ""); + + let r2: Region = r.into(); + assert_eq!(r2.partition_expr(), ""); + assert!(r2.partition.is_none()); + + let r3: PbRegion = r2.into(); + assert_eq!(r3.partition.as_ref().unwrap().expression, ""); + + let r = PbRegion { + id: 1, + name: "r1".to_string(), + partition: Some(PbPartition { + column_list: vec![b"a".to_vec()], + value_list: vec![b"{}".to_vec()], + expression: Default::default(), + }), + attrs: Default::default(), + }; + assert_eq!(pb_region_partition_expr(&r), "{}"); + + let r2: Region = r.into(); + assert_eq!(r2.partition_expr(), "{}"); + assert!(r2.partition.is_none()); + + let 
r3: PbRegion = r2.into(); + assert_eq!(r3.partition.as_ref().unwrap().expression, "{}"); + + let r = PbRegion { + id: 1, + name: "r1".to_string(), + partition: Some(PbPartition { + column_list: vec![b"a".to_vec()], + value_list: vec![b"{}".to_vec()], + expression: "a>b".to_string(), + }), + attrs: Default::default(), + }; + assert_eq!(pb_region_partition_expr(&r), "a>b"); + + let r2: Region = r.into(); + assert_eq!(r2.partition_expr(), "a>b"); + assert!(r2.partition.is_none()); + + let r3: PbRegion = r2.into(); + assert_eq!(r3.partition.as_ref().unwrap().expression, "a>b"); + } } diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index eda7d2c25a..2d3dc8c483 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -84,14 +84,7 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask { .into(); let table_info = build_raw_table_info_from_expr(&expr); - CreateTableTask::new( - expr, - vec![Partition { - column_list: vec![], - value_list: vec![], - }], - table_info, - ) + CreateTableTask::new(expr, vec![Partition::default()], table_info) } #[test] diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs index dfd02398a9..a7946abd7d 100644 --- a/src/operator/src/bulk_insert.rs +++ b/src/operator/src/bulk_insert.rs @@ -67,7 +67,7 @@ impl Inserter { } // notify flownode to update dirty timestamps if flow is configured. 
- self.maybe_update_flow_dirty_window(table_info, record_batch.clone()); + self.maybe_update_flow_dirty_window(table_info.clone(), record_batch.clone()); metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_ROWS @@ -81,7 +81,7 @@ impl Inserter { .start_timer(); let partition_rule = self .partition_manager - .find_table_partition_rule(table_id) + .find_table_partition_rule(&table_info) .await .context(error::InvalidPartitionSnafu)?; diff --git a/src/operator/src/req_convert/common/partitioner.rs b/src/operator/src/req_convert/common/partitioner.rs index b9f2117c7b..fbee58bcdd 100644 --- a/src/operator/src/req_convert/common/partitioner.rs +++ b/src/operator/src/req_convert/common/partitioner.rs @@ -16,7 +16,8 @@ use api::v1::region::{DeleteRequest, InsertRequest}; use api::v1::Rows; use partition::manager::PartitionRuleManager; use snafu::ResultExt; -use store_api::storage::{RegionId, TableId}; +use store_api::storage::RegionId; +use table::metadata::TableInfo; use crate::error::{Result, SplitDeleteSnafu, SplitInsertSnafu}; @@ -31,12 +32,13 @@ impl<'a> Partitioner<'a> { pub async fn partition_insert_requests( &self, - table_id: TableId, + table_info: &TableInfo, rows: Rows, ) -> Result> { + let table_id = table_info.table_id(); let requests = self .partition_manager - .split_rows(table_id, rows) + .split_rows(table_info, rows) .await .context(SplitInsertSnafu)? .into_iter() @@ -50,12 +52,13 @@ impl<'a> Partitioner<'a> { pub async fn partition_delete_requests( &self, - table_id: TableId, + table_info: &TableInfo, rows: Rows, ) -> Result> { + let table_id = table_info.table_id(); let requests = self .partition_manager - .split_rows(table_id, rows) + .split_rows(table_info, rows) .await .context(SplitDeleteSnafu)? 
.into_iter() diff --git a/src/operator/src/req_convert/delete/row_to_region.rs b/src/operator/src/req_convert/delete/row_to_region.rs index d04659c6c8..7dca98003f 100644 --- a/src/operator/src/req_convert/delete/row_to_region.rs +++ b/src/operator/src/req_convert/delete/row_to_region.rs @@ -46,10 +46,10 @@ impl<'a> RowToRegion<'a> { let mut region_request = Vec::with_capacity(requests.deletes.len()); for request in requests.deletes { let table = self.get_table(&request.table_name).await?; - let table_id = table.table_info().table_id(); + let table_info = table.table_info(); let requests = Partitioner::new(self.partition_manager) - .partition_delete_requests(table_id, request.rows.unwrap_or_default()) + .partition_delete_requests(&table_info, request.rows.unwrap_or_default()) .await?; region_request.extend(requests); diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs index 5baaa078a4..d48fb08127 100644 --- a/src/operator/src/req_convert/delete/table_to_region.rs +++ b/src/operator/src/req_convert/delete/table_to_region.rs @@ -42,7 +42,7 @@ impl<'a> TableToRegion<'a> { let rows = Rows { schema, rows }; let requests = Partitioner::new(self.partition_manager) - .partition_delete_requests(self.table_info.table_id(), rows) + .partition_delete_requests(self.table_info, rows) .await?; Ok(RegionDeleteRequests { requests }) } diff --git a/src/operator/src/req_convert/insert/row_to_region.rs b/src/operator/src/req_convert/insert/row_to_region.rs index 1e74704825..fe37be0140 100644 --- a/src/operator/src/req_convert/insert/row_to_region.rs +++ b/src/operator/src/req_convert/insert/row_to_region.rs @@ -52,7 +52,8 @@ impl<'a> RowToRegion<'a> { for request in requests.inserts { let Some(rows) = request.rows else { continue }; - let table_id = self.get_table_id(&request.table_name)?; + let table_info = self.get_table_info(&request.table_name)?; + let table_id = table_info.table_id(); let region_numbers = 
self.region_numbers(&request.table_name)?; let requests = if let Some(region_id) = match region_numbers[..] { [singular] => Some(RegionId::new(table_id, singular)), @@ -64,7 +65,7 @@ impl<'a> RowToRegion<'a> { }] } else { Partitioner::new(self.partition_manager) - .partition_insert_requests(table_id, rows) + .partition_insert_requests(table_info, rows) .await? }; @@ -85,10 +86,9 @@ impl<'a> RowToRegion<'a> { }) } - fn get_table_id(&self, table_name: &str) -> Result { + fn get_table_info(&self, table_name: &str) -> Result<&TableInfoRef> { self.tables_info .get(table_name) - .map(|x| x.table_id()) .context(TableNotFoundSnafu { table_name }) } diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index 43b0689985..d11fee0605 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -135,7 +135,7 @@ impl<'a> StatementToRegion<'a> { } let requests = Partitioner::new(self.partition_manager) - .partition_insert_requests(table_info.table_id(), Rows { schema, rows }) + .partition_insert_requests(&table_info, Rows { schema, rows }) .await?; let requests = RegionInsertRequests { requests }; if table_info.is_ttl_instant_table() { diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs index abc435ee03..7ae46e5d2e 100644 --- a/src/operator/src/req_convert/insert/table_to_region.rs +++ b/src/operator/src/req_convert/insert/table_to_region.rs @@ -46,7 +46,7 @@ impl<'a> TableToRegion<'a> { let rows = Rows { schema, rows }; let requests = Partitioner::new(self.partition_manager) - .partition_insert_requests(self.table_info.table_id(), rows) + .partition_insert_requests(self.table_info, rows) .await?; let requests = RegionInsertRequests { requests }; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 6e2c69e5c1..472a57eb25 100644 --- 
a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -43,7 +43,7 @@ use common_meta::rpc::ddl::{ CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, }; -use common_meta::rpc::router::{Partition, Partition as MetaPartition}; +use common_meta::rpc::router::{LegacyPartition, LegacyPartition as MetaPartition}; use common_query::Output; use common_sql::convert::sql_value_to_value; use common_telemetry::{debug, info, tracing, warn}; @@ -152,14 +152,16 @@ impl StatementExecutor { create_stmt.name = stmt.table_name; create_stmt.if_not_exists = false; - let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| { - if !partitions.column_list.is_empty() { - partitions.set_quote(quote_style); - Some(partitions) - } else { - None - } - }); + let table_info = table_ref.table_info(); + let partitions = + create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| { + if !partitions.column_list.is_empty() { + partitions.set_quote(quote_style); + Some(partitions) + } else { + None + } + }); let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?; self.create_table_inner(create_expr, partitions, ctx).await @@ -1335,7 +1337,7 @@ impl StatementExecutor { async fn create_table_procedure( &self, create_table: CreateTableExpr, - partitions: Vec, + partitions: Vec, table_info: RawTableInfo, query_context: QueryContextRef, ) -> Result { @@ -1544,25 +1546,23 @@ pub fn parse_partitions( find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?; // Validates partition - let mut exprs = vec![]; - for partition in &partition_entries { - for bound in partition { - if let PartitionBound::Expr(expr) = bound { - exprs.push(expr.clone()); - } - } - } + let exprs = partition_entries.clone(); MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true) .context(InvalidPartitionSnafu)?; - Ok(( - partition_entries - .into_iter() - .map(|x| 
MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) - .collect::>() - .context(DeserializePartitionSnafu)?, - partition_columns, - )) + // TODO(zhongzc): change to PartitionExpr + let meta_partitions: Vec = partition_entries + .into_iter() + .map(|x| { + MetaPartition::try_from(PartitionDef::new( + partition_columns.clone(), + vec![PartitionBound::Expr(x)], + )) + }) + .collect::>() + .context(DeserializePartitionSnafu)?; + + Ok((meta_partitions, partition_columns)) } /// Verifies an alter and returns whether it is necessary to perform the alter. @@ -1726,48 +1726,39 @@ fn find_partition_entries( partitions: &Option, partition_columns: &[String], query_ctx: &QueryContextRef, -) -> Result>> { - let entries = if let Some(partitions) = partitions { - // extract concrete data type of partition columns - let column_defs = partition_columns - .iter() - .map(|pc| { - create_table - .column_defs - .iter() - .find(|c| &c.name == pc) - // unwrap is safe here because we have checked that partition columns are defined - .unwrap() - }) - .collect::>(); - let mut column_name_and_type = HashMap::with_capacity(column_defs.len()); - for column in column_defs { +) -> Result> { + let Some(partitions) = partitions else { + return Ok(vec![]); + }; + + // extract concrete data type of partition columns + let column_name_and_type = partition_columns + .iter() + .map(|pc| { + let column = create_table + .column_defs + .iter() + .find(|c| &c.name == pc) + // unwrap is safe here because we have checked that partition columns are defined + .unwrap(); let column_name = &column.name; let data_type = ConcreteDataType::from( ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension) .context(ColumnDataTypeSnafu)?, ); - column_name_and_type.insert(column_name, data_type); - } + Ok((column_name, data_type)) + }) + .collect::>>()?; - // Transform parser expr to partition expr - let mut partition_exprs = Vec::with_capacity(partitions.exprs.len()); - for 
partition in &partitions.exprs { - let partition_expr = - convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?; - partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]); - } + // Transform parser expr to partition expr + let mut partition_exprs = Vec::with_capacity(partitions.exprs.len()); + for partition in &partitions.exprs { + let partition_expr = + convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?; + partition_exprs.push(partition_expr); + } - // fallback for no expr - if partition_exprs.is_empty() { - partition_exprs.push(vec![PartitionBound::MaxValue]); - } - - partition_exprs - } else { - vec![vec![PartitionBound::MaxValue]] - }; - Ok(entries) + Ok(partition_exprs) } fn convert_one_expr( diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index 5251872559..2f4d09c811 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -17,18 +17,16 @@ use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; use common_telemetry::tracing; use partition::manager::PartitionInfo; -use partition::partition::PartitionBound; use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{OptionExt, ResultExt}; -use sql::ast::Ident; use sql::statements::create::Partitions; use sql::statements::show::{ ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews, }; use sql::statements::OptionMap; -use table::metadata::TableType; +use table::metadata::{TableInfo, TableType}; use table::table_name::TableName; use table::TableRef; @@ -143,13 +141,13 @@ impl StatementExecutor { let partitions = self .partition_manager - .find_table_partitions(table.table_info().table_id()) + .find_table_partitions(table_info.table_id()) .await .context(error::FindTablePartitionRuleSnafu { table_name: 
&table_name.table_name, })?; - let partitions = create_partitions_stmt(partitions)?; + let partitions = create_partitions_stmt(&table_info, partitions)?; query::sql::show_create_table(table, schema_options, partitions, query_ctx) .context(ExecuteStatementSnafu) @@ -329,15 +327,17 @@ impl StatementExecutor { } } -pub(crate) fn create_partitions_stmt(partitions: Vec) -> Result> { +pub(crate) fn create_partitions_stmt( + table_info: &TableInfo, + partitions: Vec, +) -> Result> { if partitions.is_empty() { return Ok(None); } - let column_list: Vec = partitions[0] - .partition - .partition_columns() - .iter() + let column_list = table_info + .meta + .partition_column_names() .map(|name| name[..].into()) .collect(); @@ -345,16 +345,9 @@ pub(crate) fn create_partitions_stmt(partitions: Vec) -> Result>()) + .partition_key_indices(vec![0]) .build() .unwrap(); TableInfoBuilder::default() @@ -105,10 +106,8 @@ pub(crate) async fn create_partition_rule_manager( let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); let table_route_cache = test_new_table_route_cache(kv_backend.clone()); let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)); - let regions = vec![1u32, 2, 3]; let region_wal_options = new_test_region_wal_options(regions.clone()); - table_metadata_manager .create_table_metadata( new_test_table_info(1, "table_1", regions.clone().into_iter()).into(), @@ -117,19 +116,15 @@ pub(crate) async fn create_partition_rule_manager( region: Region { id: 3.into(), name: "r1".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::Expr(PartitionExpr::new( - Operand::Column("a".to_string()), - RestrictedOp::Lt, - Operand::Value(datatypes::value::Value::Int32(10)), - ))], - ) - .try_into() - .unwrap(), - ), + partition: None, attrs: BTreeMap::new(), + partition_expr: PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Lt, + 
Operand::Value(datatypes::value::Value::Int32(10)), + ) + .as_json_str() + .unwrap(), }, leader_peer: Some(Peer::new(3, "")), follower_peers: vec![], @@ -140,27 +135,23 @@ pub(crate) async fn create_partition_rule_manager( region: Region { id: 2.into(), name: "r2".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::Expr(PartitionExpr::new( - Operand::Expr(PartitionExpr::new( - Operand::Column("a".to_string()), - RestrictedOp::GtEq, - Operand::Value(datatypes::value::Value::Int32(10)), - )), - RestrictedOp::And, - Operand::Expr(PartitionExpr::new( - Operand::Column("a".to_string()), - RestrictedOp::Lt, - Operand::Value(datatypes::value::Value::Int32(50)), - )), - ))], - ) - .try_into() - .unwrap(), - ), + partition: None, attrs: BTreeMap::new(), + partition_expr: PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int32(10)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::Int32(50)), + )), + ) + .as_json_str() + .unwrap(), }, leader_peer: Some(Peer::new(2, "")), follower_peers: vec![], @@ -171,6 +162,7 @@ pub(crate) async fn create_partition_rule_manager( region: Region { id: 1.into(), name: "r3".to_string(), + // Keep the old partition definition to test compatibility. 
partition: Some( PartitionDef::new( vec!["a".to_string()], @@ -184,6 +176,7 @@ pub(crate) async fn create_partition_rule_manager( .unwrap(), ), attrs: BTreeMap::new(), + partition_expr: Default::default(), }, leader_peer: Some(Peer::new(1, "")), follower_peers: vec![], @@ -195,6 +188,5 @@ pub(crate) async fn create_partition_rule_manager( ) .await .unwrap(); - partition_manager } diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 3e423ed9f0..284ccadf45 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -108,14 +108,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid table route data, table id: {}, msg: {}", table_id, err_msg))] - InvalidTableRouteData { - table_id: TableId, - err_msg: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to convert DataFusion's ScalarValue: {:?}", value))] ConvertScalarValue { value: ScalarValue, @@ -265,7 +257,6 @@ impl ErrorExt for Error { | Error::DeserializeJson { .. } => StatusCode::Internal, Error::Unexpected { .. } - | Error::InvalidTableRouteData { .. } | Error::FindTableRoutes { .. } | Error::FindRegionRoutes { .. } => StatusCode::Unexpected, Error::TableRouteNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/partition/src/expr.rs b/src/partition/src/expr.rs index aa12bb6d79..67b8e36234 100644 --- a/src/partition/src/expr.rs +++ b/src/partition/src/expr.rs @@ -29,6 +29,7 @@ use sql::statements::value_to_sql_value; use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident}; use crate::error; +use crate::partition::PartitionBound; /// Struct for partition expression. This can be converted back to sqlparser's [Expr]. /// by [`Self::to_parser_expr`]. @@ -267,6 +268,29 @@ impl PartitionExpr { pub fn and(self, rhs: PartitionExpr) -> PartitionExpr { PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs)) } + + /// Serializes `PartitionExpr` to json string. 
+ /// + /// Wraps `PartitionBound::Expr` for compatibility. + pub fn as_json_str(&self) -> error::Result { + serde_json::to_string(&PartitionBound::Expr(self.clone())) + .context(error::SerializeJsonSnafu) + } + + /// Deserializes `PartitionExpr` from json string. + /// + /// Deserializes to `PartitionBound` for compatibility. + pub fn from_json_str(s: &str) -> error::Result> { + if s.is_empty() { + return Ok(None); + } + + let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?; + match bound { + PartitionBound::Expr(expr) => Ok(Some(expr)), + _ => Ok(None), + } + } } impl Display for PartitionExpr { @@ -347,4 +371,41 @@ mod tests { assert_eq!(case.3, expr.to_string()); } } + + #[test] + fn test_serde_partition_expr() { + let expr = PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Eq, + Operand::Value(Value::UInt32(10)), + ); + let json = expr.as_json_str().unwrap(); + assert_eq!( + json, + "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}" + ); + + let json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#; + let expr2 = PartitionExpr::from_json_str(json).unwrap().unwrap(); + let expected = PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(Value::UInt32(10)), + ); + assert_eq!(expr2, expected); + + // empty string + let json = ""; + let expr3 = PartitionExpr::from_json_str(json).unwrap(); + assert!(expr3.is_none()); + + // variants other than Expr + let json = r#""MaxValue""#; + let expr4 = PartitionExpr::from_json_str(json).unwrap(); + assert!(expr4.is_none()); + + let json = r#"{"Value":{"UInt32":10}}"#; + let expr5 = PartitionExpr::from_json_str(json).unwrap(); + assert!(expr5.is_none()); + } } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index ad52804f57..50aed6dd99 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -23,11 +23,11 @@ 
use common_meta::peer::Peer; use common_meta::rpc::router::{self, RegionRoute}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfo}; use crate::error::{FindLeaderSnafu, Result}; +use crate::expr::PartitionExpr; use crate::multi_dim::MultiDimPartitionRule; -use crate::partition::{PartitionBound, PartitionDef}; use crate::splitter::RowSplitter; use crate::{error, PartitionRuleRef}; @@ -52,7 +52,7 @@ pub struct PartitionRuleManager { #[derive(Debug)] pub struct PartitionInfo { pub id: RegionId, - pub partition: PartitionDef, + pub partition_expr: Option, } impl PartitionRuleManager { @@ -149,11 +149,11 @@ impl PartitionRuleManager { Ok(results) } - pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result { - let partitions = self.find_table_partitions(table_id).await?; - - let partition_columns = partitions[0].partition.partition_columns(); - + pub async fn find_table_partition_rule( + &self, + table_info: &TableInfo, + ) -> Result { + let partitions = self.find_table_partitions(table_info.table_id()).await?; let regions = partitions .iter() .map(|x| x.id.region_number()) @@ -161,14 +161,16 @@ impl PartitionRuleManager { let exprs = partitions .iter() - .filter_map(|x| match &x.partition.partition_bounds()[0] { - PartitionBound::Expr(e) => Some(e.clone()), - _ => None, - }) + .filter_map(|x| x.partition_expr.as_ref()) + .cloned() .collect::>(); - let rule = - MultiDimPartitionRule::try_new(partition_columns.clone(), regions, exprs, false)?; + let partition_columns = table_info + .meta + .partition_column_names() + .cloned() + .collect::>(); + let rule = MultiDimPartitionRule::try_new(partition_columns, regions, exprs, false)?; Ok(Arc::new(rule) as _) } @@ -189,10 +191,10 @@ impl PartitionRuleManager { pub async fn split_rows( &self, - table_id: TableId, + table_info: &TableInfo, rows: Rows, ) -> Result> { - let partition_rule = 
self.find_table_partition_rule(table_id).await?; + let partition_rule = self.find_table_partition_rule(table_info).await?; RowSplitter::new(partition_rule).split(rows) } } @@ -203,40 +205,14 @@ fn create_partitions_from_region_routes( ) -> Result> { let mut partitions = Vec::with_capacity(region_routes.len()); for r in region_routes { - let partition = r - .region - .partition - .as_ref() - .context(error::FindRegionRoutesSnafu { - region_id: r.region.id, - table_id, - })?; - let partition_def = PartitionDef::try_from(partition)?; + let partition_expr = PartitionExpr::from_json_str(&r.region.partition_expr())?; // The region routes belong to the physical table but are shared among all logical tables. // That is to say, the region id points to the physical table, so we need to use the actual // table id (which may be a logical table) to renew the region id. let id = RegionId::new(table_id, r.region.id.region_number()); - partitions.push(PartitionInfo { - id, - partition: partition_def, - }); + partitions.push(PartitionInfo { id, partition_expr }); } - partitions.sort_by(|a, b| { - a.partition - .partition_bounds() - .cmp(b.partition.partition_bounds()) - }); - - ensure!( - partitions - .windows(2) - .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()), - error::InvalidTableRouteDataSnafu { - table_id, - err_msg: "partition columns of all regions are not the same" - } - ); Ok(partitions) } diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 8095e4b584..67da449e62 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; -use common_meta::rpc::router::Partition as MetaPartition; +use common_meta::rpc::router::LegacyPartition as MetaPartition; use datatypes::arrow::array::{BooleanArray, RecordBatch}; use datatypes::prelude::Value; use itertools::Itertools; @@ -48,10 +48,12 @@ 
pub trait PartitionRule: Sync + Send { ) -> Result>; } -/// The right bound(exclusive) of partition range. +/// The bound of one partition. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub enum PartitionBound { + /// Deprecated since 0.9.0. Value(Value), + /// Deprecated since 0.15.0. MaxValue, Expr(crate::expr::PartitionExpr), } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 3eb4ccdffa..1d91164fee 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -218,6 +218,13 @@ impl TableMeta { .map(|(_, cs)| &cs.name) } + pub fn partition_column_names(&self) -> impl Iterator { + let columns_schemas = &self.schema.column_schemas(); + self.partition_key_indices + .iter() + .map(|idx| &columns_schemas[*idx].name) + } + /// Returns the new [TableMetaBuilder] after applying given `alter_kind`. /// /// The returned builder would derive the next column id of this meta. @@ -985,6 +992,7 @@ impl TableMeta { Ok(meta_builder) } } + #[derive(Clone, Debug, PartialEq, Eq, Builder)] #[builder(pattern = "owned")] pub struct TableInfo { diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 6406b1c8a2..8bf4675939 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -128,9 +128,9 @@ PARTITION ON COLUMNS (n) ( | | ) | | | PARTITION ON COLUMNS ("n") ( | | | n < 1, | -| | n >= 100, | | | n >= 1 AND n < 10, | -| | n >= 10 AND n < 100 | +| | n >= 10 AND n < 100, | +| | n >= 100 | | | ) | | | ENGINE=mito | | | | diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c82851600d..bed77b07d8 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -3816,7 +3816,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { let expected = 
r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#; validate_data("otlp_traces", &client, "select * from mytable;", expected).await; - let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON 
NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= 'f',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id 
< '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, diff --git a/tests/cases/standalone/common/create/metric_engine_partition.result b/tests/cases/standalone/common/create/metric_engine_partition.result index db3249e382..028a788230 100644 --- a/tests/cases/standalone/common/create/metric_engine_partition.result +++ b/tests/cases/standalone/common/create/metric_engine_partition.result @@ -67,26 +67,26 @@ Affected Rows: 0 show create table logical_table_2; -+-----------------+-------------------------------------------------------------------------------+ -| Table | Create Table | -+-----------------+-------------------------------------------------------------------------------+ -| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( | -| | "cpu" DOUBLE NULL, | -| | "host" STRING NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("host") | -| | ) | -| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | -| | host <= 'host1', | -| | host > 'host2', | -| | host > 'host1' AND host <= 'host2' | -| | ) | -| | ENGINE=metric | -| | WITH( | -| | on_physical_table = 'metric_engine_partition' | -| | ) | -+-----------------+-------------------------------------------------------------------------------+ ++-----------------+-------------------------------------------------+ +| Table | Create Table | ++-----------------+-------------------------------------------------+ +| 
logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( | +| | "cpu" DOUBLE NULL, | +| | "host" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | PARTITION ON COLUMNS ("host") ( | +| | host <= 'host1', | +| | host > 'host1' AND host <= 'host2', | +| | host > 'host2' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | on_physical_table = 'metric_engine_partition' | +| | ) | ++-----------------+-------------------------------------------------+ select count(*) from logical_table_2; diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result index c5be5aaec5..9fa432aeaf 100644 --- a/tests/cases/standalone/common/partition.result +++ b/tests/cases/standalone/common/partition.result @@ -18,8 +18,8 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres | table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | +---------------+--------------+------------+----------------+------------------------+-----------------------+ | greptime | public | my_table | p0 | a < 1000 | ID | -| greptime | public | my_table | p1 | a >= 2000 | ID | -| greptime | public | my_table | p2 | a >= 1000 AND a < 2000 | ID | +| greptime | public | my_table | p1 | a >= 1000 AND a < 2000 | ID | +| greptime | public | my_table | p2 | a >= 2000 | ID | +---------------+--------------+------------+----------------+------------------------+-----------------------+ -- SQLNESS REPLACE (\d{13}) REGION_ID @@ -126,7 +126,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres +---------------+--------------+------------+----------------+----------------------+-----------------------+ | table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | 
+---------------+--------------+------------+----------------+----------------------+-----------------------+ -| greptime | public | my_table | p0 | MAXVALUE | ID | +| greptime | public | my_table | p0 | | ID | +---------------+--------------+------------+----------------+----------------------+-----------------------+ -- SQLNESS REPLACE (\d{13}) REGION_ID diff --git a/tests/cases/standalone/common/show/show_create.result b/tests/cases/standalone/common/show/show_create.result index d11aedf99c..aa7947d9f3 100644 --- a/tests/cases/standalone/common/show/show_create.result +++ b/tests/cases/standalone/common/show/show_create.result @@ -36,8 +36,8 @@ SHOW CREATE TABLE system_metrics; | | ) | | | PARTITION ON COLUMNS ("id") ( | | | id < 5, | -| | id >= 9, | -| | id >= 5 AND id < 9 | +| | id >= 5 AND id < 9, | +| | id >= 9 | | | ) | | | ENGINE=mito | | | WITH( |