mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 17:00:37 +00:00
fix: reject physical metric table writes (#8153)
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -31,7 +31,7 @@ use store_api::storage::{RegionId, TableId};
|
||||
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::{
|
||||
ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
|
||||
ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalWriteSnafu, InvalidRequestSnafu,
|
||||
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnexpectedRequestSnafu,
|
||||
UnsupportedRegionRequestSnafu,
|
||||
};
|
||||
@@ -55,7 +55,7 @@ impl MetricEngineInner {
|
||||
);
|
||||
FORBIDDEN_OPERATION_COUNT.inc();
|
||||
|
||||
ForbiddenPhysicalAlterSnafu.fail()
|
||||
ForbiddenPhysicalWriteSnafu.fail()
|
||||
} else {
|
||||
self.put_logical_region(region_id, request).await
|
||||
}
|
||||
@@ -86,18 +86,31 @@ impl MetricEngineInner {
|
||||
|
||||
// Fast path: single request, no batching overhead
|
||||
if len == 1 {
|
||||
let (logical_id, req) = requests.into_iter().next().unwrap();
|
||||
return self.put_logical_region(logical_id, req).await;
|
||||
let (region_id, req) = requests.into_iter().next().unwrap();
|
||||
let is_putting_physical_region =
|
||||
self.state.read().unwrap().exist_physical_region(region_id);
|
||||
if is_putting_physical_region {
|
||||
FORBIDDEN_OPERATION_COUNT.inc();
|
||||
return ForbiddenPhysicalWriteSnafu.fail();
|
||||
}
|
||||
|
||||
return self.put_logical_region(region_id, req).await;
|
||||
}
|
||||
|
||||
let mut requests_per_physical: HashMap<RegionId, Vec<(RegionId, RegionPutRequest)>> =
|
||||
HashMap::new();
|
||||
for (logical_region_id, request) in requests {
|
||||
let physical_region_id = self.find_physical_region_id(logical_region_id)?;
|
||||
for (region_id, request) in requests {
|
||||
let is_putting_physical_region =
|
||||
self.state.read().unwrap().exist_physical_region(region_id);
|
||||
if is_putting_physical_region {
|
||||
FORBIDDEN_OPERATION_COUNT.inc();
|
||||
return ForbiddenPhysicalWriteSnafu.fail();
|
||||
}
|
||||
let physical_region_id = self.find_physical_region_id(region_id)?;
|
||||
requests_per_physical
|
||||
.entry(physical_region_id)
|
||||
.or_default()
|
||||
.push((logical_region_id, request));
|
||||
.push((region_id, request));
|
||||
}
|
||||
|
||||
let mut total_affected_rows: AffectedRows = 0;
|
||||
@@ -1226,6 +1239,84 @@ mod tests {
|
||||
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batch_write_single_physical_region_forbidden() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
let engine = env.metric();
|
||||
|
||||
let physical_region_id = env.default_physical_region_id();
|
||||
let schema = test_util::row_schema_with_tags(&["job"]);
|
||||
let requests = vec![(
|
||||
physical_region_id,
|
||||
RegionPutRequest {
|
||||
rows: Rows {
|
||||
schema,
|
||||
rows: test_util::build_rows(1, 1),
|
||||
},
|
||||
hint: None,
|
||||
partition_expr_version: None,
|
||||
},
|
||||
)];
|
||||
|
||||
let err = engine
|
||||
.inner
|
||||
.put_regions_batch(requests.into_iter())
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert!(matches!(
|
||||
err,
|
||||
crate::error::Error::ForbiddenPhysicalWrite { .. }
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batch_write_physical_region_forbidden() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
let engine = env.metric();
|
||||
|
||||
let physical_region_id = env.default_physical_region_id();
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
let schema = test_util::row_schema_with_tags(&["job"]);
|
||||
let requests = vec![
|
||||
(
|
||||
logical_region_id,
|
||||
RegionPutRequest {
|
||||
rows: Rows {
|
||||
schema: schema.clone(),
|
||||
rows: test_util::build_rows(1, 1),
|
||||
},
|
||||
hint: None,
|
||||
partition_expr_version: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
physical_region_id,
|
||||
RegionPutRequest {
|
||||
rows: Rows {
|
||||
schema,
|
||||
rows: test_util::build_rows(1, 1),
|
||||
},
|
||||
hint: None,
|
||||
partition_expr_version: None,
|
||||
},
|
||||
),
|
||||
];
|
||||
|
||||
let err = engine
|
||||
.inner
|
||||
.put_regions_batch(requests.into_iter())
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert!(matches!(
|
||||
err,
|
||||
crate::error::Error::ForbiddenPhysicalWrite { .. }
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batch_write_single_request_fast_path() {
|
||||
let env = TestEnv::new().await;
|
||||
|
||||
@@ -254,6 +254,12 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Write request to physical region is forbidden"))]
|
||||
ForbiddenPhysicalWrite {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid region metadata"))]
|
||||
InvalidMetadata {
|
||||
source: store_api::metadata::MetadataError,
|
||||
@@ -411,6 +417,7 @@ impl ErrorExt for Error {
|
||||
| CreateDefault { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ForbiddenPhysicalAlter { .. }
|
||||
| ForbiddenPhysicalWrite { .. }
|
||||
| UnsupportedRegionRequest { .. }
|
||||
| MissingFiles { .. } => StatusCode::Unsupported,
|
||||
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
CREATE TABLE IF NOT EXISTS demo_metric_table (
|
||||
label STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
val DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY (label)
|
||||
)
|
||||
PARTITION ON COLUMNS (label) (
|
||||
label < 'M',
|
||||
label >= 'M'
|
||||
)
|
||||
ENGINE=metric
|
||||
WITH(
|
||||
physical_metric_table = 'true',
|
||||
skip_wal = 'true'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO demo_metric_table (label, ts, val)
|
||||
VALUES ('A', '2026-05-19 00:00:00', 1.0);
|
||||
|
||||
Error: 1001(Unsupported), Write request to physical region is forbidden
|
||||
|
||||
DROP TABLE demo_metric_table;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
CREATE TABLE IF NOT EXISTS demo_metric_table (
|
||||
label STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
val DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY (label)
|
||||
)
|
||||
PARTITION ON COLUMNS (label) (
|
||||
label < 'M',
|
||||
label >= 'M'
|
||||
)
|
||||
ENGINE=metric
|
||||
WITH(
|
||||
physical_metric_table = 'true',
|
||||
skip_wal = 'true'
|
||||
);
|
||||
|
||||
INSERT INTO demo_metric_table (label, ts, val)
|
||||
VALUES ('A', '2026-05-19 00:00:00', 1.0);
|
||||
|
||||
DROP TABLE demo_metric_table;
|
||||
Reference in New Issue
Block a user