diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 438a8fcad3..103b15e596 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -31,7 +31,7 @@ use store_api::storage::{RegionId, TableId}; use crate::engine::MetricEngineInner; use crate::error::{ - ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu, + ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalWriteSnafu, InvalidRequestSnafu, LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnexpectedRequestSnafu, UnsupportedRegionRequestSnafu, }; @@ -55,7 +55,7 @@ impl MetricEngineInner { ); FORBIDDEN_OPERATION_COUNT.inc(); - ForbiddenPhysicalAlterSnafu.fail() + ForbiddenPhysicalWriteSnafu.fail() } else { self.put_logical_region(region_id, request).await } @@ -86,18 +86,31 @@ impl MetricEngineInner { // Fast path: single request, no batching overhead if len == 1 { - let (logical_id, req) = requests.into_iter().next().unwrap(); - return self.put_logical_region(logical_id, req).await; + let (region_id, req) = requests.into_iter().next().unwrap(); + let is_putting_physical_region = + self.state.read().unwrap().exist_physical_region(region_id); + if is_putting_physical_region { + FORBIDDEN_OPERATION_COUNT.inc(); + return ForbiddenPhysicalWriteSnafu.fail(); + } + + return self.put_logical_region(region_id, req).await; } let mut requests_per_physical: HashMap> = HashMap::new(); - for (logical_region_id, request) in requests { - let physical_region_id = self.find_physical_region_id(logical_region_id)?; + for (region_id, request) in requests { + let is_putting_physical_region = + self.state.read().unwrap().exist_physical_region(region_id); + if is_putting_physical_region { + FORBIDDEN_OPERATION_COUNT.inc(); + return ForbiddenPhysicalWriteSnafu.fail(); + } + let physical_region_id = self.find_physical_region_id(region_id)?; requests_per_physical .entry(physical_region_id) .or_default() - .push((logical_region_id, request)); + .push((region_id, request)); } let mut total_affected_rows: AffectedRows = 0; @@ -1226,6 +1239,84 @@ mod tests { assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 0); } + #[tokio::test] + async fn test_batch_write_single_physical_region_forbidden() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let engine = env.metric(); + + let physical_region_id = env.default_physical_region_id(); + let schema = test_util::row_schema_with_tags(&["job"]); + let requests = vec![( + physical_region_id, + RegionPutRequest { + rows: Rows { + schema, + rows: test_util::build_rows(1, 1), + }, + hint: None, + partition_expr_version: None, + }, + )]; + + let err = engine + .inner + .put_regions_batch(requests.into_iter()) + .await + .unwrap_err(); + + assert!(matches!( + err, + crate::error::Error::ForbiddenPhysicalWrite { .. } + )); + } + + #[tokio::test] + async fn test_batch_write_physical_region_forbidden() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let engine = env.metric(); + + let physical_region_id = env.default_physical_region_id(); + let logical_region_id = env.default_logical_region_id(); + let schema = test_util::row_schema_with_tags(&["job"]); + let requests = vec![ + ( + logical_region_id, + RegionPutRequest { + rows: Rows { + schema: schema.clone(), + rows: test_util::build_rows(1, 1), + }, + hint: None, + partition_expr_version: None, + }, + ), + ( + physical_region_id, + RegionPutRequest { + rows: Rows { + schema, + rows: test_util::build_rows(1, 1), + }, + hint: None, + partition_expr_version: None, + }, + ), + ]; + + let err = engine + .inner + .put_regions_batch(requests.into_iter()) + .await + .unwrap_err(); + + assert!(matches!( + err, + crate::error::Error::ForbiddenPhysicalWrite { .. } + )); + } + #[tokio::test] async fn test_batch_write_single_request_fast_path() { let env = TestEnv::new().await; diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 284b1b0298..f01737e764 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -254,6 +254,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Write request to physical region is forbidden"))] + ForbiddenPhysicalWrite { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid region metadata"))] InvalidMetadata { source: store_api::metadata::MetadataError, @@ -411,6 +417,7 @@ impl ErrorExt for Error { | CreateDefault { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } + | ForbiddenPhysicalWrite { .. } | UnsupportedRegionRequest { .. } | MissingFiles { .. } => StatusCode::Unsupported, diff --git a/tests/cases/standalone/common/insert/physical_metric_table_insert.result b/tests/cases/standalone/common/insert/physical_metric_table_insert.result new file mode 100644 index 0000000000..63b2997209 --- /dev/null +++ b/tests/cases/standalone/common/insert/physical_metric_table_insert.result @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS demo_metric_table ( + label STRING NULL, + ts TIMESTAMP(3) NOT NULL, + val DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY (label) +) +PARTITION ON COLUMNS (label) ( + label < 'M', + label >= 'M' +) +ENGINE=metric +WITH( + physical_metric_table = 'true', + skip_wal = 'true' +); + +Affected Rows: 0 + +INSERT INTO demo_metric_table (label, ts, val) +VALUES ('A', '2026-05-19 00:00:00', 1.0); + +Error: 1001(Unsupported), Write request to physical region is forbidden + +DROP TABLE demo_metric_table; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/insert/physical_metric_table_insert.sql b/tests/cases/standalone/common/insert/physical_metric_table_insert.sql new file mode 100644 index 0000000000..0be352986f --- /dev/null +++ b/tests/cases/standalone/common/insert/physical_metric_table_insert.sql @@ -0,0 +1,21 @@ +CREATE TABLE IF NOT EXISTS demo_metric_table ( + label STRING NULL, + ts TIMESTAMP(3) NOT NULL, + val DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY (label) +) +PARTITION ON COLUMNS (label) ( + label < 'M', + label >= 'M' +) +ENGINE=metric +WITH( + physical_metric_table = 'true', + skip_wal = 'true' +); + +INSERT INTO demo_metric_table (label, ts, val) +VALUES ('A', '2026-05-19 00:00:00', 1.0); + +DROP TABLE demo_metric_table;