refactor: rename partition rule version to partition expr version (#7696)

* refactor: rename partition rule version to partition expr version

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-02-10 18:12:47 +08:00
committed by GitHub
parent 45a3e1121d
commit 0ed3b83099
32 changed files with 147 additions and 147 deletions

2
Cargo.lock generated
View File

@@ -5747,7 +5747,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b9452e34113ec8d2a3d3e932cce386936041819b#b9452e34113ec8d2a3d3e932cce386936041819b"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=030f1e48ab1075aef8ebac5a1e0a38e081903430#030f1e48ab1075aef8ebac5a1e0a38e081903430"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",

View File

@@ -152,7 +152,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b9452e34113ec8d2a3d3e932cce386936041819b" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "030f1e48ab1075aef8ebac5a1e0a38e081903430" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -86,7 +86,7 @@ impl FlowRequester {
.map(|insert| api::v1::flow::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
partition_rule_version: insert.partition_rule_version,
partition_expr_version: insert.partition_expr_version,
})
.collect(),
};

View File

@@ -81,7 +81,7 @@ impl<'de> Deserialize<'de> for FixedRandomState {
}
}
pub fn partition_rule_version(expr_json: Option<&str>) -> u64 {
pub fn partition_expr_version(expr_json: Option<&str>) -> u64 {
let expr = expr_json.unwrap_or_default();
if expr.is_empty() {
return 0;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use common_base::hash::{FixedRandomState, partition_rule_version};
pub use common_base::hash::{FixedRandomState, partition_expr_version};
/// Escapes special characters in the provided pattern string for `LIKE`.
///

View File

@@ -123,7 +123,7 @@ impl flow_server::Flow for FlowService {
api::v1::region::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
partition_rule_version: insert.partition_rule_version,
partition_expr_version: insert.partition_expr_version,
}
})
.collect_vec(),

View File

@@ -92,7 +92,7 @@ mod tests {
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
partition_rule_version: None,
partition_expr_version: None,
});
engine
.handle_request(*logi_region_id, request)

View File

@@ -180,7 +180,7 @@ impl MetricEngineInner {
// Modify and collect rows from each request
for (logical_region_id, mut request) in requests {
if let Some(request_version) = request.partition_rule_version {
if let Some(request_version) = request.partition_expr_version {
if let Some(merged_version) = merged_version {
ensure!(
merged_version == request_version,
@@ -222,7 +222,7 @@ impl MetricEngineInner {
hint: Some(WriteHint {
primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
}),
partition_rule_version: merged_version,
partition_expr_version: merged_version,
};
Ok((merged_request, total_affected_rows))
@@ -274,7 +274,7 @@ impl MetricEngineInner {
let merged_request = RegionPutRequest {
rows: final_rows,
hint: None,
partition_rule_version: merged_version,
partition_expr_version: merged_version,
};
Ok((merged_request, table_ids.len() as AffectedRows))
@@ -306,7 +306,7 @@ impl MetricEngineInner {
let null_value = Value { value_data: None };
for (logical_region_id, request) in requests {
if let Some(request_version) = request.partition_rule_version {
if let Some(request_version) = request.partition_expr_version {
if let Some(merged_version) = merged_version {
ensure!(
merged_version == request_version,
@@ -559,7 +559,7 @@ mod tests {
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::utils::partition_rule_version;
use common_function::utils::partition_expr_version;
use common_recordbatch::RecordBatches;
use datatypes::value::Value as PartitionValue;
use partition::expr::col;
@@ -707,7 +707,7 @@ mod tests {
rows: rows_1,
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
(
@@ -718,7 +718,7 @@ mod tests {
rows: rows_2,
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
]
@@ -781,7 +781,7 @@ mod tests {
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
partition_rule_version: None,
partition_expr_version: None,
});
// write data
@@ -856,7 +856,7 @@ mod tests {
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
partition_rule_version: None,
partition_expr_version: None,
});
// write data
@@ -879,7 +879,7 @@ mod tests {
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
partition_rule_version: None,
partition_expr_version: None,
});
engine
@@ -900,7 +900,7 @@ mod tests {
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
partition_rule_version: None,
partition_expr_version: None,
});
engine
@@ -963,7 +963,7 @@ mod tests {
rows: rows1,
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
(
@@ -974,7 +974,7 @@ mod tests {
rows: rows2,
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
(
@@ -985,7 +985,7 @@ mod tests {
rows: rows3,
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
];
@@ -1036,7 +1036,7 @@ mod tests {
rows: test_util::build_rows(1, 3),
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
(
@@ -1047,7 +1047,7 @@ mod tests {
rows: test_util::build_rows(1, 2),
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
(
@@ -1058,7 +1058,7 @@ mod tests {
rows: test_util::build_rows(1, 5),
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
),
];
@@ -1098,7 +1098,7 @@ mod tests {
rows: test_util::build_rows(1, 5),
},
hint: None,
partition_rule_version: None,
partition_expr_version: None,
},
)];
@@ -1162,7 +1162,7 @@ mod tests {
}
#[tokio::test]
async fn test_metric_put_rejects_bad_partition_rule_version() {
async fn test_metric_put_rejects_bad_partition_expr_version() {
let env = TestEnv::new().await;
env.init_metric_region().await;
@@ -1179,7 +1179,7 @@ mod tests {
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_rule_version: Some(1),
partition_expr_version: Some(1),
}),
)
.await
@@ -1189,7 +1189,7 @@ mod tests {
}
#[tokio::test]
async fn test_metric_put_respects_staging_partition_rule_version() {
async fn test_metric_put_respects_staging_partition_expr_version() {
let env = TestEnv::new().await;
env.init_metric_region().await;
@@ -1206,7 +1206,7 @@ mod tests {
.await
.unwrap();
let expected_version = partition_rule_version(Some(&partition_expr));
let expected_version = partition_expr_version(Some(&partition_expr));
let rows = Rows {
schema: test_util::row_schema_with_tags(&["job"]),
rows: test_util::build_rows(1, 3),
@@ -1219,7 +1219,7 @@ mod tests {
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
partition_rule_version: Some(expected_version.wrapping_add(1)),
partition_expr_version: Some(expected_version.wrapping_add(1)),
}),
)
.await
@@ -1233,7 +1233,7 @@ mod tests {
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await
@@ -1247,7 +1247,7 @@ mod tests {
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_rule_version: Some(expected_version),
partition_expr_version: Some(expected_version),
}),
)
.await

View File

@@ -483,7 +483,7 @@ impl MetadataRegion {
RegionPutRequest {
rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}
}
@@ -516,7 +516,7 @@ impl MetadataRegion {
RegionDeleteRequest {
rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}
}

View File

@@ -17,7 +17,7 @@ use std::fs;
use std::sync::Arc;
use api::v1::Rows;
use common_function::utils::partition_rule_version;
use common_function::utils::partition_expr_version;
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
@@ -634,7 +634,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl
.await
.unwrap();
let expected_version = partition_rule_version(Some(&partition_expr));
let expected_version = partition_expr_version(Some(&partition_expr));
let unflushed_rows = Rows {
schema: column_schemas,
rows: build_rows(3, 6),
@@ -645,7 +645,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl
RegionRequest::Put(RegionPutRequest {
rows: unflushed_rows,
hint: None,
partition_rule_version: Some(expected_version),
partition_expr_version: Some(expected_version),
}),
)
.await

View File

@@ -632,7 +632,7 @@ async fn test_absent_and_invalid_columns_with_format(flat_format: bool) {
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await

View File

@@ -103,7 +103,7 @@ pub(crate) async fn delete_and_flush(
RegionRequest::Delete(RegionDeleteRequest {
rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await

View File

@@ -180,7 +180,7 @@ async fn test_engine_open_readonly_with_format(flat_format: bool) {
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await

View File

@@ -108,7 +108,7 @@ async fn test_set_role_state_gracefully_with_format(flat_format: bool) {
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await
@@ -202,7 +202,7 @@ async fn test_write_downgrading_region_with_format(flat_format: bool) {
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await

View File

@@ -22,7 +22,7 @@ use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::utils::partition_rule_version;
use common_function::utils::partition_expr_version;
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use object_store::Buffer;
@@ -253,12 +253,12 @@ fn default_partition_expr() -> String {
}
#[tokio::test]
async fn test_staging_write_partition_rule_version() {
test_staging_write_partition_rule_version_with_format(false).await;
test_staging_write_partition_rule_version_with_format(true).await;
async fn test_staging_write_partition_expr_version() {
test_staging_write_partition_expr_version_with_format(false).await;
test_staging_write_partition_expr_version_with_format(true).await;
}
async fn test_staging_write_partition_rule_version_with_format(flat_format: bool) {
async fn test_staging_write_partition_expr_version_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
@@ -291,8 +291,8 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
.await
.unwrap();
let expected_version = partition_rule_version(Some(&partition_expr));
let origin_version = partition_rule_version(Some(&origin_partition_expr));
let expected_version = partition_expr_version(Some(&partition_expr));
let origin_version = partition_expr_version(Some(&origin_partition_expr));
common_telemetry::info!(
"expected_version: {}, origin_version: {}",
expected_version,
@@ -308,7 +308,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
RegionRequest::Put(RegionPutRequest {
rows: bad_rows,
hint: None,
partition_rule_version: Some(origin_version),
partition_expr_version: Some(origin_version),
}),
)
.await
@@ -329,7 +329,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
RegionRequest::Put(RegionPutRequest {
rows: compat_rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await
@@ -346,7 +346,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
RegionRequest::Put(RegionPutRequest {
rows: ok_rows,
hint: None,
partition_rule_version: Some(expected_version),
partition_expr_version: Some(expected_version),
}),
)
.await
@@ -395,7 +395,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
RegionRequest::Put(RegionPutRequest {
rows: exit_rows,
hint: None,
partition_rule_version: Some(origin_version),
partition_expr_version: Some(origin_version),
}),
)
.await
@@ -412,7 +412,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
RegionRequest::Put(RegionPutRequest {
rows: compat_rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await
@@ -424,7 +424,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
.unwrap()
.version()
.metadata
.partition_rule_version;
.partition_expr_version;
assert_ne!(0, committed_version);
assert_eq!(committed_version, expected_version,);
@@ -438,7 +438,7 @@ async fn test_staging_write_partition_rule_version_with_format(flat_format: bool
RegionRequest::Put(RegionPutRequest {
rows: commit_rows,
hint: None,
partition_rule_version: Some(expected_version),
partition_expr_version: Some(expected_version),
}),
)
.await

View File

@@ -169,7 +169,7 @@ pub type MitoRegionRef = Arc<MitoRegion>;
#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
pub(crate) partition_expr: String,
pub(crate) partition_rule_version: u64,
pub(crate) partition_expr_version: u64,
}
impl MitoRegion {
@@ -807,15 +807,15 @@ impl MitoRegion {
}
}
pub fn expected_partition_rule_version(&self) -> u64 {
pub fn expected_partition_expr_version(&self) -> u64 {
if self.is_staging() {
let staging_partition_info = self.staging_partition_info.lock().unwrap();
staging_partition_info
.as_ref()
.map(|info| info.partition_rule_version)
.map(|info| info.partition_expr_version)
.unwrap_or_default()
} else {
self.version().metadata.partition_rule_version
self.version().metadata.partition_expr_version
}
}
}

View File

@@ -78,8 +78,8 @@ pub struct WriteRequest {
pub hint: Option<WriteHint>,
/// Region metadata on the time of this request is created.
pub(crate) region_metadata: Option<RegionMetadataRef>,
/// Partition rule version for the region.
pub partition_rule_version: Option<u64>,
/// Partition expression version for the region.
pub partition_expr_version: Option<u64>,
}
impl WriteRequest {
@@ -136,7 +136,7 @@ impl WriteRequest {
has_null,
hint: None,
region_metadata,
partition_rule_version: None,
partition_expr_version: None,
})
}
@@ -146,8 +146,8 @@ impl WriteRequest {
self
}
pub fn with_partition_rule_version(mut self, partition_rule_version: Option<u64>) -> Self {
self.partition_rule_version = partition_rule_version;
pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
self.partition_expr_version = partition_expr_version;
self
}
@@ -553,7 +553,7 @@ pub(crate) struct SenderBulkRequest {
pub(crate) region_id: RegionId,
pub(crate) request: BulkPart,
pub(crate) region_metadata: RegionMetadataRef,
pub(crate) partition_rule_version: Option<u64>,
pub(crate) partition_expr_version: Option<u64>,
}
/// Request sent to a worker with timestamp
@@ -667,7 +667,7 @@ impl WorkerRequest {
let mut write_request =
WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
.with_hint(v.hint)
.with_partition_rule_version(v.partition_rule_version);
.with_partition_expr_version(v.partition_expr_version);
if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
&& let Some(region_metadata) = &region_metadata
{
@@ -682,7 +682,7 @@ impl WorkerRequest {
let mut write_request =
WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
.with_hint(v.hint)
.with_partition_rule_version(v.partition_rule_version);
.with_partition_expr_version(v.partition_expr_version);
if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
&& let Some(region_metadata) = &region_metadata
{

View File

@@ -1183,7 +1183,7 @@ pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await
@@ -1226,7 +1226,7 @@ pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
RegionRequest::Delete(RegionDeleteRequest {
rows,
hint: None,
partition_rule_version: None,
partition_expr_version: None,
}),
)
.await

View File

@@ -91,7 +91,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
request: part,
region_id: request.region_id,
region_metadata,
partition_rule_version: request.partition_rule_version,
partition_expr_version: request.partition_expr_version,
});
}
}

View File

@@ -14,7 +14,7 @@
use std::time::Instant;
use common_base::hash::partition_rule_version;
use common_base::hash::partition_expr_version;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::EnterStagingRequest;
@@ -246,10 +246,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
fn update_region_staging_partition_info(region: &MitoRegionRef, partition_expr: String) {
let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
debug_assert!(staging_partition_info.is_none());
let partition_rule_version = partition_rule_version(Some(&partition_expr));
let partition_expr_version = partition_expr_version(Some(&partition_expr));
*staging_partition_info = Some(StagingPartitionInfo {
partition_expr,
partition_rule_version,
partition_expr_version,
});
}
}

View File

@@ -305,11 +305,11 @@ impl<S> RegionWorkerLoop<S> {
else {
continue;
};
let expected_version = region.expected_partition_rule_version();
if let Err(e) = check_partition_rule_version(
let expected_version = region.expected_partition_expr_version();
if let Err(e) = check_partition_expr_version(
region_id,
expected_version,
sender_req.request.partition_rule_version,
sender_req.request.partition_expr_version,
) {
sender_req.sender.send(Err(e));
continue;
@@ -420,11 +420,11 @@ impl<S> RegionWorkerLoop<S> {
let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else {
continue;
};
let expected_version = region.expected_partition_rule_version();
if let Err(e) = check_partition_rule_version(
let expected_version = region.expected_partition_expr_version();
if let Err(e) = check_partition_expr_version(
region_id,
expected_version,
bulk_req.partition_rule_version,
bulk_req.partition_expr_version,
) {
bulk_req.sender.send(Err(e));
continue;
@@ -495,7 +495,7 @@ fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
Ok(())
}
fn check_partition_rule_version(
fn check_partition_expr_version(
region_id: RegionId,
expected_version: u64,
request_version: Option<u64>,

View File

@@ -19,7 +19,7 @@ use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests};
use api::v1::region::{
BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
};
use api::v1::{ArrowIpc, PartitionRuleVersion};
use api::v1::{ArrowIpc, PartitionExprVersion};
use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
@@ -87,7 +87,7 @@ impl Inserter {
// SAFETY: region masks length checked
let (region_number, _) = region_masks.into_iter().next().unwrap();
let region_id = RegionId::new(table_id, region_number);
let partition_rule_version = partition_versions
let partition_expr_version = partition_versions
.get(&region_number)
.copied()
.unwrap_or_default();
@@ -104,8 +104,8 @@ impl Inserter {
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
region_id: region_id.as_u64(),
partition_rule_version: partition_rule_version
.map(|value| PartitionRuleVersion { value }),
partition_expr_version: partition_expr_version
.map(|value| PartitionExprVersion { value }),
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
schema: schema_bytes.clone(),
data_header: raw_flight_data.data_header,
@@ -156,7 +156,7 @@ impl Inserter {
if mask.select_none() {
continue;
}
let partition_rule_version = partition_versions
let partition_expr_version = partition_versions
.get(&region_id.region_number())
.copied()
.unwrap_or_default();
@@ -218,8 +218,8 @@ impl Inserter {
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
region_id: region_id.as_u64(),
partition_rule_version: partition_rule_version
.map(|value| PartitionRuleVersion { value }),
partition_expr_version: partition_expr_version
.map(|value| PartitionExprVersion { value }),
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
schema: schema_bytes,
data_header: header,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::region::{DeleteRequest, InsertRequest};
use api::v1::{PartitionRuleVersion, Rows};
use api::v1::{PartitionExprVersion, Rows};
use partition::manager::PartitionRuleManager;
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -43,11 +43,11 @@ impl<'a> Partitioner<'a> {
.context(SplitInsertSnafu)?
.into_iter()
.map(
|(region_number, (rows, partition_rule_version))| InsertRequest {
|(region_number, (rows, partition_expr_version))| InsertRequest {
region_id: RegionId::new(table_id, region_number).into(),
rows: Some(rows),
partition_rule_version: partition_rule_version
.map(|value| PartitionRuleVersion { value }),
partition_expr_version: partition_expr_version
.map(|value| PartitionExprVersion { value }),
},
)
.collect();
@@ -68,11 +68,11 @@ impl<'a> Partitioner<'a> {
.context(SplitDeleteSnafu)?
.into_iter()
.map(
|(region_number, (rows, partition_rule_version))| DeleteRequest {
|(region_number, (rows, partition_expr_version))| DeleteRequest {
region_id: RegionId::new(table_id, region_number).into(),
rows: Some(rows),
partition_rule_version: partition_rule_version
.map(|value| PartitionRuleVersion { value }),
partition_expr_version: partition_expr_version
.map(|value| PartitionExprVersion { value }),
},
)
.collect();

View File

@@ -56,7 +56,7 @@ mod tests {
use api::v1::helper::tag_column_schema;
use api::v1::region::DeleteRequest as RegionDeleteRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, PartitionRuleVersion, Row, Value};
use api::v1::{ColumnDataType, PartitionExprVersion, Row, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
@@ -96,7 +96,7 @@ mod tests {
.unwrap()
.partitions
.iter()
.map(|p| (p.id.as_u64(), p.partition_rule_version))
.map(|p| (p.id.as_u64(), p.partition_expr_version))
.collect::<HashMap<_, _>>();
let region_requests = converter.convert(table_request).await.unwrap();
@@ -155,7 +155,7 @@ mod tests {
})
.collect(),
}),
partition_rule_version: version.map(|value| PartitionRuleVersion { value }),
partition_expr_version: version.map(|value| PartitionExprVersion { value }),
}
}
}

View File

@@ -72,7 +72,7 @@ mod tests {
use api::v1::helper::tag_column_schema;
use api::v1::region::InsertRequest as RegionInsertRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, PartitionRuleVersion, Row, Value};
use api::v1::{ColumnDataType, PartitionExprVersion, Row, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
@@ -112,7 +112,7 @@ mod tests {
.unwrap()
.partitions
.iter()
.map(|p| (p.id.as_u64(), p.partition_rule_version))
.map(|p| (p.id.as_u64(), p.partition_expr_version))
.collect::<HashMap<_, _>>();
let region_requests = converter.convert(table_request).await.unwrap();
@@ -172,7 +172,7 @@ mod tests {
})
.collect(),
}),
partition_rule_version: version.map(|value| PartitionRuleVersion { value }),
partition_expr_version: version.map(|value| PartitionExprVersion { value }),
}
}
}

View File

@@ -15,7 +15,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use common_base::hash::partition_rule_version;
use common_base::hash::partition_expr_version;
use common_meta::cache::{TableRouteCacheRef, new_table_route_cache};
use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue;
@@ -198,7 +198,7 @@ pub(crate) async fn create_partition_rule_manager(
}
#[tokio::test]
async fn test_partition_rule_version_cache() {
async fn test_partition_expr_version_cache() {
let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
let partition_manager = create_partition_rule_manager(kv_backend).await;
let partitions = partition_manager
@@ -214,12 +214,12 @@ async fn test_partition_rule_version_cache() {
.partition_expr
.as_ref()
.map(|expr| expr.as_json_str().unwrap())
.map(|expr_json| partition_rule_version(Some(expr_json.as_str())))
.map(|expr_json| partition_expr_version(Some(expr_json.as_str())))
.unwrap_or_default();
assert_eq!(Some(expected), partition.partition_rule_version);
assert_eq!(Some(expected), partition.partition_expr_version);
version_by_region.insert(
partition.id.region_number(),
partition.partition_rule_version,
partition.partition_expr_version,
);
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_base::hash::partition_rule_version;
use common_base::hash::partition_expr_version;
use common_error::ext::BoxedError;
use common_meta::cache::{CacheContainer, Initializer, TableRoute, TableRouteCacheRef};
use common_meta::instruction::CacheIdent;
@@ -58,10 +58,10 @@ pub fn create_partitions_with_version_from_region_routes(
let mut partitions = Vec::with_capacity(region_routes.len());
for r in region_routes {
let expr_json = r.region.partition_expr();
let partition_rule_version = if expr_json.is_empty() {
let partition_expr_version = if expr_json.is_empty() {
None
} else {
Some(partition_rule_version(Some(expr_json.as_str())))
Some(partition_expr_version(Some(expr_json.as_str())))
};
let partition_expr = PartitionExpr::from_json_str(expr_json.as_str())
.map_err(BoxedError::new)
@@ -70,7 +70,7 @@ pub fn create_partitions_with_version_from_region_routes(
partitions.push(PartitionInfoWithVersion {
id,
partition_expr,
partition_rule_version,
partition_expr_version,
});
}

View File

@@ -54,7 +54,7 @@ pub struct PartitionInfo {
pub struct PartitionInfoWithVersion {
pub id: RegionId,
pub partition_expr: Option<PartitionExpr>,
pub partition_rule_version: Option<u64>,
pub partition_expr_version: Option<u64>,
}
impl PartitionRuleManager {
@@ -192,7 +192,7 @@ impl PartitionRuleManager {
let partition_versions = partition_info
.partitions
.iter()
.map(|r| (r.id.region_number(), r.partition_rule_version))
.map(|r| (r.id.region_number(), r.partition_expr_version))
.collect::<HashMap<RegionNumber, Option<u64>>>();
let regions = partition_info
.partitions

View File

@@ -24,7 +24,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use api::v1::column_def::try_as_column_schema;
use api::v1::region::RegionColumnDef;
use common_base::hash::partition_rule_version;
use common_base::hash::partition_expr_version;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
@@ -162,7 +162,7 @@ pub struct RegionMetadata {
/// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is.
pub partition_expr: Option<String>,
#[serde(skip)]
pub partition_rule_version: u64,
pub partition_expr_version: u64,
}
impl fmt::Debug for RegionMetadata {
@@ -202,8 +202,8 @@ impl<'de> Deserialize<'de> for RegionMetadata {
let skipped =
SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
let partition_rule_version =
partition_rule_version(without_schema.partition_expr.as_deref());
let partition_expr_version =
partition_expr_version(without_schema.partition_expr.as_deref());
Ok(Self {
schema: skipped.schema,
@@ -215,7 +215,7 @@ impl<'de> Deserialize<'de> for RegionMetadata {
schema_version: without_schema.schema_version,
primary_key_encoding: without_schema.primary_key_encoding,
partition_expr: without_schema.partition_expr,
partition_rule_version,
partition_expr_version,
})
}
}
@@ -232,7 +232,7 @@ impl RegionMetadata {
}
pub fn set_partition_expr(&mut self, expr: Option<String>) {
self.partition_rule_version = partition_rule_version(expr.as_deref());
self.partition_expr_version = partition_expr_version(expr.as_deref());
self.partition_expr = expr;
}
@@ -374,7 +374,7 @@ impl RegionMetadata {
schema_version: self.schema_version,
primary_key_encoding: self.primary_key_encoding,
partition_expr: self.partition_expr.clone(),
partition_rule_version: partition_rule_version(self.partition_expr.as_deref()),
partition_expr_version: partition_expr_version(self.partition_expr.as_deref()),
})
}
@@ -683,7 +683,7 @@ impl RegionMetadataBuilder {
fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
let skipped = SkippedFields::new(&self.column_metadatas)?;
let partition_rule_version = partition_rule_version(self.partition_expr.as_deref());
let partition_expr_version = partition_expr_version(self.partition_expr.as_deref());
let meta = RegionMetadata {
schema: skipped.schema,
time_index: skipped.time_index,
@@ -694,7 +694,7 @@ impl RegionMetadataBuilder {
schema_version: self.schema_version,
primary_key_encoding: self.primary_key_encoding,
partition_expr: self.partition_expr,
partition_rule_version,
partition_expr_version,
};
if validate {

View File

@@ -207,7 +207,7 @@ fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequ
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_rule_version: r.partition_rule_version.map(|v| v.value),
partition_expr_version: r.partition_expr_version.map(|v| v.value),
}),
)
})
@@ -228,7 +228,7 @@ fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionR
RegionRequest::Delete(RegionDeleteRequest {
rows,
hint: None,
partition_rule_version: r.partition_rule_version.map(|v| v.value),
partition_expr_version: r.partition_expr_version.map(|v| v.value),
}),
)
})
@@ -404,7 +404,7 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = request.region_id.into();
let partition_rule_version = request.partition_rule_version.map(|v| v.value);
let partition_expr_version = request.partition_expr_version.map(|v| v.value);
let Some(Body::ArrowIpc(request)) = request.body else {
return Ok(vec![]);
};
@@ -424,7 +424,7 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
region_id,
payload,
raw_data: request,
partition_rule_version,
partition_expr_version,
}),
)])
}
@@ -455,8 +455,8 @@ pub struct RegionPutRequest {
pub rows: Rows,
/// Write hint.
pub hint: Option<WriteHint>,
/// Partition rule version for the region.
pub partition_rule_version: Option<u64>,
/// Partition expression version for the region.
pub partition_expr_version: Option<u64>,
}
#[derive(Debug)]
@@ -473,8 +473,8 @@ pub struct RegionDeleteRequest {
pub rows: Rows,
/// Write hint.
pub hint: Option<WriteHint>,
/// Partition rule version for the region.
pub partition_rule_version: Option<u64>,
/// Partition expression version for the region.
pub partition_expr_version: Option<u64>,
}
#[derive(Debug, Clone)]
@@ -1458,7 +1458,7 @@ pub struct RegionBulkInsertsRequest {
pub region_id: RegionId,
pub payload: DfRecordBatch,
pub raw_data: ArrowIpc,
pub partition_rule_version: Option<u64>,
pub partition_expr_version: Option<u64>,
}
impl RegionBulkInsertsRequest {

View File

@@ -24,7 +24,7 @@ mod region_migration;
#[macro_use]
mod repartition;
#[macro_use]
mod repartition_rule_version;
mod repartition_expr_version;
grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
@@ -38,7 +38,7 @@ repartition_tests!(File);
repartition_tests!(S3, S3WithCache, Oss, Azblob, Gcs);
repartition_rule_version_tests!(File);
repartition_expr_version_tests!(File);
repartition_rule_version_tests!(S3, S3WithCache, Oss, Azblob, Gcs);
repartition_expr_version_tests!(S3, S3WithCache, Oss, Azblob, Gcs);
// TODO(niebayes): add integration tests for remote wal.

View File

@@ -30,17 +30,17 @@ use tests_integration::test_util::{StorageType, get_test_store_config};
use tokio::time::{Duration, sleep};
#[macro_export]
macro_rules! repartition_rule_version_tests {
macro_rules! repartition_expr_version_tests {
($($service:ident),*) => {
$(
paste::item! {
mod [<integration_repartition_rule_version_ $service:lower _test>] {
mod [<integration_repartition_expr_version_ $service:lower _test>] {
#[tokio::test(flavor = "multi_thread")]
async fn [< test_repartition_rule_version >]() {
async fn [< test_repartition_expr_version >]() {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
$crate::repartition_rule_version::test_repartition_rule_version(store_type).await
$crate::repartition_expr_version::test_repartition_expr_version(store_type).await
}
}
}
@@ -49,12 +49,12 @@ macro_rules! repartition_rule_version_tests {
};
}
pub async fn test_repartition_rule_version(store_type: StorageType) {
let cluster_name = "test_repartition_rule_version";
pub async fn test_repartition_expr_version(store_type: StorageType) {
let cluster_name = "test_repartition_expr_version";
let (store_config, _guard) = get_test_store_config(&store_type);
let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
if matches!(store_type, StorageType::File) {
let home_dir = create_temp_dir("test_repartition_rule_version_data_home");
let home_dir = create_temp_dir("test_repartition_expr_version_data_home");
builder = builder.with_shared_home_dir(Arc::new(home_dir));
}
@@ -79,7 +79,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) {
let query_ctx = QueryContext::arc();
let sql = r#"
CREATE TABLE repartition_rule_version_table(
CREATE TABLE repartition_expr_version_table(
`id` INT,
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
@@ -98,7 +98,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) {
let id = next_id;
next_id += 1;
let sql = format!(
"INSERT INTO repartition_rule_version_table VALUES ({id}, '2022-01-01 00:00:00', 0.5)"
"INSERT INTO repartition_expr_version_table VALUES ({id}, '2022-01-01 00:00:00', 0.5)"
);
run_sql(&instance, &sql, query_ctx.clone()).await.unwrap();
success_count += 1;
@@ -110,7 +110,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) {
run_sql(
&moved_instance,
r#"
ALTER TABLE repartition_rule_version_table SPLIT PARTITION (
ALTER TABLE repartition_expr_version_table SPLIT PARTITION (
`id` < 10
) INTO (
`id` < 5,
@@ -126,7 +126,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) {
let id = next_id;
next_id += 1;
let sql = format!(
"INSERT INTO repartition_rule_version_table VALUES ({id}, '2022-01-01 00:00:00', 1.0)"
"INSERT INTO repartition_expr_version_table VALUES ({id}, '2022-01-01 00:00:00', 1.0)"
);
match run_sql(&instance, &sql, query_ctx.clone()).await {
Ok(_) => success_count += 1,
@@ -142,14 +142,14 @@ pub async fn test_repartition_rule_version(store_type: StorageType) {
let id = next_id;
let sql = format!(
"INSERT INTO repartition_rule_version_table VALUES ({id}, '2022-01-01 00:00:00', 2.0)"
"INSERT INTO repartition_expr_version_table VALUES ({id}, '2022-01-01 00:00:00', 2.0)"
);
run_sql(&instance, &sql, query_ctx.clone()).await.unwrap();
success_count += 1;
let result = run_sql(
&instance,
"SELECT count(*) FROM repartition_rule_version_table",
"SELECT count(*) FROM repartition_expr_version_table",
query_ctx.clone(),
)
.await
@@ -160,7 +160,7 @@ pub async fn test_repartition_rule_version(store_type: StorageType) {
run_sql(
&instance,
"DROP TABLE repartition_rule_version_table",
"DROP TABLE repartition_expr_version_table",
query_ctx,
)
.await