fix(mito): do not check nullability of fields in delete requests (#2815)

* test: test for delete rows from table with non null columns

* test: test delete and reopen

* fix: allow deleting rows with non null column
This commit is contained in:
Yingwen
2023-11-27 17:54:50 +08:00
committed by GitHub
parent 0badb3715e
commit 6100cb335a
5 changed files with 206 additions and 15 deletions

View File

@@ -29,7 +29,7 @@ use super::*;
use crate::region::version::VersionControlData;
use crate::test_util::{
build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema,
flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};
#[tokio::test]
@@ -353,6 +353,55 @@ async fn test_put_delete() {
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_delete_not_null_fields() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().all_not_null(true).build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
let delete_schema = delete_rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 4, 0),
};
put_rows(&engine, region_id, rows).await;
// Delete (a, 2)
let rows = Rows {
schema: delete_schema.clone(),
rows: build_delete_rows_for_key("a", 2, 3),
};
delete_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| a | 3.0 | 1970-01-01T00:00:03 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Reopen and scan again.
reopen_region(&engine, region_id, region_dir, false).await;
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_put_overwrite() {
let mut env = TestEnv::new();

View File

@@ -274,17 +274,21 @@ impl WriteRequest {
/// Checks whether we should allow a row doesn't provide this column.
fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
// For delete request, all tags and timestamp is required. We don't fill default
// tag or timestamp while deleting rows.
ensure!(
self.op_type != OpType::Delete || column.semantic_type == SemanticType::Field,
InvalidRequestSnafu {
region_id: self.region_id,
reason: format!("delete requests need column {}", column.column_schema.name),
if self.op_type == OpType::Delete {
if column.semantic_type == SemanticType::Field {
// For delete request, all tags and timestamp is required. We don't fill default
// tag or timestamp while deleting rows.
return Ok(());
} else {
return InvalidRequestSnafu {
region_id: self.region_id,
reason: format!("delete requests need column {}", column.column_schema.name),
}
.fail();
}
);
}
// Checks whether they have default value.
// Not a delete request. Checks whether they have default value.
ensure!(
column.column_schema.is_nullable()
|| column.column_schema.default_constraint().is_some(),
@@ -990,7 +994,7 @@ mod tests {
assert_eq!(expect_rows, request.rows);
}
fn region_metadata_two_fields() -> RegionMetadata {
fn builder_with_ts_tag() -> RegionMetadataBuilder {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
@@ -1011,6 +1015,13 @@ mod tests {
semantic_type: SemanticType::Tag,
column_id: 2,
})
.primary_key(vec![2]);
builder
}
fn region_metadata_two_fields() -> RegionMetadata {
let mut builder = builder_with_ts_tag();
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"f0",
@@ -1033,8 +1044,7 @@ mod tests {
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![2]);
});
builder.build().unwrap()
}
@@ -1100,6 +1110,75 @@ mod tests {
assert_eq!(expect_rows, request.rows);
}
#[test]
fn test_fill_missing_without_default_in_delete() {
let mut builder = builder_with_ts_tag();
builder
// f0 is nullable.
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"f0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 3,
})
// f1 is not nullable and don't has default.
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"f1",
ConcreteDataType::int64_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 4,
});
let metadata = builder.build().unwrap();
let rows = Rows {
schema: vec![
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
],
// Missing f0 (nullable), f1 (not nullable).
rows: vec![Row {
values: vec![i64_value(100), ts_ms_value(1)],
}],
};
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
request.fill_missing_columns(&metadata).unwrap();
let expect_rows = Rows {
schema: vec![
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
],
// Column f1 is not nullable and we use 0 for padding.
rows: vec![Row {
values: vec![
i64_value(100),
ts_ms_value(1),
Value { value_data: None },
i64_value(0),
],
}],
};
assert_eq!(expect_rows, request.rows);
}
#[test]
fn test_no_default() {
let rows = Rows {

View File

@@ -277,6 +277,7 @@ pub struct CreateRequestBuilder {
field_num: usize,
options: HashMap<String, String>,
primary_key: Option<Vec<ColumnId>>,
all_not_null: bool,
}
impl Default for CreateRequestBuilder {
@@ -287,6 +288,7 @@ impl Default for CreateRequestBuilder {
field_num: 1,
options: HashMap::new(),
primary_key: None,
all_not_null: false,
}
}
}
@@ -321,21 +323,29 @@ impl CreateRequestBuilder {
self
}
#[must_use]
pub fn insert_option(mut self, key: &str, value: &str) -> Self {
self.options.insert(key.to_string(), value.to_string());
self
}
#[must_use]
pub fn all_not_null(mut self, value: bool) -> Self {
self.all_not_null = value;
self
}
pub fn build(&self) -> RegionCreateRequest {
let mut column_id = 0;
let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
let mut primary_key = Vec::with_capacity(self.tag_num);
let nullable = !self.all_not_null;
for i in 0..self.tag_num {
column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("tag_{i}"),
ConcreteDataType::string_datatype(),
true,
nullable,
),
semantic_type: SemanticType::Tag,
column_id,
@@ -348,7 +358,7 @@ impl CreateRequestBuilder {
column_schema: ColumnSchema::new(
format!("field_{i}"),
ConcreteDataType::float64_datatype(),
true,
nullable,
),
semantic_type: SemanticType::Field,
column_id,
@@ -359,6 +369,7 @@ impl CreateRequestBuilder {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
// Time index is always not null.
false,
),
semantic_type: SemanticType::Timestamp,

View File

@@ -0,0 +1,38 @@
CREATE TABLE monitor (host STRING NOT NULL, ts TIMESTAMP NOT NULL, cpu DOUBLE NOT NULL, memory DOUBLE NOT NULL, TIME INDEX (ts), PRIMARY KEY(host));
Affected Rows: 0
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1655276557000, 'host1', 10.0, 1024),
(1655276557000, 'host2', 20.0, 1024),
(1655276557000, 'host3', 30.0, 1024);
Affected Rows: 3
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts, host;
+---------------------+-------+------+--------+
| ts | host | cpu | memory |
+---------------------+-------+------+--------+
| 2022-06-15T07:02:37 | host1 | 10.0 | 1024.0 |
| 2022-06-15T07:02:37 | host2 | 20.0 | 1024.0 |
| 2022-06-15T07:02:37 | host3 | 30.0 | 1024.0 |
+---------------------+-------+------+--------+
DELETE FROM monitor WHERE host = 'host2';
Affected Rows: 1
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts, host;
+---------------------+-------+------+--------+
| ts | host | cpu | memory |
+---------------------+-------+------+--------+
| 2022-06-15T07:02:37 | host1 | 10.0 | 1024.0 |
| 2022-06-15T07:02:37 | host3 | 30.0 | 1024.0 |
+---------------------+-------+------+--------+
DROP TABLE monitor;
Affected Rows: 0

View File

@@ -0,0 +1,14 @@
CREATE TABLE monitor (host STRING NOT NULL, ts TIMESTAMP NOT NULL, cpu DOUBLE NOT NULL, memory DOUBLE NOT NULL, TIME INDEX (ts), PRIMARY KEY(host));
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1655276557000, 'host1', 10.0, 1024),
(1655276557000, 'host2', 20.0, 1024),
(1655276557000, 'host3', 30.0, 1024);
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts, host;
DELETE FROM monitor WHERE host = 'host2';
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts, host;
DROP TABLE monitor;