mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 02:10:38 +00:00
feat: Store partition expr in RegionMetadata (#6699)
* wire partition.expr_json option constant and parsing test(mito2): manifest roundtrip persists partition_expr JSON test(mito2): create/open with partition.expr_json persists in manifest docs: add comments for partition.expr_json option and RegionOptions.partition_expr serde: include RegionOptions.partition_expr (skip if None) test(mito2): doc intent and verify runtime backfill + persistence-after-alter for partition expr add partition expr to create request Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * add create_with_partition_expr_persists_manifest Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * pass partition expr to create request Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * polish Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix sqlness Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix test Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * remove unused dep Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5324,7 +5324,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69680846a078aae670d93fb30511a72738345199#69680846a078aae670d93fb30511a72738345199"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9edc8c4d2a7e3c7d8e4982a6af94426da4057687#9edc8c4d2a7e3c7d8e4982a6af94426da4057687"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
|
||||
@@ -140,7 +140,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69680846a078aae670d93fb30511a72738345199" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9edc8c4d2a7e3c7d8e4982a6af94426da4057687" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -74,11 +74,19 @@ pub fn make_create_region_request_for_peer(
|
||||
let catalog = &create_table_expr.catalog_name;
|
||||
let schema = &create_table_expr.schema_name;
|
||||
let storage_path = region_storage_path(catalog, schema);
|
||||
let partition_exprs = region_routes
|
||||
.iter()
|
||||
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
for region_number in ®ions_on_this_peer {
|
||||
let region_id = RegionId::new(logical_table_id, *region_number);
|
||||
let region_request =
|
||||
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
|
||||
let region_request = request_builder.build_one(
|
||||
region_id,
|
||||
storage_path.clone(),
|
||||
&HashMap::new(),
|
||||
&partition_exprs,
|
||||
);
|
||||
requests.push(region_request);
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,10 @@ impl CreateLogicalTablesProcedure {
|
||||
let table_ids_already_exists = &self.data.table_ids_already_exists;
|
||||
let regions_on_this_peer = find_leader_regions(region_routes, peer);
|
||||
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
|
||||
let partition_exprs = region_routes
|
||||
.iter()
|
||||
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
|
||||
.collect();
|
||||
for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
|
||||
if table_id_already_exists.is_some() {
|
||||
continue;
|
||||
@@ -57,8 +61,12 @@ impl CreateLogicalTablesProcedure {
|
||||
|
||||
for region_number in ®ions_on_this_peer {
|
||||
let region_id = RegionId::new(logical_table_id, *region_number);
|
||||
let one_region_request =
|
||||
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
|
||||
let one_region_request = request_builder.build_one(
|
||||
region_id,
|
||||
storage_path.clone(),
|
||||
&HashMap::new(),
|
||||
&partition_exprs,
|
||||
);
|
||||
requests.push(one_region_request);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,6 +214,11 @@ impl CreateTableProcedure {
|
||||
let leaders = find_leaders(region_routes);
|
||||
let mut create_region_tasks = Vec::with_capacity(leaders.len());
|
||||
|
||||
let partition_exprs = region_routes
|
||||
.iter()
|
||||
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
|
||||
.collect();
|
||||
|
||||
for datanode in leaders {
|
||||
let requester = self.context.node_manager.datanode(&datanode).await;
|
||||
|
||||
@@ -221,8 +226,12 @@ impl CreateTableProcedure {
|
||||
let mut requests = Vec::with_capacity(regions.len());
|
||||
for region_number in regions {
|
||||
let region_id = RegionId::new(self.table_id(), region_number);
|
||||
let create_region_request =
|
||||
request_builder.build_one(region_id, storage_path.clone(), region_wal_options);
|
||||
let create_region_request = request_builder.build_one(
|
||||
region_id,
|
||||
storage_path.clone(),
|
||||
region_wal_options,
|
||||
&partition_exprs,
|
||||
);
|
||||
requests.push(PbRegionRequest::Create(create_region_request));
|
||||
}
|
||||
|
||||
|
||||
@@ -15,8 +15,10 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::column_def::try_as_column_def;
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::{CreateRequest, RegionColumnDef};
|
||||
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
|
||||
use common_telemetry::warn;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
@@ -60,6 +62,7 @@ pub(crate) fn build_template_from_raw_table_info(
|
||||
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
|
||||
path: String::new(),
|
||||
options,
|
||||
partition: None,
|
||||
};
|
||||
|
||||
Ok(template)
|
||||
@@ -121,6 +124,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea
|
||||
primary_key,
|
||||
path: String::new(),
|
||||
options: create_table_expr.table_options.clone(),
|
||||
partition: None,
|
||||
};
|
||||
|
||||
Ok(template)
|
||||
@@ -150,6 +154,7 @@ impl CreateRequestBuilder {
|
||||
region_id: RegionId,
|
||||
storage_path: String,
|
||||
region_wal_options: &HashMap<RegionNumber, String>,
|
||||
partition_exprs: &HashMap<RegionNumber, String>,
|
||||
) -> CreateRequest {
|
||||
let mut request = self.template.clone();
|
||||
|
||||
@@ -157,6 +162,7 @@ impl CreateRequestBuilder {
|
||||
request.path = storage_path;
|
||||
// Stores the encoded wal options into the request options.
|
||||
prepare_wal_options(&mut request.options, region_id, region_wal_options);
|
||||
request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
|
||||
|
||||
if let Some(physical_table_id) = self.physical_table_id {
|
||||
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
|
||||
@@ -173,3 +179,55 @@ impl CreateRequestBuilder {
|
||||
request
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_partition_expr(
|
||||
region_id: RegionId,
|
||||
partition_exprs: &HashMap<RegionNumber, String>,
|
||||
) -> Partition {
|
||||
let expr = partition_exprs.get(®ion_id.region_number()).cloned();
|
||||
if expr.is_none() {
|
||||
warn!("region {} has no partition expr", region_id);
|
||||
}
|
||||
|
||||
Partition {
|
||||
expression: expr.unwrap_or_default(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_build_one_sets_partition_expr_per_region() {
|
||||
// minimal template
|
||||
let template = CreateRequest {
|
||||
region_id: 0,
|
||||
engine: "mito".to_string(),
|
||||
column_defs: vec![],
|
||||
primary_key: vec![],
|
||||
path: String::new(),
|
||||
options: Default::default(),
|
||||
partition: None,
|
||||
};
|
||||
let builder = CreateRequestBuilder::new(template, None);
|
||||
|
||||
let mut partition_exprs: HashMap<RegionNumber, String> = HashMap::new();
|
||||
let expr_a =
|
||||
r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
|
||||
partition_exprs.insert(0, expr_a.clone());
|
||||
|
||||
let r0 = builder.build_one(
|
||||
RegionId::new(42, 0),
|
||||
"/p".to_string(),
|
||||
&Default::default(),
|
||||
&partition_exprs,
|
||||
);
|
||||
assert_eq!(r0.partition.as_ref().unwrap().expression, expr_a);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use common_telemetry::info;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::future;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use store_api::storage::{RegionId, RegionNumber, TableId};
|
||||
use table::metadata::RawTableInfo;
|
||||
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path};
|
||||
@@ -112,16 +112,23 @@ impl ReconcileRegions {
|
||||
let create_tables = &ctx.persistent_ctx.create_tables;
|
||||
|
||||
let mut requests = Vec::with_capacity(region_numbers.len() * create_tables.len());
|
||||
|
||||
for (table_id, table_info) in create_tables {
|
||||
let request_builder =
|
||||
create_region_request_from_raw_table_info(table_info, physical_table_id)?;
|
||||
let storage_path =
|
||||
region_storage_path(&table_name.catalog_name, &table_name.schema_name);
|
||||
let partition_exprs = prepare_partition_exprs(ctx, *table_id);
|
||||
|
||||
for region_number in region_numbers {
|
||||
let region_id = RegionId::new(*table_id, *region_number);
|
||||
let one_region_request =
|
||||
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
|
||||
|
||||
let one_region_request = request_builder.build_one(
|
||||
region_id,
|
||||
storage_path.clone(),
|
||||
&HashMap::new(),
|
||||
&partition_exprs,
|
||||
);
|
||||
requests.push(one_region_request);
|
||||
}
|
||||
}
|
||||
@@ -144,3 +151,20 @@ fn create_region_request_from_raw_table_info(
|
||||
let template = build_template_from_raw_table_info(raw_table_info)?;
|
||||
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
|
||||
}
|
||||
|
||||
fn prepare_partition_exprs(
|
||||
ctx: &ReconcileLogicalTablesContext,
|
||||
table_id: TableId,
|
||||
) -> HashMap<RegionNumber, String> {
|
||||
ctx.persistent_ctx
|
||||
.physical_table_route
|
||||
.as_ref()
|
||||
.map(|r| {
|
||||
r.region_routes
|
||||
.iter()
|
||||
.filter(|r| r.region.id.table_id() == table_id)
|
||||
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
|
||||
.collect::<HashMap<_, _>>()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
@@ -124,6 +124,7 @@ mod tests {
|
||||
options: new_test_options(),
|
||||
table_dir: "create_region_dir/".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
let region_id = RegionId::new(1, 0);
|
||||
|
||||
@@ -163,6 +164,7 @@ mod tests {
|
||||
options: new_test_options(),
|
||||
table_dir: region_dir.clone(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
let region_id = RegionId::new(1, 0);
|
||||
|
||||
@@ -203,6 +205,7 @@ mod tests {
|
||||
options: new_test_options(),
|
||||
table_dir: region_dir.clone(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
let region_id = RegionId::new(1, 0);
|
||||
|
||||
|
||||
@@ -158,6 +158,7 @@ fn test_region_request_builder() {
|
||||
primary_key: vec![2, 1],
|
||||
path: String::new(),
|
||||
options: HashMap::new(),
|
||||
partition: None,
|
||||
};
|
||||
assert_eq!(template.template(), &expected);
|
||||
}
|
||||
|
||||
@@ -486,6 +486,7 @@ impl MetricEngineInner {
|
||||
options,
|
||||
table_dir: request.table_dir.clone(),
|
||||
path_type: PathType::Metadata,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -653,6 +654,7 @@ mod test {
|
||||
engine: METRIC_ENGINE_NAME.to_string(),
|
||||
primary_key: vec![],
|
||||
options: HashMap::new(),
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
let result = MetricEngineInner::verify_region_create_request(&request);
|
||||
assert!(result.is_err());
|
||||
@@ -699,6 +701,7 @@ mod test {
|
||||
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
MetricEngineInner::verify_region_create_request(&request).unwrap();
|
||||
}
|
||||
@@ -731,6 +734,7 @@ mod test {
|
||||
engine: METRIC_ENGINE_NAME.to_string(),
|
||||
primary_key: vec![],
|
||||
options: HashMap::new(),
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
MetricEngineInner::verify_region_create_request(&request).unwrap_err();
|
||||
|
||||
@@ -783,6 +787,7 @@ mod test {
|
||||
options,
|
||||
table_dir: "/test_dir".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
|
||||
// set up
|
||||
|
||||
@@ -96,6 +96,7 @@ mod tests {
|
||||
options: HashMap::new(),
|
||||
table_dir: "test".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -116,6 +117,7 @@ mod tests {
|
||||
options: HashMap::new(),
|
||||
table_dir: "test".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
},
|
||||
),
|
||||
];
|
||||
@@ -157,6 +159,7 @@ mod tests {
|
||||
options: HashMap::new(),
|
||||
table_dir: "test".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
},
|
||||
)];
|
||||
|
||||
|
||||
@@ -152,6 +152,7 @@ impl TestEnv {
|
||||
.collect(),
|
||||
table_dir: table_dir.to_string(),
|
||||
path_type: PathType::Bare, // Use Bare path type for engine regions
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
|
||||
// create physical region
|
||||
@@ -336,6 +337,7 @@ pub fn create_logical_region_request(
|
||||
.collect(),
|
||||
table_dir: table_dir.to_string(),
|
||||
path_type: PathType::Bare, // Use Bare path type for engine regions
|
||||
partition_expr_json: Some("".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -657,6 +657,7 @@ async fn test_cache_null_primary_key() {
|
||||
options: HashMap::new(),
|
||||
table_dir: "test".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
|
||||
@@ -22,7 +22,9 @@ use store_api::storage::{RegionId, ScanRequest};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::region::options::MemtableOptions;
|
||||
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
|
||||
use crate::test_util::{
|
||||
build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_new_region() {
|
||||
@@ -243,3 +245,31 @@ async fn test_engine_create_with_memtable_opts() {
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_with_partition_expr_persists_manifest() {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let expr_json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
|
||||
let request = CreateRequestBuilder::new()
|
||||
.partition_expr_json(Some(expr_json.to_string()))
|
||||
.build();
|
||||
let table_dir = request.table_dir.clone();
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let manifest = region.manifest_ctx.manifest().await;
|
||||
assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
|
||||
|
||||
// Reopen the region with options containing partition.expr_json
|
||||
reopen_region(&engine, region_id, table_dir, false, Default::default()).await;
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let manifest = region.manifest_ctx.manifest().await;
|
||||
assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
|
||||
}
|
||||
|
||||
@@ -641,6 +641,36 @@ mod test {
|
||||
manager.validate_manifest(&metadata, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn manifest_with_partition_expr_roundtrip() {
|
||||
let env = TestEnv::new().await;
|
||||
let expr_json =
|
||||
r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
|
||||
let mut metadata = basic_region_metadata();
|
||||
metadata.partition_expr = Some(expr_json.to_string());
|
||||
let metadata = Arc::new(metadata);
|
||||
let mut manager = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// persisted manifest should contain the same partition_expr JSON
|
||||
let manifest = manager.manifest();
|
||||
assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
|
||||
|
||||
manager.stop().await;
|
||||
|
||||
// Reopen and check again
|
||||
let manager = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let manifest = manager.manifest();
|
||||
assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn region_change_add_column() {
|
||||
let metadata = Arc::new(basic_region_metadata());
|
||||
@@ -783,6 +813,6 @@ mod test {
|
||||
|
||||
// get manifest size again
|
||||
let manifest_size = manager.manifest_usage();
|
||||
assert_eq!(manifest_size, 1204);
|
||||
assert_eq!(manifest_size, 1226);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,7 +179,7 @@ async fn manager_with_checkpoint_distance_1() {
|
||||
.unwrap();
|
||||
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
|
||||
let expected_json =
|
||||
"{\"size\":879,\"version\":10,\"checksum\":2245967096,\"extend_metadata\":{}}";
|
||||
"{\"size\":901,\"version\":10,\"checksum\":2571452538,\"extend_metadata\":{}}";
|
||||
assert_eq!(expected_json, raw_json);
|
||||
|
||||
// reopen the manager
|
||||
|
||||
@@ -646,6 +646,7 @@ pub struct CreateRequestBuilder {
|
||||
ts_type: ConcreteDataType,
|
||||
/// kafka topic name
|
||||
kafka_topic: Option<String>,
|
||||
partition_expr_json: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for CreateRequestBuilder {
|
||||
@@ -660,6 +661,7 @@ impl Default for CreateRequestBuilder {
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
ts_type: ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
kafka_topic: None,
|
||||
partition_expr_json: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -718,6 +720,12 @@ impl CreateRequestBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn partition_expr_json(mut self, partition_expr_json: Option<String>) -> Self {
|
||||
self.partition_expr_json = partition_expr_json;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(&self) -> RegionCreateRequest {
|
||||
let mut column_id = 0;
|
||||
let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
|
||||
@@ -775,6 +783,7 @@ impl CreateRequestBuilder {
|
||||
options,
|
||||
table_dir: self.table_dir.clone(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: self.partition_expr_json.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
builder.push_column_metadata(column);
|
||||
}
|
||||
builder.primary_key(request.primary_key);
|
||||
if let Some(expr_json) = request.partition_expr_json.as_ref() {
|
||||
builder.partition_expr_json(Some(expr_json.clone()));
|
||||
}
|
||||
|
||||
// Create a MitoRegion from the RegionMetadata.
|
||||
let region = RegionOpener::new(
|
||||
|
||||
@@ -153,6 +153,12 @@ pub struct RegionMetadata {
|
||||
|
||||
/// Primary key encoding mode.
|
||||
pub primary_key_encoding: PrimaryKeyEncoding,
|
||||
|
||||
/// Partition expression serialized as a JSON string.
|
||||
/// Compatibility behavior:
|
||||
/// - None: no partition expr was ever set in the manifest (legacy regions).
|
||||
/// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is.
|
||||
pub partition_expr: Option<String>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RegionMetadata {
|
||||
@@ -163,6 +169,7 @@ impl fmt::Debug for RegionMetadata {
|
||||
.field("primary_key", &self.primary_key)
|
||||
.field("region_id", &self.region_id)
|
||||
.field("schema_version", &self.schema_version)
|
||||
.field("partition_expr", &self.partition_expr)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -183,6 +190,8 @@ impl<'de> Deserialize<'de> for RegionMetadata {
|
||||
schema_version: u64,
|
||||
#[serde(default)]
|
||||
primary_key_encoding: PrimaryKeyEncoding,
|
||||
#[serde(default)]
|
||||
partition_expr: Option<String>,
|
||||
}
|
||||
|
||||
let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
|
||||
@@ -198,6 +207,7 @@ impl<'de> Deserialize<'de> for RegionMetadata {
|
||||
region_id: without_schema.region_id,
|
||||
schema_version: without_schema.schema_version,
|
||||
primary_key_encoding: without_schema.primary_key_encoding,
|
||||
partition_expr: without_schema.partition_expr,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -350,6 +360,7 @@ impl RegionMetadata {
|
||||
region_id: self.region_id,
|
||||
schema_version: self.schema_version,
|
||||
primary_key_encoding: self.primary_key_encoding,
|
||||
partition_expr: self.partition_expr.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -524,6 +535,7 @@ pub struct RegionMetadataBuilder {
|
||||
primary_key: Vec<ColumnId>,
|
||||
schema_version: u64,
|
||||
primary_key_encoding: PrimaryKeyEncoding,
|
||||
partition_expr: Option<String>,
|
||||
}
|
||||
|
||||
impl RegionMetadataBuilder {
|
||||
@@ -535,6 +547,7 @@ impl RegionMetadataBuilder {
|
||||
primary_key: vec![],
|
||||
schema_version: 0,
|
||||
primary_key_encoding: PrimaryKeyEncoding::Dense,
|
||||
partition_expr: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -546,6 +559,7 @@ impl RegionMetadataBuilder {
|
||||
region_id: existing.region_id,
|
||||
schema_version: existing.schema_version,
|
||||
primary_key_encoding: existing.primary_key_encoding,
|
||||
partition_expr: existing.partition_expr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,6 +569,12 @@ impl RegionMetadataBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the partition expression in JSON string form.
|
||||
pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
|
||||
self.partition_expr = expr_json;
|
||||
self
|
||||
}
|
||||
|
||||
/// Pushes a new column metadata to this region's metadata.
|
||||
pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
|
||||
self.column_metadatas.push(column_metadata);
|
||||
@@ -623,6 +643,7 @@ impl RegionMetadataBuilder {
|
||||
region_id: self.region_id,
|
||||
schema_version: self.schema_version,
|
||||
primary_key_encoding: self.primary_key_encoding,
|
||||
partition_expr: self.partition_expr,
|
||||
};
|
||||
|
||||
meta.validate()?;
|
||||
@@ -1258,7 +1279,8 @@ mod test {
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 3,
|
||||
})
|
||||
.primary_key(vec![1]);
|
||||
.primary_key(vec![1])
|
||||
.partition_expr_json(Some("".to_string()));
|
||||
builder.build().unwrap()
|
||||
}
|
||||
|
||||
@@ -1901,7 +1923,7 @@ mod test {
|
||||
fn test_debug_for_column_metadata() {
|
||||
let region_metadata = build_test_region_metadata();
|
||||
let formatted = format!("{:?}", region_metadata);
|
||||
assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
|
||||
assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -229,6 +229,7 @@ fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateR
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let region_id = RegionId::from(create.region_id);
|
||||
let table_dir = table_dir(&create.path, region_id.table_id());
|
||||
let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
|
||||
Ok((
|
||||
region_id,
|
||||
RegionCreateRequest {
|
||||
@@ -238,6 +239,7 @@ fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateR
|
||||
options: create.options,
|
||||
table_dir,
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json,
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -426,6 +428,9 @@ pub struct RegionCreateRequest {
|
||||
pub table_dir: String,
|
||||
/// Path type for generating paths
|
||||
pub path_type: PathType,
|
||||
/// Partition expression JSON from table metadata. Set to empty string for a region without partition.
|
||||
/// `Option` to keep compatibility with old clients.
|
||||
pub partition_expr_json: Option<String>,
|
||||
}
|
||||
|
||||
impl RegionCreateRequest {
|
||||
@@ -1853,6 +1858,7 @@ mod tests {
|
||||
options: HashMap::new(),
|
||||
table_dir: "path".to_string(),
|
||||
path_type: PathType::Bare,
|
||||
partition_expr_json: Some("".to_string()),
|
||||
};
|
||||
|
||||
assert!(create.validate().is_err());
|
||||
|
||||
@@ -29,7 +29,7 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
|
||||
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
|
||||
| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
|
||||
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
|
||||
| 3 | 2238 | 0 | 0 |
|
||||
| 3 | 2699 | 0 | 0 |
|
||||
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
|
||||
|
||||
SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';
|
||||
|
||||
Reference in New Issue
Block a user