fix: ensure logical and physical regions have the same timestamp unit (#6041)

* fix: check time unit of logical region

* test: enlarge ttl for alter test to avoid data expiring during the test

* chore: fix unused
Yingwen
2025-05-08 11:40:21 +08:00
committed by GitHub
parent e787007eb5
commit 148d96fc38
7 changed files with 105 additions and 20 deletions
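
The diffs below thread the physical region's time index unit through create, open, and engine state, and reject logical regions whose unit differs. As a minimal, self-contained sketch of that check (using simplified stand-in types `TimeUnit`, `SemanticType`, and `ColumnMetadata` rather than the engine's real ones):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SemanticType {
    Tag,
    Field,
    Timestamp,
}

struct ColumnMetadata {
    semantic_type: SemanticType,
    // `Some(unit)` only for timestamp columns in this simplified model.
    timestamp_unit: Option<TimeUnit>,
}

/// Returns the unit of the time index column, if the request has one.
fn time_index_unit(columns: &[ColumnMetadata]) -> Option<TimeUnit> {
    columns
        .iter()
        .find(|col| col.semantic_type == SemanticType::Timestamp)
        .and_then(|col| col.timestamp_unit)
}

/// Rejects a logical-region request whose time index unit differs from the
/// unit recorded for the physical region it attaches to.
fn check_unit(physical_unit: TimeUnit, request_columns: &[ColumnMetadata]) -> Result<(), String> {
    let request_unit = time_index_unit(request_columns)
        .ok_or_else(|| "No time index column found".to_string())?;
    if request_unit != physical_unit {
        return Err(format!(
            "Metric has different time unit ({request_unit:?}) than the physical region ({physical_unit:?})"
        ));
    }
    Ok(())
}

fn main() {
    // A microsecond logical region on a millisecond physical region is rejected,
    // mirroring the `t6` case added to the sqlness test below.
    let columns = vec![
        ColumnMetadata {
            semantic_type: SemanticType::Field,
            timestamp_unit: None,
        },
        ColumnMetadata {
            semantic_type: SemanticType::Timestamp,
            timestamp_unit: Some(TimeUnit::Microsecond),
        },
    ];
    assert!(check_unit(TimeUnit::Millisecond, &columns).is_err());
    assert!(check_unit(TimeUnit::Microsecond, &columns).is_ok());
}
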

View File

@@ -146,6 +146,23 @@ impl MetricEngineInner {
.iter()
.map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
.collect::<HashMap<_, _>>();
+ let time_index_unit = create_data_region_request
+ .column_metadatas
+ .iter()
+ .find_map(|metadata| {
+ if metadata.semantic_type == SemanticType::Timestamp {
+ metadata
+ .column_schema
+ .data_type
+ .as_timestamp()
+ .map(|data_type| data_type.unit())
+ } else {
+ None
+ }
+ })
+ .context(UnexpectedRequestSnafu {
+ reason: "No time index column found",
+ })?;
self.mito
.handle_request(
data_region_id,
@@ -170,6 +187,7 @@ impl MetricEngineInner {
physical_columns,
primary_key_encoding,
physical_region_options,
+ time_index_unit,
);
Ok(())
@@ -184,15 +202,38 @@ impl MetricEngineInner {
) -> Result<()> {
let data_region_id = utils::to_data_region_id(physical_region_id);
- ensure!(
- self.state
- .read()
- .unwrap()
- .exist_physical_region(data_region_id),
- PhysicalRegionNotFoundSnafu {
+ let unit = self
+ .state
+ .read()
+ .unwrap()
+ .physical_region_time_index_unit(physical_region_id)
+ .context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
- }
- );
+ })?;
+ // Checks the time index unit of each request.
+ for (_, request) in &requests {
+ // Safety: verify_region_create_request() ensures that the request is valid.
+ let time_index_column = request
+ .column_metadatas
+ .iter()
+ .find(|col| col.semantic_type == SemanticType::Timestamp)
+ .unwrap();
+ let request_unit = time_index_column
+ .column_schema
+ .data_type
+ .as_timestamp()
+ .unwrap()
+ .unit();
+ ensure!(
+ request_unit == unit,
+ UnexpectedRequestSnafu {
+ reason: format!(
+ "Metric has different time unit ({:?}) than the physical region ({:?})",
+ request_unit, unit
+ ),
+ }
+ );
+ }
// Filters out the requests that the logical region already exists
let requests = {

View File

@@ -16,6 +16,7 @@
use std::collections::HashSet;
+ use api::v1::SemanticType;
use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
@@ -223,6 +224,21 @@ impl MetricEngineInner {
{
let mut state = self.state.write().unwrap();
// recover physical column names
+ // Safety: The physical columns are loaded from the data region, which always
+ // has a time index.
+ let time_index_unit = physical_columns
+ .iter()
+ .find_map(|col| {
+ if col.semantic_type == SemanticType::Timestamp {
+ col.column_schema
+ .data_type
+ .as_timestamp()
+ .map(|data_type| data_type.unit())
+ } else {
+ None
+ }
+ })
+ .unwrap();
let physical_columns = physical_columns
.into_iter()
.map(|col| (col.column_schema.name, col.column_id))
@@ -232,6 +248,7 @@ impl MetricEngineInner {
physical_columns,
primary_key_encoding,
physical_region_options,
+ time_index_unit,
);
// recover logical regions
for logical_region_id in &logical_regions {

View File

@@ -16,6 +16,7 @@
use std::collections::{HashMap, HashSet};
+ use common_time::timestamp::TimeUnit;
use snafu::OptionExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
@@ -31,6 +32,7 @@ pub struct PhysicalRegionState {
physical_columns: HashMap<String, ColumnId>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
+ time_index_unit: TimeUnit,
}
impl PhysicalRegionState {
@@ -38,12 +40,14 @@ impl PhysicalRegionState {
physical_columns: HashMap<String, ColumnId>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
+ time_index_unit: TimeUnit,
) -> Self {
Self {
logical_regions: HashSet::new(),
physical_columns,
primary_key_encoding,
options,
+ time_index_unit,
}
}
@@ -89,11 +93,17 @@ impl MetricEngineState {
physical_columns: HashMap<String, ColumnId>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
+ time_index_unit: TimeUnit,
) {
let physical_region_id = to_data_region_id(physical_region_id);
self.physical_regions.insert(
physical_region_id,
- PhysicalRegionState::new(physical_columns, primary_key_encoding, options),
+ PhysicalRegionState::new(
+ physical_columns,
+ primary_key_encoding,
+ options,
+ time_index_unit,
+ ),
);
}
@@ -178,6 +188,15 @@ impl MetricEngineState {
self.physical_regions.contains_key(&physical_region_id)
}
+ pub fn physical_region_time_index_unit(
+ &self,
+ physical_region_id: RegionId,
+ ) -> Option<TimeUnit> {
+ self.physical_regions
+ .get(&physical_region_id)
+ .map(|state| state.time_index_unit)
+ }
pub fn get_primary_key_encoding(
&self,
physical_region_id: RegionId,

View File

@@ -155,9 +155,9 @@ ADD
Affected Rows: 0
- ALTER TABLE
- t2
- ADD
+ ALTER TABLE
+ t2
+ ADD
COLUMN at4 UINT16;
Affected Rows: 0
@@ -215,7 +215,7 @@ SELECT * FROM grpc_latencies;
| 2024-07-11T20:00:06 | host1 | GetUser | 103.0 |
+---------------------+-------+-------------+---------+
- ALTER TABLE grpc_latencies SET ttl = '10d';
+ ALTER TABLE grpc_latencies SET ttl = '10000d';
Affected Rows: 0

View File

@@ -75,9 +75,9 @@ ALTER TABLE
ADD
COLUMN at2 STRING;
ALTER TABLE
t2
ADD
ALTER TABLE
t2
ADD
COLUMN at4 UINT16;
INSERT INTO
@@ -109,10 +109,10 @@ INSERT INTO grpc_latencies (ts, host, method_name, latency) VALUES
SELECT * FROM grpc_latencies;
- ALTER TABLE grpc_latencies SET ttl = '10d';
+ ALTER TABLE grpc_latencies SET ttl = '10000d';
ALTER TABLE grpc_latencies ADD COLUMN home INTEGER FIRST;
SELECT * FROM grpc_latencies;
- DROP TABLE grpc_latencies;
+ DROP TABLE grpc_latencies;

View File

@@ -48,6 +48,11 @@ CREATE TABLE t5 (ts timestamp time index, valval double, host string primary key
Error: 1004(InvalidArguments), Adding field column valval to physical table
+ -- create logical table with different time unit on time index column
+ CREATE TABLE t6 (ts timestamp(6) time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
+ Error: 1004(InvalidArguments), Unexpected request: Metric has different time unit (Microsecond) than the physical region (Millisecond)
SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name;
+---------------+--------------+------------+------------+--------+
@@ -204,7 +209,7 @@ CREATE TABLE t1(ts timestamp time index, val double, host string primary key) en
Affected Rows: 0
- INSERT INTO t1 (ts, val, host) VALUES
+ INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-02 00:00:00', 4.56, 'example.com'),
('2022-01-03 00:00:00', 7.89, 'example.com'),

View File

@@ -20,6 +20,9 @@ CREATE TABLE t4 (ts timestamp time index, val double, host double, primary key (
-- create logical table with different column name on field column
CREATE TABLE t5 (ts timestamp time index, valval double, host string primary key) engine = metric with ("on_physical_table" = "phy");
+ -- create logical table with different time unit on time index column
+ CREATE TABLE t6 (ts timestamp(6) time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name;
DESC TABLE phy;
@@ -73,7 +76,7 @@ with
CREATE TABLE t1(ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy");
- INSERT INTO t1 (ts, val, host) VALUES
+ INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-02 00:00:00', 4.56, 'example.com'),
('2022-01-03 00:00:00', 7.89, 'example.com'),