feat: Implement delete for table (#801)

* feat: Table default implementations for insert/alter return error

* feat: Implement delete for mito table

* docs: Fix comment
This commit is contained in:
Yingwen
2023-01-05 20:03:40 +08:00
committed by GitHub
parent 8f5ecefc90
commit 072e5f78b4
5 changed files with 117 additions and 10 deletions

View File

@@ -551,7 +551,7 @@ mod tests {
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use table::requests::{AddColumnRequest, AlterKind};
use table::requests::{AddColumnRequest, AlterKind, DeleteRequest};
use tempdir::TempDir;
use super::*;
@@ -1174,4 +1174,48 @@ mod tests {
table_engine.create_table(&ctx, request).await.unwrap();
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
}
#[tokio::test]
async fn test_table_delete_rows() {
let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
let hosts: VectorRef =
Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"]));
let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1]));
columns_values.insert("host".to_string(), hosts.clone());
columns_values.insert("cpu".to_string(), cpus.clone());
columns_values.insert("memory".to_string(), memories.clone());
columns_values.insert("ts".to_string(), tss.clone());
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(4, table.insert(insert_req).await.unwrap());
let del_hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host3"]));
let del_tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
let mut key_column_values = HashMap::with_capacity(2);
key_column_values.insert("host".to_string(), del_hosts);
key_column_values.insert("ts".to_string(), del_tss);
let del_req = DeleteRequest { key_column_values };
table.delete(del_req).await.unwrap();
let session_ctx = SessionContext::new();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect_batches(stream).await.unwrap();
assert_eq!(
batches.pretty_print().unwrap(),
"\
+-------+-----+--------+-------------------------+
| host | cpu | memory | ts |
+-------+-----+--------+-------------------------+
| host2 | 2 | 2 | 1970-01-01T00:00:00.002 |
| host4 | 4 | 4 | 1970-01-01T00:00:00.001 |
+-------+-----+--------+-------------------------+"
);
}
}

View File

@@ -41,7 +41,9 @@ use table::error::Result as TableResult;
use table::metadata::{
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
};
use table::table::scan::SimpleTableScan;
use table::table::{AlterContext, Table};
use tokio::sync::Mutex;
@@ -161,6 +163,10 @@ impl<R: Region> Table for MitoTable<R> {
Ok(Arc::new(SimpleTableScan::new(stream)))
}
fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result<FilterPushDownType> {
Ok(FilterPushDownType::Inexact)
}
/// Alter table changes the schemas of the table.
async fn alter(&self, _context: AlterContext, req: AlterTableRequest) -> TableResult<()> {
let _lock = self.alter_lock.lock().await;
@@ -237,8 +243,34 @@ impl<R: Region> Table for MitoTable<R> {
Ok(())
}
fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result<FilterPushDownType> {
Ok(FilterPushDownType::Inexact)
async fn delete(&self, request: DeleteRequest) -> TableResult<usize> {
if request.key_column_values.is_empty() {
return Ok(0);
}
let mut write_request = self.region.write_request();
let key_column_values = request.key_column_values;
// Safety: key_column_values isn't empty.
let rows_num = key_column_values.values().next().unwrap().len();
logging::trace!(
"Delete from table {} where key_columns are: {:?}",
self.table_info().name,
key_column_values
);
write_request
.delete(key_column_values)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
self.region
.write(&WriteContext::default(), write_request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(rows_num)
}
}

View File

@@ -102,6 +102,9 @@ pub enum Error {
#[snafu(display("Failed to operate table, source: {}", source))]
TableOperation { source: BoxedError },
#[snafu(display("Unsupported operation: {}", operation))]
Unsupported { operation: String },
}
impl ErrorExt for Error {
@@ -119,6 +122,7 @@ impl ErrorExt for Error {
Error::SchemaBuild { source, .. } => source.status_code(),
Error::TableOperation { source } => source.status_code(),
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
Error::Unsupported { .. } => StatusCode::Unsupported,
}
}

View File

@@ -90,3 +90,12 @@ pub struct DropTableRequest {
pub schema_name: String,
pub table_name: String,
}
/// Delete (by primary key) request
#[derive(Debug)]
pub struct DeleteRequest {
/// Values of each column in this table's primary key and time index.
///
/// The key is the column name, and the value is the column value.
pub key_column_values: HashMap<String, VectorRef>,
}

View File

@@ -24,9 +24,9 @@ use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
use datatypes::schema::SchemaRef;
use crate::error::Result;
use crate::error::{Result, UnsupportedSnafu};
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
use crate::requests::{AlterTableRequest, InsertRequest};
use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
pub type AlterContext = anymap::Map<dyn Any + Send + Sync>;
@@ -49,8 +49,13 @@ pub trait Table: Send + Sync {
}
/// Insert values into table.
///
/// Returns number of inserted rows.
async fn insert(&self, _request: InsertRequest) -> Result<usize> {
unimplemented!();
UnsupportedSnafu {
operation: "INSERT",
}
.fail()?
}
/// Scan the table and returns a SendableRecordBatchStream.
@@ -71,9 +76,22 @@ pub trait Table: Send + Sync {
Ok(FilterPushDownType::Unsupported)
}
async fn alter(&self, _context: AlterContext, request: AlterTableRequest) -> Result<()> {
let _ = request;
unimplemented!()
/// Alter table.
async fn alter(&self, _context: AlterContext, _request: AlterTableRequest) -> Result<()> {
UnsupportedSnafu {
operation: "ALTER TABLE",
}
.fail()?
}
/// Delete rows in the table.
///
/// Returns number of deleted rows.
async fn delete(&self, _request: DeleteRequest) -> Result<usize> {
UnsupportedSnafu {
operation: "DELETE",
}
.fail()?
}
}