fix(repartition): improve physical region allocation and compaction read path correctness (#7621)

* fix: fix metadata region

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: adjust repartition flow and compaction read compatibility

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remove logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename compaction mapper and pk projection

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename `CompactionProjectionMapper`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: clarify compaction projection naming

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fmt

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: allow create physical table with internal columns

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix template logic

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update sqlness result

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-01-28 12:04:05 +08:00
committed by GitHub
parent 238bc4fa2c
commit 5bfc728d32
19 changed files with 733 additions and 208 deletions

View File

@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info, false)?;
let template = build_template_from_raw_table_info(raw_table_info)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -20,55 +20,51 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column,
};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::reconciliation::utils::build_column_metadata_from_table_info;
use crate::wal_provider::prepare_wal_options;
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
skip_internal_columns: bool,
) -> Result<CreateRequest> {
/// Note: This function is primarily intended for creating logical tables.
///
/// Logical table templates keep the original column order and primary key indices from
/// `RawTableInfo` (including internal columns when present), because these are used to
/// reconstruct the logical schema on the engine side.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let filtered = raw_table_info
let column_defs = raw_table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
.filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name))
.map(|(i, c)| {
let is_primary_key = primary_key_indices.contains(&i);
let column_def = try_as_column_def(c, is_primary_key)
.context(error::ConvertColumnDefSnafu { column: &c.name })?;
Ok((
is_primary_key.then_some(i),
RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
},
))
Ok(RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
})
})
.collect::<Result<Vec<(Option<usize>, RegionColumnDef)>>>()?;
.collect::<Result<Vec<_>>>()?;
let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip();
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: new_primary_key_indices
primary_key: raw_table_info
.meta
.primary_key_indices
.iter()
.flatten()
.map(|i| *i as u32)
.collect(),
path: String::new(),
@@ -79,6 +75,60 @@ pub fn build_template_from_raw_table_info(
Ok(template)
}
/// Builds the [CreateRequest] template used when creating the regions of a physical table.
///
/// Column ids come from the name-to-id mapping of `raw_table_info`. Every column whose
/// semantic type is [SemanticType::Tag] is marked as a primary key column in its column
/// definition, and the ids of those columns (in column order) become the request's
/// `primary_key`.
///
/// The returned template always has `region_id` 0, an empty `path` and no `partition`.
///
/// # Errors
///
/// Returns an error if `name_to_ids()` yields nothing, if the column metadata cannot be
/// built, or if a column schema cannot be converted into a column definition.
pub fn build_template_from_raw_table_info_for_physical_table(
    raw_table_info: &RawTableInfo,
) -> Result<CreateRequest> {
    let meta = &raw_table_info.meta;
    let name_to_ids = raw_table_info
        .name_to_ids()
        .context(error::MissingColumnIdsSnafu)?;
    let column_metadatas = build_column_metadata_from_table_info(
        &meta.schema.column_schemas,
        &meta.primary_key_indices,
        &name_to_ids,
    )?;

    // Walk the columns once, collecting both the definitions and the tag column ids.
    let mut primary_key = Vec::new();
    let mut column_defs = Vec::new();
    for metadata in column_metadatas.iter() {
        let is_tag = metadata.semantic_type == SemanticType::Tag;
        let column_def = try_as_column_def(&metadata.column_schema, is_tag).context(
            error::ConvertColumnDefSnafu {
                column: &metadata.column_schema.name,
            },
        )?;
        if is_tag {
            primary_key.push(metadata.column_id);
        }
        column_defs.push(RegionColumnDef {
            column_def: Some(column_def),
            column_id: metadata.column_id,
        });
    }

    Ok(CreateRequest {
        region_id: 0,
        engine: meta.engine.clone(),
        column_defs,
        primary_key,
        path: String::new(),
        options: HashMap::from(&meta.options),
        partition: None,
    })
}
pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
let column_defs = create_table_expr
.column_defs

View File

@@ -467,7 +467,10 @@ pub fn extract_column_metadatas(
ensure!(
column_metadata == first_column_metadatas,
MetadataCorruptionSnafu {
err_msg: "The table column metadata schemas from datanodes are not the same."
err_msg: format!(
"The table column metadata schemas from datanodes are not the same. First: {:?}, Current: {:?}",
first_column_metadatas, column_metadata,
),
}
);
}

View File

@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info, false)?;
let template = build_template_from_raw_table_info(raw_table_info)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -71,6 +71,30 @@ impl RecordBatch {
})
}
pub fn to_df_record_batch<I: IntoIterator<Item = VectorRef>>(
arrow_schema: ArrowSchemaRef,
columns: I,
) -> Result<DfRecordBatch> {
let columns: Vec<_> = columns.into_iter().collect();
let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
// Casting the arrays here to match the schema, is a temporary solution to support Arrow's
// view array types (`StringViewArray` and `BinaryViewArray`).
// As to "support": the arrays here are created from vectors, which do not have types
// corresponding to view arrays. What we can do is to only cast them.
// As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path.
// the casting here will be removed in the end.
// TODO(LFC): Remove the casting here once `Batch` is no longer used.
let arrow_arrays = Self::cast_view_arrays(&arrow_schema, arrow_arrays)?;
let arrow_arrays = maybe_align_json_array_with_schema(&arrow_schema, arrow_arrays)?;
let df_record_batch = DfRecordBatch::try_new(arrow_schema, arrow_arrays)
.context(error::NewDfRecordBatchSnafu)?;
Ok(df_record_batch)
}
fn cast_view_arrays(
schema: &ArrowSchemaRef,
mut arrays: Vec<ArrayRef>,

View File

@@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet};
use common_meta::ddl::create_table::executor::CreateTableExecutor;
use common_meta::ddl::create_table::template::{
CreateRequestBuilder, build_template_from_raw_table_info,
CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
};
use common_meta::lock_key::TableLock;
use common_meta::node_manager::NodeManagerRef;
@@ -268,8 +268,15 @@ impl AllocateRegion {
&raw_table_info.name,
);
let table_id = raw_table_info.ident.table_id;
let request = build_template_from_raw_table_info(raw_table_info, true)
// Repartition allocation targets physical regions, so exclude metric internal columns
// and derive primary keys from tag semantics.
let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
.context(error::BuildCreateRequestSnafu { table_id })?;
common_telemetry::debug!(
"Allocating regions request, table_id: {}, request: {:?}",
table_id,
request
);
let builder = CreateRequestBuilder::new(request, None);
let region_count = region_routes.len();
let wal_region_count = wal_options.len();

View File

@@ -31,6 +31,7 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
is_metric_engine_internal_column,
};
use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
use store_api::region_engine::RegionEngine;
@@ -361,15 +362,18 @@ impl MetricEngineInner {
.map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
.collect::<HashMap<String, usize>>();
// check if internal columns are not occupied
let table_id_col_def = request.column_metadatas.iter().any(is_metric_name_col);
let tsid_col_def = request.column_metadatas.iter().any(is_tsid_col);
// check if internal columns are not occupied or defined in the request
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
!name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) || table_id_col_def,
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
}
);
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
!name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME) || tsid_col_def,
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_TSID_COLUMN_NAME,
}
@@ -389,6 +393,10 @@ impl MetricEngineInner {
// check if only one field column is declared, and all tag columns are string
let mut field_col: Option<&ColumnMetadata> = None;
for col in &request.column_metadatas {
// Verified in above steps.
if is_metric_engine_internal_column(&col.column_schema.name) {
continue;
}
match col.semantic_type {
SemanticType::Tag => ensure!(
col.column_schema.data_type == ConcreteDataType::string_datatype(),
@@ -508,21 +516,30 @@ impl MetricEngineInner {
data_region_request.table_dir = request.table_dir.clone();
data_region_request.path_type = PathType::Data;
let table_id_col_def = request.column_metadatas.iter().any(is_metric_name_col);
let tsid_col_def = request.column_metadatas.iter().any(is_tsid_col);
// change nullability for tag columns
data_region_request
.column_metadatas
.iter_mut()
.for_each(|metadata| {
if metadata.semantic_type == SemanticType::Tag {
if metadata.semantic_type == SemanticType::Tag
&& !is_metric_name_col(metadata)
&& !is_tsid_col(metadata)
{
metadata.column_schema.set_nullable();
primary_key.push(metadata.column_id);
}
});
// add internal columns
let [table_id_col, tsid_col] = Self::internal_column_metadata();
data_region_request.column_metadatas.push(table_id_col);
data_region_request.column_metadatas.push(tsid_col);
// add internal columns if not defined in the request
if !table_id_col_def {
data_region_request.column_metadatas.push(table_id_col());
}
if !tsid_col_def {
data_region_request.column_metadatas.push(tsid_col());
}
data_region_request.primary_key = primary_key;
// set data region options
@@ -533,41 +550,57 @@ impl MetricEngineInner {
data_region_request
}
}
/// Generate internal column metadata.
///
/// Return `[table_id_col, tsid_col]`
fn internal_column_metadata() -> [ColumnMetadata; 2] {
// Safety: BloomFilter is a valid skipping index type
let metric_name_col = ColumnMetadata {
column_id: ReservedColumnId::table_id(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
ConcreteDataType::uint32_datatype(),
false,
)
.with_skipping_options(SkippingIndexOptions::new_unchecked(
DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
datatypes::schema::SkippingIndexType::BloomFilter,
))
.unwrap(),
};
let tsid_col = ColumnMetadata {
column_id: ReservedColumnId::tsid(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::uint64_datatype(),
false,
)
.with_inverted_index(false),
};
[metric_name_col, tsid_col]
/// Builds the metadata of the internal table id column.
///
/// The column uses the reserved table id column id, is a non-nullable `uint32` tag and
/// carries a bloom filter skipping index configured with the default table id
/// granularity and false positive rate.
fn table_id_col() -> ColumnMetadata {
    let skipping_options = SkippingIndexOptions::new_unchecked(
        DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
        DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
        datatypes::schema::SkippingIndexType::BloomFilter,
    );
    // NOTE(review): attaching these constant options is presumably infallible, hence the
    // `unwrap` — confirm against `with_skipping_options`.
    let column_schema = ColumnSchema::new(
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
        ConcreteDataType::uint32_datatype(),
        false,
    )
    .with_skipping_options(skipping_options)
    .unwrap();

    ColumnMetadata {
        column_id: ReservedColumnId::table_id(),
        semantic_type: SemanticType::Tag,
        column_schema,
    }
}
/// Builds the metadata of the internal tsid column.
///
/// The column uses the reserved tsid column id and is a non-nullable `uint64` tag whose
/// schema is created with `with_inverted_index(false)`.
fn tsid_col() -> ColumnMetadata {
    let column_schema = ColumnSchema::new(
        DATA_SCHEMA_TSID_COLUMN_NAME,
        ConcreteDataType::uint64_datatype(),
        false,
    )
    .with_inverted_index(false);

    ColumnMetadata {
        column_id: ReservedColumnId::tsid(),
        semantic_type: SemanticType::Tag,
        column_schema,
    }
}
/// Checks whether `column` is exactly the internal table id ("metric name") column.
///
/// All of the following must hold: the reserved table id column id, the reserved
/// table id column name, tag semantic type, `uint32` data type and non-nullable.
pub(crate) fn is_metric_name_col(column: &ColumnMetadata) -> bool {
    let schema = &column.column_schema;
    let is_reserved_identity = column.column_id == ReservedColumnId::table_id()
        && schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
    let has_expected_definition = column.semantic_type == SemanticType::Tag
        && schema.data_type == ConcreteDataType::uint32_datatype()
        && !schema.is_nullable();
    is_reserved_identity && has_expected_definition
}
/// Checks whether `column` is exactly the internal tsid column.
///
/// All of the following must hold: the reserved tsid column id, the reserved tsid
/// column name, tag semantic type, `uint64` data type and non-nullable.
pub(crate) fn is_tsid_col(column: &ColumnMetadata) -> bool {
    let schema = &column.column_schema;
    let is_reserved_identity = column.column_id == ReservedColumnId::tsid()
        && schema.name == DATA_SCHEMA_TSID_COLUMN_NAME;
    let has_expected_definition = column.semantic_type == SemanticType::Tag
        && schema.data_type == ConcreteDataType::uint64_datatype()
        && !schema.is_nullable();
    is_reserved_identity && has_expected_definition
}
/// Groups the create logical region requests by physical region id.
fn group_create_logical_region_requests_by_physical_region_id(
requests: Vec<(RegionId, RegionCreateRequest)>,
@@ -628,6 +661,14 @@ mod test {
use crate::engine::MetricEngine;
use crate::test_util::{TestEnv, create_logical_region_request};
/// The generated internal columns must be recognized by their own predicates.
#[test]
fn test_internal_column_metadata() {
    assert!(is_metric_name_col(&table_id_col()));
    assert!(is_tsid_col(&tsid_col()));
}
#[test]
fn test_verify_region_create_request() {
// internal column is occupied
@@ -666,6 +707,50 @@ mod test {
"Internal column __table_id is reserved".to_string()
);
// allow reserved internal columns when defined properly
let request = RegionCreateRequest {
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"column1".to_string(),
ConcreteDataType::string_datatype(),
false,
),
},
ColumnMetadata {
column_id: 2,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"column2".to_string(),
ConcreteDataType::float64_datatype(),
false,
),
},
table_id_col(),
tsid_col(),
],
table_dir: "test_dir".to_string(),
path_type: PathType::Bare,
engine: METRIC_ENGINE_NAME.to_string(),
primary_key: vec![],
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect(),
partition_expr_json: Some("".to_string()),
};
MetricEngineInner::verify_region_create_request(&request).unwrap();
// valid request
let request = RegionCreateRequest {
column_metadatas: vec![
@@ -820,6 +905,100 @@ mod test {
assert!(!metadata_region_request.options.contains_key("skip_wal"));
}
/// A create request that already declares the internal columns must not get them added a
/// second time, and only the user tag column is made nullable.
#[tokio::test]
async fn test_create_request_for_physical_regions_with_internal_columns() {
    let options: HashMap<_, _> = HashMap::from([
        ("ttl".to_string(), "60m".to_string()),
        ("skip_wal".to_string(), "true".to_string()),
    ]);
    let timestamp_column = ColumnMetadata {
        column_id: 0,
        semantic_type: SemanticType::Timestamp,
        column_schema: ColumnSchema::new(
            "timestamp",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
    };
    let tag_column = ColumnMetadata {
        column_id: 1,
        semantic_type: SemanticType::Tag,
        column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), false),
    };
    let value_column = ColumnMetadata {
        column_id: 2,
        semantic_type: SemanticType::Field,
        column_schema: ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
    };
    let request = RegionCreateRequest {
        engine: METRIC_ENGINE_NAME.to_string(),
        column_metadatas: vec![
            timestamp_column,
            tag_column,
            value_column,
            table_id_col(),
            tsid_col(),
        ],
        primary_key: vec![0],
        options,
        table_dir: "/test_dir".to_string(),
        path_type: PathType::Bare,
        partition_expr_json: Some("".to_string()),
    };

    let env = TestEnv::new().await;
    let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
    let data_region_request = engine.inner.create_request_for_data_region(&request);

    let columns = &data_region_request.column_metadatas;
    assert_eq!(columns.len(), 5);
    assert_eq!(
        data_region_request.primary_key,
        vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
    );

    // Collects every column with the given name, in declaration order.
    let columns_named = |name: &str| {
        columns
            .iter()
            .filter(|metadata| metadata.column_schema.name == name)
            .collect::<Vec<_>>()
    };

    let table_id_columns = columns_named(DATA_SCHEMA_TABLE_ID_COLUMN_NAME);
    let tsid_columns = columns_named(DATA_SCHEMA_TSID_COLUMN_NAME);
    assert_eq!(table_id_columns.len(), 1);
    assert_eq!(tsid_columns.len(), 1);

    let tag_columns = columns_named("tag");
    assert!(tag_columns[0].column_schema.is_nullable());

    assert!(is_metric_name_col(table_id_columns[0]));
    assert!(is_tsid_col(tsid_columns[0]));
}
#[tokio::test]
async fn test_create_logical_regions() {
let env = TestEnv::new().await;

View File

@@ -15,12 +15,11 @@
use common_base::AffectedRows;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use crate::engine::MetricEngine;
use crate::error::{MitoEnterStagingOperationSnafu, Result};
use crate::utils;
impl MetricEngine {
/// Handles the enter staging request for the given region.
@@ -29,24 +28,11 @@ impl MetricEngine {
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
// For metadata region, it doesn't care about the partition expr, so we can just pass an empty string.
// We don't need to enter staging for metadata region.
// Callers should pass the data region id here; metadata regions stay unchanged.
self.inner
.mito
.handle_request(
metadata_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: String::new(),
}),
)
.await
.context(MitoEnterStagingOperationSnafu)?;
self.inner
.mito
.handle_request(data_region_id, request)
.handle_request(region_id, request)
.await
.context(MitoEnterStagingOperationSnafu)
.map(|response| response.affected_rows)

View File

@@ -97,6 +97,7 @@ impl BulkIterContext {
codec,
// we don't need to compat batch since all batch in memtable have the same schema.
compat_batch: None,
compaction_projection_mapper: None,
pre_filter_mode,
partition_filter: None,
},

View File

@@ -18,8 +18,8 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::datatypes::Field;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
@@ -28,7 +28,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{InvalidRequestSnafu, Result};
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::read_column_ids_from_projection;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
@@ -252,7 +252,15 @@ impl FlatProjectionMapper {
if self.is_empty_projection {
return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
}
let columns = self.project_vectors(batch)?;
RecordBatch::new(self.output_schema.clone(), columns)
}
/// Projects columns from the input batch and converts them into vectors.
pub(crate) fn project_vectors(
&self,
batch: &datatypes::arrow::record_batch::RecordBatch,
) -> common_recordbatch::error::Result<Vec<datatypes::vectors::VectorRef>> {
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
for index in &self.batch_indices {
let mut array = batch.column(*index).clone();
@@ -269,8 +277,7 @@ impl FlatProjectionMapper {
.context(ExternalSnafu)?;
columns.push(vector);
}
RecordBatch::new(self.output_schema.clone(), columns)
Ok(columns)
}
}
@@ -341,3 +348,99 @@ fn compute_input_arrow_schema(
Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
}
/// Projects compaction batches into the flat format column layout
/// (fields + time index + __primary_key + __sequence + __op_type).
pub(crate) struct CompactionProjectionMapper {
    /// Maps input columns to the projected field and time index columns.
    mapper: FlatProjectionMapper,
    /// Re-attaches the internal columns and builds the output batch.
    assembler: DfBatchAssembler,
}

impl CompactionProjectionMapper {
    /// Creates a mapper for `metadata`.
    ///
    /// The projection consists of the positions of all field columns, in metadata order,
    /// followed by the time index position. All column ids of `metadata` are passed as
    /// the columns to read.
    pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result<Self> {
        let column_metadatas = &metadata.column_metadatas;

        let mut projection = column_metadatas
            .iter()
            .enumerate()
            .filter(|(_, col)| matches!(col.semantic_type, SemanticType::Field))
            .map(|(idx, _)| idx)
            .collect::<Vec<_>>();
        projection.push(metadata.time_index_column_pos());

        let read_column_ids = column_metadatas.iter().map(|col| col.column_id).collect();
        let mapper =
            FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?;
        let assembler = DfBatchAssembler::new(mapper.output_schema());

        Ok(Self { mapper, assembler })
    }

    /// Projects the columns of `batch` and appends its internal columns to the result.
    ///
    /// The input batch is expected to be in flat format with internal columns appended.
    pub(crate) fn project(&self, batch: DfRecordBatch) -> Result<DfRecordBatch> {
        let projected = self
            .mapper
            .project_vectors(&batch)
            .context(RecordBatchSnafu)?;
        let output = self
            .assembler
            .build_df_record_batch_with_internal(&batch, projected)
            .context(RecordBatchSnafu)?;
        Ok(output)
    }
}
/// Builds [DfRecordBatch] with internal columns appended.
pub(crate) struct DfBatchAssembler {
    /// Arrow schema of the projected output followed by the internal fields.
    output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef,
}

impl DfBatchAssembler {
    /// Number of internal columns that trail a flat format batch.
    const NUM_INTERNAL_COLUMNS: usize = 3;

    /// Precomputes the output schema with internal columns.
    ///
    /// The resulting schema is the fields of `output_schema` followed by the fields
    /// returned by `internal_fields()`.
    pub(crate) fn new(output_schema: SchemaRef) -> Self {
        let fields = output_schema
            .arrow_schema()
            .fields()
            .into_iter()
            .chain(internal_fields().iter())
            .cloned()
            .collect::<Vec<_>>();
        let output_arrow_schema_with_internal =
            Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
        Self {
            output_arrow_schema_with_internal,
        }
    }

    /// Builds a [DfRecordBatch] from projected vectors plus internal columns.
    ///
    /// Assumes the input batch already contains internal columns as the last three fields
    /// ("__primary_key", "__sequence", "__op_type"). The trailing columns of `batch` are
    /// converted to vectors and appended to `columns` in their original order.
    ///
    /// A batch with fewer than three columns no longer underflows the index arithmetic
    /// here: whatever columns exist are appended, and the mismatch with the precomputed
    /// schema is left for the fallible batch construction to report.
    ///
    /// # Errors
    ///
    /// Returns an error if a trailing array cannot be converted to a vector or if the
    /// assembled columns cannot form a batch with the precomputed schema.
    pub(crate) fn build_df_record_batch_with_internal(
        &self,
        batch: &datatypes::arrow::record_batch::RecordBatch,
        mut columns: Vec<datatypes::vectors::VectorRef>,
    ) -> common_recordbatch::error::Result<DfRecordBatch> {
        let arrays = batch.columns();
        // `saturating_sub` keeps the start index valid even for malformed (too short) input.
        let internal_start = arrays.len().saturating_sub(Self::NUM_INTERNAL_COLUMNS);
        let internal_arrays = &arrays[internal_start..];

        columns.reserve(internal_arrays.len());
        for array in internal_arrays {
            let vector = Helper::try_into_vector(array.clone())
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            columns.push(vector);
        }

        RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
    }
}

View File

@@ -1443,7 +1443,16 @@ pub fn build_flat_file_range_scan_stream(
})
})
.transpose()?;
let mapper = range.compaction_projection_mapper();
while let Some(record_batch) = reader.next_batch()? {
let record_batch = if let Some(mapper) = mapper {
let batch = mapper.project(record_batch)?;
batch
} else {
record_batch
};
if let Some(flat_compat) = may_compat {
let batch = flat_compat.compat(record_batch)?;
yield batch;

View File

@@ -186,7 +186,7 @@ impl MitoRegion {
}
/// Returns current metadata of the region.
pub(crate) fn metadata(&self) -> RegionMetadataRef {
pub fn metadata(&self) -> RegionMetadataRef {
let version_data = self.version_control.current();
version_data.version.metadata.clone()
}

View File

@@ -44,6 +44,7 @@ use crate::error::{
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
@@ -269,6 +270,11 @@ impl FileRange {
self.context.compat_batch()
}
/// Returns the [CompactionProjectionMapper] of this range's context, or `None` when the
/// context has none.
pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
self.context.compaction_projection_mapper()
}
/// Returns the file handle of the file range.
pub(crate) fn file_handle(&self) -> &FileHandle {
self.context.reader_builder.file_handle()
@@ -324,6 +330,11 @@ impl FileRangeContext {
self.base.compat_batch.as_ref()
}
/// Returns a reference to the optional [CompactionProjectionMapper] stored in the
/// range base, or `None` when it is not set.
pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
self.base.compaction_projection_mapper.as_ref()
}
/// Sets the `CompatBatch` to the context.
pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
self.base.compat_batch = compat;
@@ -406,6 +417,8 @@ pub(crate) struct RangeBase {
pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
/// Optional helper to compat batches.
pub(crate) compat_batch: Option<CompatBatch>,
/// Optional helper to project batches.
pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
/// Mode to pre-filter columns.
pub(crate) pre_filter_mode: PreFilterMode,
/// Partition filter.
@@ -628,10 +641,12 @@ impl RangeBase {
continue;
}
// Get the column directly by its projected index
// Get the column directly by its projected index.
// If the column is missing and it's not a tag/time column, this filter is skipped.
// Assumes the projection indices align with the input batch schema.
let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
if let Some(idx) = column_idx {
let column = &input.columns()[idx];
let column = &input.columns().get(idx).unwrap();
let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
} else if filter_ctx.semantic_type() == SemanticType::Tag {

View File

@@ -257,6 +257,9 @@ impl FlatReadFormat {
}
/// Gets the projection in the flat format.
///
/// When `skip_auto_convert` is enabled (primary-key format read), this returns the
/// primary-key format projection so filter/prune can resolve projected indices.
pub(crate) fn format_projection(&self) -> &FormatProjection {
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.format_projection,
@@ -393,20 +396,29 @@ impl ParquetPrimaryKeyToFlat {
let id_to_index = sst_column_id_indices(&metadata);
let sst_column_num =
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
// Computes the format projection for the new format.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
sst_column_num,
column_ids.iter().copied(),
);
let codec = build_primary_key_codec(&metadata);
let convert_format = if skip_auto_convert {
None
} else {
FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
};
let codec = build_primary_key_codec(&metadata);
let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
let (convert_format, format_projection) = if skip_auto_convert {
(
None,
FormatProjection {
projection_indices: format.projection_indices().to_vec(),
column_id_to_projected_index: format.field_id_to_projected_index().clone(),
},
)
} else {
// Computes the format projection for the new format.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
sst_column_num,
column_ids.iter().copied(),
);
(
FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
format_projection,
)
};
Self {
format,

View File

@@ -488,6 +488,11 @@ impl PrimaryKeyReadFormat {
&self.projection_indices
}
/// Returns the map from column id to projected index held by this format.
///
/// NOTE(review): judging by the field name, the keys are presumably field column ids
/// only — confirm where the map is populated.
pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
&self.field_id_to_projected_index
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
self.override_sequence

View File

@@ -36,7 +36,9 @@ use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
@@ -54,6 +56,7 @@ use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
@@ -336,6 +339,37 @@ impl ParquetReaderBuilder {
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
// Gets the metadata stored in the SST.
let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
let region_partition_expr = self
.expected_metadata
.as_ref()
.and_then(|meta| meta.partition_expr.as_ref());
let (_, is_same_region_partition) = Self::is_same_region_partition(
region_partition_expr.as_ref().map(|expr| expr.as_str()),
self.file_handle.meta_ref().partition_expr.as_ref(),
)?;
// Skip auto convert when:
// - compaction is enabled
// - region partition expr is the same as the file partition expr (no need to auto convert)
let skip_auto_convert = self.compaction && is_same_region_partition;
// Build a compaction projection helper when:
// - compaction is enabled
// - region partition expr differs from file partition expr
// - flat format is enabled
// - primary key encoding is sparse
//
// This is applied after row-group filtering to align batches with flat output schema
// before compat handling.
let compaction_projection_mapper = if self.compaction
&& !is_same_region_partition
&& self.flat_format
&& region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
{
Some(CompactionProjectionMapper::try_new(&region_meta)?)
} else {
None
};
let mut read_format = if let Some(column_ids) = &self.projection {
ReadFormat::new(
region_meta.clone(),
@@ -343,7 +377,7 @@ impl ParquetReaderBuilder {
self.flat_format,
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
self.compaction,
skip_auto_convert,
)?
} else {
// Lists all column ids to read, we always use the expected metadata if possible.
@@ -359,7 +393,7 @@ impl ParquetReaderBuilder {
self.flat_format,
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
self.compaction,
skip_auto_convert,
)?
};
if self.decode_primary_key_values {
@@ -454,6 +488,7 @@ impl ParquetReaderBuilder {
prune_schema,
codec,
compat_batch: None,
compaction_projection_mapper,
pre_filter_mode: self.pre_filter_mode,
partition_filter,
},
@@ -464,6 +499,19 @@ impl ParquetReaderBuilder {
Ok((context, selection))
}
/// Parses the region's partition expression and checks it against the file's.
///
/// Returns the parsed region partition expression (`None` when no expression string is
/// given or the parser yields none) together with a flag that is `true` when it equals
/// `file_partition_expr`. Two absent expressions compare as equal. Parse failures are
/// propagated to the caller.
fn is_same_region_partition(
    region_partition_expr_str: Option<&str>,
    file_partition_expr: Option<&PartitionExpr>,
) -> Result<(Option<PartitionExpr>, bool)> {
    // Only invoke the parser when the region actually carries an expression string.
    let parsed = if let Some(raw) = region_partition_expr_str {
        crate::region::parse_partition_expr(Some(raw))?
    } else {
        None
    };
    let matches_file = parsed.as_ref() == file_partition_expr;
    Ok((parsed, matches_file))
}
/// Compare partition expressions from expected metadata and file metadata,
/// and build a partition filter if they differ.
fn build_partition_filter(
@@ -477,19 +525,19 @@ impl ParquetReaderBuilder {
.and_then(|meta| meta.partition_expr.as_ref());
let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();
let Some(region_str) = region_partition_expr_str else {
return Ok(None);
};
let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
region_partition_expr_str.map(|s| s.as_str()),
file_partition_expr_ref,
)?;
let Some(region_partition_expr) = crate::region::parse_partition_expr(Some(region_str))?
else {
return Ok(None);
};
if Some(&region_partition_expr) == file_partition_expr_ref {
if is_same_region_partition {
return Ok(None);
}
let Some(region_partition_expr) = region_partition_expr else {
return Ok(None);
};
// Collect columns referenced by the partition expression.
let mut referenced_columns = HashSet::new();
region_partition_expr.collect_column_names(&mut referenced_columns);

View File

@@ -82,6 +82,7 @@ sqlx = { workspace = true, features = [
"chrono",
] }
standalone.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tempfile.workspace = true

View File

@@ -60,12 +60,14 @@ use meta_srv::gc::GcSchedulerOptions;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use mito2::gc::GcConfig;
use mito2::region::MitoRegionRef;
use object_store::config::ObjectStoreConfig;
use rand::Rng;
use servers::grpc::GrpcOptions;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::server::ServerHandlers;
use store_api::storage::RegionId;
use tempfile::TempDir;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
@@ -138,6 +140,20 @@ impl GreptimeDbCluster {
sst_files
}
/// Collects every mito region hosted by the cluster's datanodes, keyed by region id.
///
/// Panics if a datanode's region server has no mito engine.
pub async fn list_all_regions(&self) -> HashMap<RegionId, MitoRegionRef> {
    let mut by_id = HashMap::new();
    for node in self.datanode_instances.values() {
        let server = node.region_server();
        let engine = server.mito_engine().unwrap();
        // A later datanode reporting the same region id overwrites the earlier entry.
        by_id.extend(
            engine
                .regions()
                .into_iter()
                .map(|region| (region.region_id(), region)),
        );
    }
    by_id
}
}
pub struct GreptimeDbClusterBuilder {

View File

@@ -30,6 +30,8 @@ use meta_srv::metasrv::Metasrv;
use mito2::gc::GcConfig;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::RegionId;
use tests_integration::cluster::GreptimeDbClusterBuilder;
use tests_integration::test_util::{StorageType, get_test_store_config};
use tokio::sync::oneshot;
@@ -45,7 +47,11 @@ macro_rules! repartition_tests {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
$crate::repartition::test_repartition_mito(store_type).await
// Cover both storage formats for repartition behavior.
// for flat format
$crate::repartition::test_repartition_mito(store_type, true).await;
// for primary key format
$crate::repartition::test_repartition_mito(store_type, false).await;
}
}
@@ -53,8 +59,17 @@ macro_rules! repartition_tests {
async fn [< test_repartition_metric >]() {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
use store_api::codec::PrimaryKeyEncoding;
common_telemetry::init_default_ut_logging();
$crate::repartition::test_repartition_metric(store_type).await
// Exercise format + primary key encoding matrix for metric engine.
// for flat format with sparse primary key encoding
$crate::repartition::test_repartition_metric(store_type, true, PrimaryKeyEncoding::Sparse).await;
// for flat format with dense primary key encoding
$crate::repartition::test_repartition_metric(store_type, true, PrimaryKeyEncoding::Dense).await;
// for primary key format with sparse primary key encoding
$crate::repartition::test_repartition_metric(store_type, false, PrimaryKeyEncoding::Sparse).await;
// for primary key format with dense primary key encoding
$crate::repartition::test_repartition_metric(store_type, false, PrimaryKeyEncoding::Dense).await;
}
}
}
@@ -114,7 +129,25 @@ async fn trigger_full_gc(ticker: &GcTickerRef) {
let _ = rx.await.unwrap();
}
pub async fn test_repartition_mito(store_type: StorageType) {
/// Builds the `information_schema.partitions` query for `table_name`.
///
/// Repartition results are asserted through this query rather than through
/// `SHOW CREATE TABLE`, whose formatting differs between engines.
/// `table_name` is spliced into the SQL text as-is, without escaping.
fn query_partitions_sql(table_name: &str) -> String {
    format!(
        concat!(
            "SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, ",
            "partition_description, greptime_partition_id, partition_ordinal_position ",
            "FROM information_schema.partitions ",
            "WHERE table_name = '{}' ",
            "ORDER BY partition_ordinal_position;",
        ),
        table_name
    )
}
pub async fn test_repartition_mito(store_type: StorageType, flat_format: bool) {
info!(
"test_repartition_mito: store_type: {:?}, flat_format: {:?}",
store_type, flat_format
);
let cluster_name = "test_repartition_mito";
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 3u64;
@@ -147,8 +180,9 @@ pub async fn test_repartition_mito(store_type: StorageType) {
let query_ctx = QueryContext::arc();
let instance = cluster.fe_instance();
// 1. Setup: Create a table with partitions
let sql = r#"
// 1. Setup: Create a table with partitions (format varies by test case)
let sql = if flat_format {
r#"
CREATE TABLE `repartition_mito_table`(
`id` INT,
`city` STRING,
@@ -158,8 +192,25 @@ pub async fn test_repartition_mito(store_type: StorageType) {
`id` < 10,
`id` >= 10 AND `id` < 20,
`id` >= 20
);
"#;
) ENGINE = mito
WITH (
'sst_format' = 'flat'
);
"#
} else {
r#"
CREATE TABLE `repartition_mito_table`(
`id` INT,
`city` STRING,
`ts` TIMESTAMP TIME INDEX,
PRIMARY KEY(`id`, `city`)
) PARTITION ON COLUMNS (`id`) (
`id` < 10,
`id` >= 10 AND `id` < 20,
`id` >= 20
) ENGINE = mito;
"#
};
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql = r#"
@@ -273,30 +324,19 @@ pub async fn test_repartition_mito(store_type: StorageType) {
let result = run_sql(
instance,
"SHOW CREATE TABLE `repartition_mito_table`",
&query_partitions_sql("repartition_mito_table"),
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_split = r#"+------------------------+-------------------------------------------------------+
| Table | Create Table |
+------------------------+-------------------------------------------------------+
| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( |
| | "id" INT NULL, |
| | "city" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("id", "city") |
| | ) |
| | PARTITION ON COLUMNS ("id") ( |
| | id < 5, |
| | id >= 10 AND id < 20, |
| | id >= 20, |
| | id >= 5 AND id < 10 |
| | ) |
| | ENGINE=mito |
| | |
+------------------------+-------------------------------------------------------+"#;
let expected_create_table_after_split = r#"+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| greptime | public | repartition_mito_table | p0 | id | id < 5 | 4398046511104 | 1 |
| greptime | public | repartition_mito_table | p1 | id | id >= 10 AND id < 20 | 4398046511105 | 2 |
| greptime | public | repartition_mito_table | p2 | id | id >= 20 | 4398046511106 | 3 |
| greptime | public | repartition_mito_table | p3 | id | id >= 5 AND id < 10 | 4398046511107 | 4 |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#;
check_output_stream(result.data, expected_create_table_after_split).await;
let sql =
@@ -405,29 +445,18 @@ pub async fn test_repartition_mito(store_type: StorageType) {
let result = run_sql(
instance,
"SHOW CREATE TABLE `repartition_mito_table`",
&query_partitions_sql("repartition_mito_table"),
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_merge = r#"+------------------------+-------------------------------------------------------+
| Table | Create Table |
+------------------------+-------------------------------------------------------+
| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( |
| | "id" INT NULL, |
| | "city" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("id", "city") |
| | ) |
| | PARTITION ON COLUMNS ("id") ( |
| | id < 5, |
| | id >= 10, |
| | id >= 5 AND id < 10 |
| | ) |
| | ENGINE=mito |
| | |
+------------------------+-------------------------------------------------------+"#;
let expected_create_table_after_merge = r#"+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| greptime | public | repartition_mito_table | p0 | id | id < 5 | 4398046511104 | 1 |
| greptime | public | repartition_mito_table | p1 | id | id >= 10 | 4398046511105 | 2 |
| greptime | public | repartition_mito_table | p2 | id | id >= 5 AND id < 10 | 4398046511107 | 3 |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#;
check_output_stream(result.data, expected_create_table_after_merge).await;
let sql =
@@ -463,7 +492,15 @@ pub async fn test_repartition_mito(store_type: StorageType) {
.unwrap();
}
pub async fn test_repartition_metric(store_type: StorageType) {
pub async fn test_repartition_metric(
store_type: StorageType,
flat_format: bool,
primary_key_encoding: PrimaryKeyEncoding,
) {
info!(
"test_repartition_metric: store_type: {:?}, flat_format: {:?}, primary_key_encoding: {:?}",
store_type, flat_format, primary_key_encoding
);
let cluster_name = "test_repartition_metric";
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 3u64;
@@ -495,7 +532,15 @@ pub async fn test_repartition_metric(store_type: StorageType) {
let query_ctx = QueryContext::arc();
let instance = cluster.fe_instance();
let sql = r#"
// Explicitly configure sst_format and primary key encoding to cover the matrix.
let sst_format = if flat_format { "flat" } else { "primary_key" };
let primary_key_encoding = match primary_key_encoding {
PrimaryKeyEncoding::Dense => "dense",
PrimaryKeyEncoding::Sparse => "sparse",
};
let sql = format!(
r#"
CREATE TABLE `repart_phy_metric`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
@@ -503,10 +548,20 @@ pub async fn test_repartition_metric(store_type: StorageType) {
) PARTITION ON COLUMNS (`host`) (
`host` < 'm',
`host` >= 'm'
) ENGINE = metric WITH ("physical_metric_table" = "");
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
) ENGINE = metric
WITH (
"physical_metric_table" = "",
"memtable.type" = "partition_tree",
'sst_format' = '{sst_format}',
"memtable.partition_tree.primary_key_encoding" = "{primary_key_encoding}",
"index.type" = "inverted",
);
"#
);
run_sql(instance, &sql, query_ctx.clone()).await.unwrap();
// A second logical table exercises repartition behavior across multiple logical tables
// sharing the same physical metric table.
let sql = r#"
CREATE TABLE `repart_log_metric`(
`ts` TIMESTAMP TIME INDEX,
@@ -516,6 +571,15 @@ pub async fn test_repartition_metric(store_type: StorageType) {
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql = r#"
CREATE TABLE `repart_log_metric_job`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`job` STRING PRIMARY KEY
) ENGINE = metric WITH ("on_physical_table" = "repart_phy_metric");
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql = r#"
INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES
('a_host', '2022-01-01 00:00:00', 1),
@@ -552,32 +616,41 @@ pub async fn test_repartition_metric(store_type: StorageType) {
let result = run_sql(
instance,
"SHOW CREATE TABLE `repart_phy_metric`",
&query_partitions_sql("repart_phy_metric"),
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_split = r#"+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "host" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'g', |
| | host >= 'm', |
| | host >= 'g' AND host < 'm' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------------------+--------------------------------------------------+"#;
// Partition ids and order are expected to be stable within a single test run.
let expected_create_table_after_split = r#"+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+
| greptime | public | repart_phy_metric | p0 | host | host < g | 4398046511104 | 1 |
| greptime | public | repart_phy_metric | p1 | host | host >= m | 4398046511105 | 2 |
| greptime | public | repart_phy_metric | p2 | host | host >= g AND host < m | 4398046511106 | 3 |
+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+"#;
check_output_stream(result.data, expected_create_table_after_split).await;
let regions = cluster.list_all_regions().await;
let region0 = regions.get(&RegionId::new(1024, 0)).unwrap();
let region2 = regions.get(&RegionId::new(1024, 2)).unwrap();
let primary_keys_in_region_0 = region0
.metadata()
.primary_key_columns()
.cloned()
.collect::<Vec<_>>();
let primary_keys_in_region_2 = region2
.metadata()
.primary_key_columns()
.cloned()
.collect::<Vec<_>>();
info!("primary_keys_in_region_0: {:?}", primary_keys_in_region_0);
info!("primary_keys_in_region_2: {:?}", primary_keys_in_region_2);
assert_eq!(primary_keys_in_region_0, primary_keys_in_region_2);
let sql = r#"
ALTER TABLE `repart_log_metric_job` ADD COLUMN `cpu` STRING PRIMARY KEY;
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
@@ -671,30 +744,17 @@ pub async fn test_repartition_metric(store_type: StorageType) {
let result = run_sql(
instance,
"SHOW CREATE TABLE `repart_phy_metric`",
&query_partitions_sql("repart_phy_metric"),
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_merge = r#"+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "host" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'm', |
| | host >= 'm' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------------------+--------------------------------------------------+"#;
let expected_create_table_after_merge = r#"+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| greptime | public | repart_phy_metric | p0 | host | host < m | 4398046511104 | 1 |
| greptime | public | repart_phy_metric | p1 | host | host >= m | 4398046511105 | 2 |
+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#;
check_output_stream(result.data, expected_create_table_after_merge).await;
let result = run_sql(
@@ -780,7 +840,13 @@ pub async fn test_repartition_metric(store_type: StorageType) {
| c_host | 2022-01-03T00:00:00 | 5.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
run_sql(
instance,
"DROP TABLE `repart_log_metric_job`",
query_ctx.clone(),
)
.await
.unwrap();
run_sql(
instance,
"DROP TABLE `repart_log_metric`",