diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index ca2798869d..75845414d3 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -551,7 +551,7 @@ mod tests { use storage::EngineImpl; use store_api::manifest::Manifest; use store_api::storage::ReadContext; - use table::requests::{AddColumnRequest, AlterKind}; + use table::requests::{AddColumnRequest, AlterKind, DeleteRequest}; use tempdir::TempDir; use super::*; @@ -1174,4 +1174,48 @@ mod tests { table_engine.create_table(&ctx, request).await.unwrap(); assert!(table_engine.table_exists(&engine_ctx, &table_reference)); } + + #[tokio::test] + async fn test_table_delete_rows() { + let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await; + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let hosts: VectorRef = + Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"])); + let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); + let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1])); + + columns_values.insert("host".to_string(), hosts.clone()); + columns_values.insert("cpu".to_string(), cpus.clone()); + columns_values.insert("memory".to_string(), memories.clone()); + columns_values.insert("ts".to_string(), tss.clone()); + + let insert_req = new_insert_request("demo".to_string(), columns_values); + assert_eq!(4, table.insert(insert_req).await.unwrap()); + + let del_hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host3"])); + let del_tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2])); + let mut key_column_values = HashMap::with_capacity(2); + key_column_values.insert("host".to_string(), del_hosts); + key_column_values.insert("ts".to_string(), del_tss); + let del_req = DeleteRequest { key_column_values }; + table.delete(del_req).await.unwrap(); + + let session_ctx = SessionContext::new(); + let stream = table.scan(None, &[], None).await.unwrap(); + let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); + let batches = util::collect_batches(stream).await.unwrap(); + + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+-----+--------+-------------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+-------------------------+ +| host2 | 2 | 2 | 1970-01-01T00:00:00.002 | +| host4 | 4 | 4 | 1970-01-01T00:00:00.001 | ++-------+-----+--------+-------------------------+" + ); + } } diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 609eede848..e60523a71b 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -41,7 +41,9 @@ use table::error::Result as TableResult; use table::metadata::{ FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, }; -use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; +use table::requests::{ + AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest, +}; use table::table::scan::SimpleTableScan; use table::table::{AlterContext, Table}; use tokio::sync::Mutex; @@ -161,6 +163,10 @@ impl Table for MitoTable { Ok(Arc::new(SimpleTableScan::new(stream))) } + fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result { + Ok(FilterPushDownType::Inexact) + } + /// Alter table changes the schemas of the table. async fn alter(&self, _context: AlterContext, req: AlterTableRequest) -> TableResult<()> { let _lock = self.alter_lock.lock().await; @@ -237,8 +243,34 @@ impl Table for MitoTable { Ok(()) } - fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result { - Ok(FilterPushDownType::Inexact) + async fn delete(&self, request: DeleteRequest) -> TableResult { + if request.key_column_values.is_empty() { + return Ok(0); + } + + let mut write_request = self.region.write_request(); + + let key_column_values = request.key_column_values; + // Safety: key_column_values isn't empty. + let rows_num = key_column_values.values().next().unwrap().len(); + + logging::trace!( + "Delete from table {} where key_columns are: {:?}", + self.table_info().name, + key_column_values + ); + + write_request + .delete(key_column_values) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + self.region + .write(&WriteContext::default(), write_request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + Ok(rows_num) } } diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 32e31a5250..381b192398 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -102,6 +102,9 @@ pub enum Error { #[snafu(display("Failed to operate table, source: {}", source))] TableOperation { source: BoxedError }, + + #[snafu(display("Unsupported operation: {}", operation))] + Unsupported { operation: String }, } impl ErrorExt for Error { @@ -119,6 +122,7 @@ impl ErrorExt for Error { Error::SchemaBuild { source, .. } => source.status_code(), Error::TableOperation { source } => source.status_code(), Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, + Error::Unsupported { .. } => StatusCode::Unsupported, } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index f26f01fd9b..8e8faabe1f 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -90,3 +90,12 @@ pub struct DropTableRequest { pub schema_name: String, pub table_name: String, } + +/// Delete (by primary key) request +#[derive(Debug)] +pub struct DeleteRequest { + /// Values of each column in this table's primary key and time index. + /// + /// The key is the column name, and the value is the column value. + pub key_column_values: HashMap, +} diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 1371177536..30c7b6cc50 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -24,9 +24,9 @@ use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use datatypes::schema::SchemaRef; -use crate::error::Result; +use crate::error::{Result, UnsupportedSnafu}; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; -use crate::requests::{AlterTableRequest, InsertRequest}; +use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; pub type AlterContext = anymap::Map; @@ -49,8 +49,13 @@ pub trait Table: Send + Sync { } /// Insert values into table. + /// + /// Returns number of inserted rows. async fn insert(&self, _request: InsertRequest) -> Result { - unimplemented!(); + UnsupportedSnafu { + operation: "INSERT", + } + .fail()? } /// Scan the table and returns a SendableRecordBatchStream. @@ -71,9 +76,22 @@ pub trait Table: Send + Sync { Ok(FilterPushDownType::Unsupported) } - async fn alter(&self, _context: AlterContext, request: AlterTableRequest) -> Result<()> { - let _ = request; - unimplemented!() + /// Alter table. + async fn alter(&self, _context: AlterContext, _request: AlterTableRequest) -> Result<()> { + UnsupportedSnafu { + operation: "ALTER TABLE", + } + .fail()? + } + + /// Delete rows in the table. + /// + /// Returns number of deleted rows. + async fn delete(&self, _request: DeleteRequest) -> Result { + UnsupportedSnafu { + operation: "DELETE", + } + .fail()? } }