diff --git a/Cargo.lock b/Cargo.lock index f13c177350..5a27c3dd2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5747,7 +5747,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b9452e34113ec8d2a3d3e932cce386936041819b#b9452e34113ec8d2a3d3e932cce386936041819b" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=030f1e48ab1075aef8ebac5a1e0a38e081903430#030f1e48ab1075aef8ebac5a1e0a38e081903430" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", diff --git a/Cargo.toml b/Cargo.toml index ce3137df67..49552d2f2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ etcd-client = { version = "0.17", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b9452e34113ec8d2a3d3e932cce386936041819b" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "030f1e48ab1075aef8ebac5a1e0a38e081903430" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/client/src/flow.rs b/src/client/src/flow.rs index a3a77ee583..4fab136893 100644 --- a/src/client/src/flow.rs +++ b/src/client/src/flow.rs @@ -86,7 +86,7 @@ impl FlowRequester { .map(|insert| api::v1::flow::InsertRequest { region_id: insert.region_id, rows: insert.rows, - partition_rule_version: insert.partition_rule_version, + partition_expr_version: insert.partition_expr_version, }) .collect(), }; diff --git a/src/common/base/src/hash.rs b/src/common/base/src/hash.rs index 5ceb7ec3f5..a189b024d7 100644 --- a/src/common/base/src/hash.rs +++ b/src/common/base/src/hash.rs @@ -81,7 +81,7 @@ impl<'de> Deserialize<'de> for FixedRandomState { } } -pub fn partition_rule_version(expr_json: Option<&str>) -> u64 { +pub fn partition_expr_version(expr_json: Option<&str>) -> u64 { let expr = expr_json.unwrap_or_default(); if expr.is_empty() { return 0; diff --git a/src/common/function/src/utils.rs b/src/common/function/src/utils.rs index 5de8bcbbaa..7fc28a1b09 100644 --- a/src/common/function/src/utils.rs +++ b/src/common/function/src/utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use common_base::hash::{FixedRandomState, partition_rule_version}; +pub use common_base::hash::{FixedRandomState, partition_expr_version}; /// Escapes special characters in the provided pattern string for `LIKE`. /// diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index e8da222a36..6e9036d6cd 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -123,7 +123,7 @@ impl flow_server::Flow for FlowService { api::v1::region::InsertRequest { region_id: insert.region_id, rows: insert.rows, - partition_rule_version: insert.partition_rule_version, + partition_expr_version: insert.partition_expr_version, } }) .collect_vec(), diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index a1cbdcede6..5d7479c5d0 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -92,7 +92,7 @@ mod tests { let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }); engine .handle_request(*logi_region_id, request) diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 5891841f36..5f9c58277a 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -180,7 +180,7 @@ impl MetricEngineInner { // Modify and collect rows from each request for (logical_region_id, mut request) in requests { - if let Some(request_version) = request.partition_rule_version { + if let Some(request_version) = request.partition_expr_version { if let Some(merged_version) = merged_version { ensure!( merged_version == request_version, @@ -222,7 +222,7 @@ impl MetricEngineInner { hint: Some(WriteHint { primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(), }), - partition_rule_version: merged_version, + partition_expr_version: merged_version, }; Ok((merged_request, total_affected_rows)) @@ -274,7 +274,7 @@ impl MetricEngineInner { let merged_request = RegionPutRequest { rows: final_rows, hint: None, - partition_rule_version: merged_version, + partition_expr_version: merged_version, }; Ok((merged_request, table_ids.len() as AffectedRows)) @@ -306,7 +306,7 @@ impl MetricEngineInner { let null_value = Value { value_data: None }; for (logical_region_id, request) in requests { - if let Some(request_version) = request.partition_rule_version { + if let Some(request_version) = request.partition_expr_version { if let Some(merged_version) = merged_version { ensure!( merged_version == request_version, @@ -559,7 +559,7 @@ mod tests { use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; - use common_function::utils::partition_rule_version; + use common_function::utils::partition_expr_version; use common_recordbatch::RecordBatches; use datatypes::value::Value as PartitionValue; use partition::expr::col; @@ -707,7 +707,7 @@ mod tests { rows: rows_1, }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ( @@ -718,7 +718,7 @@ mod tests { rows: rows_2, }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ] @@ -781,7 +781,7 @@ mod tests { let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }); // write data @@ -856,7 +856,7 @@ mod tests { let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }); // write data @@ -879,7 +879,7 @@ mod tests { let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }); engine @@ -900,7 +900,7 @@ mod tests { let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }); engine @@ -963,7 +963,7 @@ mod tests { rows: rows1, }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ( @@ -974,7 +974,7 @@ mod tests { rows: rows2, }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ( @@ -985,7 +985,7 @@ mod tests { rows: rows3, }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ]; @@ -1036,7 +1036,7 @@ mod tests { rows: test_util::build_rows(1, 3), }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ( @@ -1047,7 +1047,7 @@ mod tests { rows: test_util::build_rows(1, 2), }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ( @@ -1058,7 +1058,7 @@ mod tests { rows: test_util::build_rows(1, 5), }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, ), ]; @@ -1098,7 +1098,7 @@ mod tests { rows: test_util::build_rows(1, 5), }, hint: None, - partition_rule_version: None, + partition_expr_version: None, }, )]; @@ -1162,7 +1162,7 @@ mod tests { } #[tokio::test] - async fn test_metric_put_rejects_bad_partition_rule_version() { + async fn test_metric_put_rejects_bad_partition_expr_version() { let env = TestEnv::new().await; env.init_metric_region().await; @@ -1179,7 +1179,7 @@ mod tests { RegionRequest::Put(RegionPutRequest { rows, hint: None, - partition_rule_version: Some(1), + partition_expr_version: Some(1), }), ) .await @@ -1189,7 +1189,7 @@ mod tests { } #[tokio::test] - async fn test_metric_put_respects_staging_partition_rule_version() { + async fn test_metric_put_respects_staging_partition_expr_version() { let env = TestEnv::new().await; env.init_metric_region().await; @@ -1206,7 +1206,7 @@ mod tests { .await .unwrap(); - let expected_version = partition_rule_version(Some(&partition_expr)); + let expected_version = partition_expr_version(Some(&partition_expr)); let rows = Rows { schema: test_util::row_schema_with_tags(&["job"]), rows: test_util::build_rows(1, 3), @@ -1219,7 +1219,7 @@ mod tests { RegionRequest::Put(RegionPutRequest { rows: rows.clone(), hint: None, - partition_rule_version: Some(expected_version.wrapping_add(1)), + partition_expr_version: Some(expected_version.wrapping_add(1)), }), ) .await @@ -1233,7 +1233,7 @@ mod tests { RegionRequest::Put(RegionPutRequest { rows: rows.clone(), hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await @@ -1247,7 +1247,7 @@ mod tests { RegionRequest::Put(RegionPutRequest { rows, hint: None, - partition_rule_version: Some(expected_version), + partition_expr_version: Some(expected_version), }), ) .await diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index aa6fc26a73..d84cbad946 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -483,7 +483,7 @@ impl MetadataRegion { RegionPutRequest { rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, } } @@ -516,7 +516,7 @@ impl MetadataRegion { RegionDeleteRequest { rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, } } diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index e7c8bcea80..bb2af0bec4 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -17,7 +17,7 @@ use std::fs; use std::sync::Arc; use api::v1::Rows; -use common_function::utils::partition_rule_version; +use common_function::utils::partition_expr_version; use common_recordbatch::RecordBatches; use datatypes::value::Value; use partition::expr::{PartitionExpr, col}; @@ -634,7 +634,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl .await .unwrap(); - let expected_version = partition_rule_version(Some(&partition_expr)); + let expected_version = partition_expr_version(Some(&partition_expr)); let unflushed_rows = Rows { schema: column_schemas, rows: build_rows(3, 6), @@ -645,7 +645,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl RegionRequest::Put(RegionPutRequest { rows: unflushed_rows, hint: None, - partition_rule_version: Some(expected_version), + partition_expr_version: Some(expected_version), }), ) .await diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2667241679..ed638bf69b 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -632,7 +632,7 @@ async fn test_absent_and_invalid_columns_with_format(flat_format: bool) { RegionRequest::Put(RegionPutRequest { rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index f2c3016ae2..fa957de670 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -103,7 +103,7 @@ pub(crate) async fn delete_and_flush( RegionRequest::Delete(RegionDeleteRequest { rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 63b2e65913..5ee25fb9ff 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -180,7 +180,7 @@ async fn test_engine_open_readonly_with_format(flat_format: bool) { RegionRequest::Put(RegionPutRequest { rows: rows.clone(), hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs index 2d694294f3..fd90cd99f7 100644 --- a/src/mito2/src/engine/set_role_state_test.rs +++ b/src/mito2/src/engine/set_role_state_test.rs @@ -108,7 +108,7 @@ async fn test_set_role_state_gracefully_with_format(flat_format: bool) { RegionRequest::Put(RegionPutRequest { rows: rows.clone(), hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await @@ -202,7 +202,7 @@ async fn test_write_downgrading_region_with_format(flat_format: bool) { RegionRequest::Put(RegionPutRequest { rows: rows.clone(), hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index ddc6ca3fb6..424e3306cc 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -22,7 +22,7 @@ use std::time::Duration; use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_function::utils::partition_rule_version; +use common_function::utils::partition_expr_version; use common_recordbatch::RecordBatches; use datatypes::value::Value; use object_store::Buffer; @@ -253,12 +253,12 @@ fn default_partition_expr() -> String { } #[tokio::test] -async fn test_staging_write_partition_rule_version() { - test_staging_write_partition_rule_version_with_format(false).await; - test_staging_write_partition_rule_version_with_format(true).await; +async fn test_staging_write_partition_expr_version() { + test_staging_write_partition_expr_version_with_format(false).await; + test_staging_write_partition_expr_version_with_format(true).await; } -async fn test_staging_write_partition_rule_version_with_format(flat_format: bool) { +async fn test_staging_write_partition_expr_version_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let engine = env @@ -291,8 +291,8 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool .await .unwrap(); - let expected_version = partition_rule_version(Some(&partition_expr)); - let origin_version = partition_rule_version(Some(&origin_partition_expr)); + let expected_version = partition_expr_version(Some(&partition_expr)); + let origin_version = partition_expr_version(Some(&origin_partition_expr)); common_telemetry::info!( "expected_version: {}, origin_version: {}", expected_version, @@ -308,7 +308,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool RegionRequest::Put(RegionPutRequest { rows: bad_rows, hint: None, - partition_rule_version: Some(origin_version), + partition_expr_version: Some(origin_version), }), ) .await @@ -329,7 +329,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool RegionRequest::Put(RegionPutRequest { rows: compat_rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await @@ -346,7 +346,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool RegionRequest::Put(RegionPutRequest { rows: ok_rows, hint: None, - partition_rule_version: Some(expected_version), + partition_expr_version: Some(expected_version), }), ) .await @@ -395,7 +395,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool RegionRequest::Put(RegionPutRequest { rows: exit_rows, hint: None, - partition_rule_version: Some(origin_version), + partition_expr_version: Some(origin_version), }), ) .await @@ -412,7 +412,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool RegionRequest::Put(RegionPutRequest { rows: compat_rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await @@ -424,7 +424,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool .unwrap() .version() .metadata - .partition_rule_version; + .partition_expr_version; assert_ne!(0, committed_version); assert_eq!(committed_version, expected_version,); @@ -438,7 +438,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool RegionRequest::Put(RegionPutRequest { rows: commit_rows, hint: None, - partition_rule_version: Some(expected_version), + partition_expr_version: Some(expected_version), }), ) .await diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 509462716d..a75484256b 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -169,7 +169,7 @@ pub type MitoRegionRef = Arc; #[derive(Debug, Clone)] pub(crate) struct StagingPartitionInfo { pub(crate) partition_expr: String, - pub(crate) partition_rule_version: u64, + pub(crate) partition_expr_version: u64, } impl MitoRegion { @@ -807,15 +807,15 @@ impl MitoRegion { } } - pub fn expected_partition_rule_version(&self) -> u64 { + pub fn expected_partition_expr_version(&self) -> u64 { if self.is_staging() { let staging_partition_info = self.staging_partition_info.lock().unwrap(); staging_partition_info .as_ref() - .map(|info| info.partition_rule_version) + .map(|info| info.partition_expr_version) .unwrap_or_default() } else { - self.version().metadata.partition_rule_version + self.version().metadata.partition_expr_version } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index e38a2619f2..34cd4ee485 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -78,8 +78,8 @@ pub struct WriteRequest { pub hint: Option, /// Region metadata on the time of this request is created. pub(crate) region_metadata: Option, - /// Partition rule version for the region. - pub partition_rule_version: Option, + /// Partition expression version for the region. + pub partition_expr_version: Option, } impl WriteRequest { @@ -136,7 +136,7 @@ impl WriteRequest { has_null, hint: None, region_metadata, - partition_rule_version: None, + partition_expr_version: None, }) } @@ -146,8 +146,8 @@ impl WriteRequest { self } - pub fn with_partition_rule_version(mut self, partition_rule_version: Option) -> Self { - self.partition_rule_version = partition_rule_version; + pub fn with_partition_expr_version(mut self, partition_expr_version: Option) -> Self { + self.partition_expr_version = partition_expr_version; self } @@ -553,7 +553,7 @@ pub(crate) struct SenderBulkRequest { pub(crate) region_id: RegionId, pub(crate) request: BulkPart, pub(crate) region_metadata: RegionMetadataRef, - pub(crate) partition_rule_version: Option, + pub(crate) partition_expr_version: Option, } /// Request sent to a worker with timestamp @@ -667,7 +667,7 @@ impl WorkerRequest { let mut write_request = WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())? .with_hint(v.hint) - .with_partition_rule_version(v.partition_rule_version); + .with_partition_expr_version(v.partition_expr_version); if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense && let Some(region_metadata) = ®ion_metadata { @@ -682,7 +682,7 @@ impl WorkerRequest { let mut write_request = WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())? .with_hint(v.hint) - .with_partition_rule_version(v.partition_rule_version); + .with_partition_expr_version(v.partition_expr_version); if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense && let Some(region_metadata) = ®ion_metadata { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 56fe6b1fa7..842689bba6 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -1183,7 +1183,7 @@ pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { RegionRequest::Put(RegionPutRequest { rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await @@ -1226,7 +1226,7 @@ pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { RegionRequest::Delete(RegionDeleteRequest { rows, hint: None, - partition_rule_version: None, + partition_expr_version: None, }), ) .await diff --git a/src/mito2/src/worker/handle_bulk_insert.rs b/src/mito2/src/worker/handle_bulk_insert.rs index b37c543fba..a413271db3 100644 --- a/src/mito2/src/worker/handle_bulk_insert.rs +++ b/src/mito2/src/worker/handle_bulk_insert.rs @@ -91,7 +91,7 @@ impl RegionWorkerLoop { request: part, region_id: request.region_id, region_metadata, - partition_rule_version: request.partition_rule_version, + partition_expr_version: request.partition_expr_version, }); } } diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs index 79762c243b..52a31d03a8 100644 --- a/src/mito2/src/worker/handle_enter_staging.rs +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -14,7 +14,7 @@ use std::time::Instant; -use common_base::hash::partition_rule_version; +use common_base::hash::partition_expr_version; use common_telemetry::{error, info, warn}; use store_api::logstore::LogStore; use store_api::region_request::EnterStagingRequest; @@ -246,10 +246,10 @@ impl RegionWorkerLoop { fn update_region_staging_partition_info(region: &MitoRegionRef, partition_expr: String) { let mut staging_partition_info = region.staging_partition_info.lock().unwrap(); debug_assert!(staging_partition_info.is_none()); - let partition_rule_version = partition_rule_version(Some(&partition_expr)); + let partition_expr_version = partition_expr_version(Some(&partition_expr)); *staging_partition_info = Some(StagingPartitionInfo { partition_expr, - partition_rule_version, + partition_expr_version, }); } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index a244cb0a09..8745aa894e 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -305,11 +305,11 @@ impl RegionWorkerLoop { else { continue; }; - let expected_version = region.expected_partition_rule_version(); - if let Err(e) = check_partition_rule_version( + let expected_version = region.expected_partition_expr_version(); + if let Err(e) = check_partition_expr_version( region_id, expected_version, - sender_req.request.partition_rule_version, + sender_req.request.partition_expr_version, ) { sender_req.sender.send(Err(e)); continue; @@ -420,11 +420,11 @@ impl RegionWorkerLoop { let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else { continue; }; - let expected_version = region.expected_partition_rule_version(); - if let Err(e) = check_partition_rule_version( + let expected_version = region.expected_partition_expr_version(); + if let Err(e) = check_partition_expr_version( region_id, expected_version, - bulk_req.partition_rule_version, + bulk_req.partition_expr_version, ) { bulk_req.sender.send(Err(e)); continue; @@ -495,7 +495,7 @@ fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> { Ok(()) } -fn check_partition_rule_version( +fn check_partition_expr_version( region_id: RegionId, expected_version: u64, request_version: Option, diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs index 831deb795f..38ced30135 100644 --- a/src/operator/src/bulk_insert.rs +++ b/src/operator/src/bulk_insert.rs @@ -19,7 +19,7 @@ use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests}; use api::v1::region::{ BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request, }; -use api::v1::{ArrowIpc, PartitionRuleVersion}; +use api::v1::{ArrowIpc, PartitionExprVersion}; use arrow::array::Array; use arrow::record_batch::RecordBatch; use bytes::Bytes; @@ -87,7 +87,7 @@ impl Inserter { // SAFETY: region masks length checked let (region_number, _) = region_masks.into_iter().next().unwrap(); let region_id = RegionId::new(table_id, region_number); - let partition_rule_version = partition_versions + let partition_expr_version = partition_versions .get(®ion_number) .copied() .unwrap_or_default(); @@ -104,8 +104,8 @@ impl Inserter { }), body: Some(region_request::Body::BulkInsert(BulkInsertRequest { region_id: region_id.as_u64(), - partition_rule_version: partition_rule_version - .map(|value| PartitionRuleVersion { value }), + partition_expr_version: partition_expr_version + .map(|value| PartitionExprVersion { value }), body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc { schema: schema_bytes.clone(), data_header: raw_flight_data.data_header, @@ -156,7 +156,7 @@ impl Inserter { if mask.select_none() { continue; } - let partition_rule_version = partition_versions + let partition_expr_version = partition_versions .get(®ion_id.region_number()) .copied() .unwrap_or_default(); @@ -218,8 +218,8 @@ impl Inserter { }), body: Some(region_request::Body::BulkInsert(BulkInsertRequest { region_id: region_id.as_u64(), - partition_rule_version: partition_rule_version - .map(|value| PartitionRuleVersion { value }), + partition_expr_version: partition_expr_version + .map(|value| PartitionExprVersion { value }), body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc { schema: schema_bytes, data_header: header, diff --git a/src/operator/src/req_convert/common/partitioner.rs b/src/operator/src/req_convert/common/partitioner.rs index c9b81751e1..b52a6f6be6 100644 --- a/src/operator/src/req_convert/common/partitioner.rs +++ b/src/operator/src/req_convert/common/partitioner.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::region::{DeleteRequest, InsertRequest}; -use api::v1::{PartitionRuleVersion, Rows}; +use api::v1::{PartitionExprVersion, Rows}; use partition::manager::PartitionRuleManager; use snafu::ResultExt; use store_api::storage::RegionId; @@ -43,11 +43,11 @@ impl<'a> Partitioner<'a> { .context(SplitInsertSnafu)? .into_iter() .map( - |(region_number, (rows, partition_rule_version))| InsertRequest { + |(region_number, (rows, partition_expr_version))| InsertRequest { region_id: RegionId::new(table_id, region_number).into(), rows: Some(rows), - partition_rule_version: partition_rule_version - .map(|value| PartitionRuleVersion { value }), + partition_expr_version: partition_expr_version + .map(|value| PartitionExprVersion { value }), }, ) .collect(); @@ -68,11 +68,11 @@ impl<'a> Partitioner<'a> { .context(SplitDeleteSnafu)? .into_iter() .map( - |(region_number, (rows, partition_rule_version))| DeleteRequest { + |(region_number, (rows, partition_expr_version))| DeleteRequest { region_id: RegionId::new(table_id, region_number).into(), rows: Some(rows), - partition_rule_version: partition_rule_version - .map(|value| PartitionRuleVersion { value }), + partition_expr_version: partition_expr_version + .map(|value| PartitionExprVersion { value }), }, ) .collect(); diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs index 4a2789bb2a..b6fde9b105 100644 --- a/src/operator/src/req_convert/delete/table_to_region.rs +++ b/src/operator/src/req_convert/delete/table_to_region.rs @@ -56,7 +56,7 @@ mod tests { use api::v1::helper::tag_column_schema; use api::v1::region::DeleteRequest as RegionDeleteRequest; use api::v1::value::ValueData; - use api::v1::{ColumnDataType, PartitionRuleVersion, Row, Value}; + use api::v1::{ColumnDataType, PartitionExprVersion, Row, Value}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::vectors::{Int32Vector, VectorRef}; use store_api::storage::RegionId; @@ -96,7 +96,7 @@ mod tests { .unwrap() .partitions .iter() - .map(|p| (p.id.as_u64(), p.partition_rule_version)) + .map(|p| (p.id.as_u64(), p.partition_expr_version)) .collect::>(); let region_requests = converter.convert(table_request).await.unwrap(); @@ -155,7 +155,7 @@ mod tests { }) .collect(), }), - partition_rule_version: version.map(|value| PartitionRuleVersion { value }), + partition_expr_version: version.map(|value| PartitionExprVersion { value }), } } } diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs index 5959276257..a89b4d6204 100644 --- a/src/operator/src/req_convert/insert/table_to_region.rs +++ b/src/operator/src/req_convert/insert/table_to_region.rs @@ -72,7 +72,7 @@ mod tests { use api::v1::helper::tag_column_schema; use api::v1::region::InsertRequest as RegionInsertRequest; use api::v1::value::ValueData; - use api::v1::{ColumnDataType, PartitionRuleVersion, Row, Value}; + use api::v1::{ColumnDataType, PartitionExprVersion, Row, Value}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::vectors::{Int32Vector, VectorRef}; use store_api::storage::RegionId; @@ -112,7 +112,7 @@ mod tests { .unwrap() .partitions .iter() - .map(|p| (p.id.as_u64(), p.partition_rule_version)) + .map(|p| (p.id.as_u64(), p.partition_expr_version)) .collect::>(); let region_requests = converter.convert(table_request).await.unwrap(); @@ -172,7 +172,7 @@ mod tests { }) .collect(), }), - partition_rule_version: version.map(|value| PartitionRuleVersion { value }), + partition_expr_version: version.map(|value| PartitionExprVersion { value }), } } } diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index c3d7b70adb..78b35a6906 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -15,7 +15,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use common_base::hash::partition_rule_version; +use common_base::hash::partition_expr_version; use common_meta::cache::{TableRouteCacheRef, new_table_route_cache}; use common_meta::key::TableMetadataManager; use common_meta::key::table_route::TableRouteValue; @@ -198,7 +198,7 @@ pub(crate) async fn create_partition_rule_manager( } #[tokio::test] -async fn test_partition_rule_version_cache() { +async fn test_partition_expr_version_cache() { let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new()); let partition_manager = create_partition_rule_manager(kv_backend).await; let partitions = partition_manager @@ -214,12 +214,12 @@ async fn test_partition_rule_version_cache() { .partition_expr .as_ref() .map(|expr| expr.as_json_str().unwrap()) - .map(|expr_json| partition_rule_version(Some(expr_json.as_str()))) + .map(|expr_json| partition_expr_version(Some(expr_json.as_str()))) .unwrap_or_default(); - assert_eq!(Some(expected), partition.partition_rule_version); + assert_eq!(Some(expected), partition.partition_expr_version); version_by_region.insert( partition.id.region_number(), - partition.partition_rule_version, + partition.partition_expr_version, ); } diff --git a/src/partition/src/cache.rs b/src/partition/src/cache.rs index 075a8b1abb..9dd94064bd 100644 --- a/src/partition/src/cache.rs +++ b/src/partition/src/cache.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use common_base::hash::partition_rule_version; +use common_base::hash::partition_expr_version; use common_error::ext::BoxedError; use common_meta::cache::{CacheContainer, Initializer, TableRoute, TableRouteCacheRef}; use common_meta::instruction::CacheIdent; @@ -58,10 +58,10 @@ pub fn create_partitions_with_version_from_region_routes( let mut partitions = Vec::with_capacity(region_routes.len()); for r in region_routes { let expr_json = r.region.partition_expr(); - let partition_rule_version = if expr_json.is_empty() { + let partition_expr_version = if expr_json.is_empty() { None } else { - Some(partition_rule_version(Some(expr_json.as_str()))) + Some(partition_expr_version(Some(expr_json.as_str()))) }; let partition_expr = PartitionExpr::from_json_str(expr_json.as_str()) .map_err(BoxedError::new) @@ -70,7 +70,7 @@ pub fn create_partitions_with_version_from_region_routes( partitions.push(PartitionInfoWithVersion { id, partition_expr, - partition_rule_version, + partition_expr_version, }); } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 8a45b54878..cc00f9caba 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -54,7 +54,7 @@ pub struct PartitionInfo { pub struct PartitionInfoWithVersion { pub id: RegionId, pub partition_expr: Option, - pub partition_rule_version: Option, + pub partition_expr_version: Option, } impl PartitionRuleManager { @@ -192,7 +192,7 @@ impl PartitionRuleManager { let partition_versions = partition_info .partitions .iter() - .map(|r| (r.id.region_number(), r.partition_rule_version)) + .map(|r| (r.id.region_number(), r.partition_expr_version)) .collect::>>(); let regions = partition_info .partitions diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 57146c4bdf..d571a5392f 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use api::v1::SemanticType; use api::v1::column_def::try_as_column_schema; use api::v1::region::RegionColumnDef; -use common_base::hash::partition_rule_version; +use common_base::hash::partition_expr_version; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; @@ -162,7 +162,7 @@ pub struct RegionMetadata { /// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is. pub partition_expr: Option, #[serde(skip)] - pub partition_rule_version: u64, + pub partition_expr_version: u64, } impl fmt::Debug for RegionMetadata { @@ -202,8 +202,8 @@ impl<'de> Deserialize<'de> for RegionMetadata { let skipped = SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?; - let partition_rule_version = - partition_rule_version(without_schema.partition_expr.as_deref()); + let partition_expr_version = + partition_expr_version(without_schema.partition_expr.as_deref()); Ok(Self { schema: skipped.schema, @@ -215,7 +215,7 @@ impl<'de> Deserialize<'de> for RegionMetadata { schema_version: without_schema.schema_version, primary_key_encoding: without_schema.primary_key_encoding, partition_expr: without_schema.partition_expr, - partition_rule_version, + partition_expr_version, }) } } @@ -232,7 +232,7 @@ impl RegionMetadata { } pub fn set_partition_expr(&mut self, expr: Option) { - self.partition_rule_version = partition_rule_version(expr.as_deref()); + self.partition_expr_version = partition_expr_version(expr.as_deref()); self.partition_expr = expr; } @@ -374,7 +374,7 @@ impl RegionMetadata { schema_version: self.schema_version, primary_key_encoding: self.primary_key_encoding, partition_expr: self.partition_expr.clone(), - partition_rule_version: partition_rule_version(self.partition_expr.as_deref()), + partition_expr_version: partition_expr_version(self.partition_expr.as_deref()), }) } @@ -683,7 +683,7 @@ impl RegionMetadataBuilder { fn build_with_options(self, validate: bool) -> Result { let skipped = SkippedFields::new(&self.column_metadatas)?; - let partition_rule_version = partition_rule_version(self.partition_expr.as_deref()); + let partition_expr_version = partition_expr_version(self.partition_expr.as_deref()); let meta = RegionMetadata { schema: skipped.schema, time_index: skipped.time_index, @@ -694,7 +694,7 @@ impl RegionMetadataBuilder { schema_version: self.schema_version, primary_key_encoding: self.primary_key_encoding, partition_expr: self.partition_expr, - partition_rule_version, + partition_expr_version, }; if validate { diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index c5a920ac8f..544afdee3f 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -207,7 +207,7 @@ fn make_region_puts(inserts: InsertRequests) -> Result Result Result Result> { let region_id = request.region_id.into(); - let partition_rule_version = request.partition_rule_version.map(|v| v.value); + let partition_expr_version = request.partition_expr_version.map(|v| v.value); let Some(Body::ArrowIpc(request)) = request.body else { return Ok(vec![]); }; @@ -424,7 +424,7 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result, - /// Partition rule version for the region. - pub partition_rule_version: Option, + /// Partition expression version for the region. + pub partition_expr_version: Option, } #[derive(Debug)] @@ -473,8 +473,8 @@ pub struct RegionDeleteRequest { pub rows: Rows, /// Write hint. pub hint: Option, - /// Partition rule version for the region. - pub partition_rule_version: Option, + /// Partition expression version for the region. + pub partition_expr_version: Option, } #[derive(Debug, Clone)] @@ -1458,7 +1458,7 @@ pub struct RegionBulkInsertsRequest { pub region_id: RegionId, pub payload: DfRecordBatch, pub raw_data: ArrowIpc, - pub partition_rule_version: Option, + pub partition_expr_version: Option, } impl RegionBulkInsertsRequest { diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index e6ce1c3f29..14cf734291 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -24,7 +24,7 @@ mod region_migration; #[macro_use] mod repartition; #[macro_use] -mod repartition_rule_version; +mod repartition_expr_version; grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); @@ -38,7 +38,7 @@ repartition_tests!(File); repartition_tests!(S3, S3WithCache, Oss, Azblob, Gcs); -repartition_rule_version_tests!(File); +repartition_expr_version_tests!(File); -repartition_rule_version_tests!(S3, S3WithCache, Oss, Azblob, Gcs); +repartition_expr_version_tests!(S3, S3WithCache, Oss, Azblob, Gcs); // TODO(niebayes): add integration tests for remote wal. diff --git a/tests-integration/tests/repartition_rule_version.rs b/tests-integration/tests/repartition_expr_version.rs similarity index 85% rename from tests-integration/tests/repartition_rule_version.rs rename to tests-integration/tests/repartition_expr_version.rs index 82229a48c0..abef03098b 100644 --- a/tests-integration/tests/repartition_rule_version.rs +++ b/tests-integration/tests/repartition_expr_version.rs @@ -30,17 +30,17 @@ use tests_integration::test_util::{StorageType, get_test_store_config}; use tokio::time::{Duration, sleep}; #[macro_export] -macro_rules! repartition_rule_version_tests { +macro_rules! repartition_expr_version_tests { ($($service:ident),*) => { $( paste::item! { - mod [] { + mod [] { #[tokio::test(flavor = "multi_thread")] - async fn [< test_repartition_rule_version >]() { + async fn [< test_repartition_expr_version >]() { let store_type = tests_integration::test_util::StorageType::$service; if store_type.test_on() { common_telemetry::init_default_ut_logging(); - $crate::repartition_rule_version::test_repartition_rule_version(store_type).await + $crate::repartition_expr_version::test_repartition_expr_version(store_type).await } } } @@ -49,12 +49,12 @@ macro_rules! repartition_rule_version_tests { }; } -pub async fn test_repartition_rule_version(store_type: StorageType) { - let cluster_name = "test_repartition_rule_version"; +pub async fn test_repartition_expr_version(store_type: StorageType) { + let cluster_name = "test_repartition_expr_version"; let (store_config, _guard) = get_test_store_config(&store_type); let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await; if matches!(store_type, StorageType::File) { - let home_dir = create_temp_dir("test_repartition_rule_version_data_home"); + let home_dir = create_temp_dir("test_repartition_expr_version_data_home"); builder = builder.with_shared_home_dir(Arc::new(home_dir)); } @@ -79,7 +79,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) { let query_ctx = QueryContext::arc(); let sql = r#" - CREATE TABLE repartition_rule_version_table( + CREATE TABLE repartition_expr_version_table( `id` INT, `ts` TIMESTAMP TIME INDEX, `val` DOUBLE, @@ -98,7 +98,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) { let id = next_id; next_id += 1; let sql = format!( - "INSERT INTO repartition_rule_version_table VALUES ({id}, '2022-01-01 00:00:00', 0.5)" + "INSERT INTO repartition_expr_version_table VALUES ({id}, '2022-01-01 00:00:00', 0.5)" ); run_sql(&instance, &sql, query_ctx.clone()).await.unwrap(); success_count += 1; @@ -110,7 +110,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) { run_sql( &moved_instance, r#" - ALTER TABLE repartition_rule_version_table SPLIT PARTITION ( + ALTER TABLE repartition_expr_version_table SPLIT PARTITION ( `id` < 10 ) INTO ( `id` < 5, @@ -126,7 +126,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) { let id = next_id; next_id += 1; let sql = format!( - "INSERT INTO repartition_rule_version_table VALUES ({id}, '2022-01-01 00:00:00', 1.0)" + "INSERT INTO repartition_expr_version_table VALUES ({id}, '2022-01-01 00:00:00', 1.0)" ); match run_sql(&instance, &sql, query_ctx.clone()).await { Ok(_) => success_count += 1, @@ -142,14 +142,14 @@ pub async fn test_repartition_rule_version(store_type: StorageType) { let id = next_id; let sql = format!( - "INSERT INTO repartition_rule_version_table VALUES ({id}, '2022-01-01 00:00:00', 2.0)" + "INSERT INTO repartition_expr_version_table VALUES ({id}, '2022-01-01 00:00:00', 2.0)" ); run_sql(&instance, &sql, query_ctx.clone()).await.unwrap(); success_count += 1; let result = run_sql( &instance, - "SELECT count(*) FROM repartition_rule_version_table", + "SELECT count(*) FROM repartition_expr_version_table", query_ctx.clone(), ) .await @@ -160,7 +160,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) { run_sql( &instance, - "DROP TABLE repartition_rule_version_table", + "DROP TABLE repartition_expr_version_table", query_ctx, ) .await