From 4d56d896ca0b045b73bc9c43bc4f82168e8d6b5a Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 30 Dec 2022 17:12:18 +0800 Subject: [PATCH] feat: Implement delete for the storage engine (#777) * docs: Fix incorrect comment of Vector::only_null * feat: Add delete to WriteRequest and WriteBatch * feat: Filter deleted rows * fix: Fix panic after reopening engine This is detected by adding a reopen step to the delete test for region. * fix: Fix OpType::min_type() * test: Add delete absent key test * chore: Address CR comments --- src/datatypes/src/vectors.rs | 2 +- src/mito/src/table/test_util/mock_engine.rs | 10 +- src/storage/benches/wal/util/mod.rs | 1 + .../benches/wal/util/write_batch_util.rs | 8 +- src/storage/src/error.rs | 8 +- src/storage/src/memtable/inserter.rs | 1 + src/storage/src/memtable/tests.rs | 2 +- src/storage/src/proto/wal.rs | 1 + src/storage/src/read.rs | 8 + src/storage/src/read/dedup.rs | 4 +- src/storage/src/region.rs | 10 +- src/storage/src/region/tests.rs | 29 ++- src/storage/src/region/tests/basic.rs | 65 +++++++ src/storage/src/region/tests/projection.rs | 1 + src/storage/src/schema/projected.rs | 51 ++++- src/storage/src/schema/store.rs | 14 +- src/storage/src/sst/parquet.rs | 7 +- src/storage/src/test_util/write_batch_util.rs | 8 +- src/storage/src/wal.rs | 21 +- src/storage/src/write_batch.rs | 181 +++++++++++++++++- src/storage/src/write_batch/codec.rs | 4 +- src/storage/src/write_batch/compat.rs | 13 +- src/store-api/src/storage/requests.rs | 5 + src/store-api/src/storage/types.rs | 14 +- 24 files changed, 413 insertions(+), 55 deletions(-) diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index fe71a6a7c3..09ac4dc1ee 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -111,7 +111,7 @@ pub trait Vector: Send + Sync + Serializable + Debug + VectorOp { /// Returns whether row is null. fn is_null(&self, row: usize) -> bool; - /// If the only value vector can contain is NULL. + /// If the vector only contains NULL. 
fn only_null(&self) -> bool { self.null_count() == self.len() } diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index f3b859c82c..c659bc08e7 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -27,8 +27,8 @@ use storage::metadata::{RegionMetaImpl, RegionMetadata}; use storage::write_batch::WriteBatch; use store_api::storage::{ AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, - OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest, - ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse, + OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest, ScanResponse, + SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse, }; pub type Result = std::result::Result; @@ -173,7 +173,11 @@ impl Region for MockRegion { } fn write_request(&self) -> WriteBatch { - WriteBatch::new(self.in_memory_metadata().schema().clone()) + let metadata = self.inner.metadata.load(); + let user_schema = metadata.user_schema().clone(); + let row_key_end = metadata.schema().store_schema().row_key_end(); + + WriteBatch::new(user_schema, row_key_end) } async fn alter(&self, request: AlterRequest) -> Result<()> { diff --git a/src/storage/benches/wal/util/mod.rs b/src/storage/benches/wal/util/mod.rs index b9df47710d..d3b33f4401 100644 --- a/src/storage/benches/wal/util/mod.rs +++ b/src/storage/benches/wal/util/mod.rs @@ -43,6 +43,7 @@ pub fn new_test_batch() -> WriteBatch { ("10", LogicalTypeId::String, false), ], Some(2), + 3, ) } diff --git a/src/storage/benches/wal/util/write_batch_util.rs b/src/storage/benches/wal/util/write_batch_util.rs index b3743d5cc2..22f183511f 100644 --- a/src/storage/benches/wal/util/write_batch_util.rs +++ b/src/storage/benches/wal/util/write_batch_util.rs @@ -16,8 +16,12 @@ use storage::write_batch::WriteBatch; use crate::memtable::util::schema_util::{self, ColumnDef}; -pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option) -> WriteBatch { +pub fn new_write_batch( + column_defs: &[ColumnDef], + timestamp_index: Option, + row_key_end: usize, +) -> WriteBatch { let schema = schema_util::new_schema_ref(column_defs, timestamp_index); - WriteBatch::new(schema) + WriteBatch::new(schema, row_key_end) } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index e26f42db2c..bac2851052 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -404,7 +404,7 @@ pub enum Error { expect, given ))] - LenNotEquals { + UnequalLengths { name: String, expect: usize, given: usize, @@ -434,6 +434,9 @@ pub enum Error { backtrace: Backtrace, source: datatypes::error::Error, }, + + #[snafu(display("More columns than expected in the request"))] + MoreColumnThanExpected { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -455,7 +458,8 @@ impl ErrorExt for Error { | RequestTooLarge { .. } | TypeMismatch { .. } | HasNull { .. } - | LenNotEquals { .. } => StatusCode::InvalidArguments, + | UnequalLengths { .. } + | MoreColumnThanExpected { .. } => StatusCode::InvalidArguments, Utf8 { .. } | EncodeJson { .. 
} diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 24af9e8c68..8fbbd504e1 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -143,6 +143,7 @@ mod tests { ("value", LogicalTypeId::Int64, true), ], Some(0), + 1, ) } diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 2ede68cc0f..f4eef66c17 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -583,7 +583,7 @@ fn test_memtable_projection() { let k1 = Arc::new(UInt64Vector::from_slice(&[0, 1, 2])) as VectorRef; let v0 = Arc::new(UInt64Vector::from_slice(&[10, 11, 12])) as VectorRef; let sequences = Arc::new(UInt64Vector::from_slice(&[9, 9, 9])) as VectorRef; - let op_types = Arc::new(UInt8Vector::from_slice(&[0, 0, 0])) as VectorRef; + let op_types = Arc::new(UInt8Vector::from_slice(&[1, 1, 1])) as VectorRef; assert_eq!(k0, *batch.column(0)); assert_eq!(k1, *batch.column(1)); diff --git a/src/storage/src/proto/wal.rs b/src/storage/src/proto/wal.rs index b1876312cd..2a7faebb10 100644 --- a/src/storage/src/proto/wal.rs +++ b/src/storage/src/proto/wal.rs @@ -24,6 +24,7 @@ pub fn gen_mutation_types(payload: &Payload) -> Vec { .mutations .iter() .map(|m| match m.op_type { + OpType::Delete => MutationType::Delete.into(), OpType::Put => MutationType::Put.into(), }) .collect::>() diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index 3c81d37c15..f625b53be2 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -135,6 +135,14 @@ pub trait BatchOp { /// Note that the nulls of `filter` are interpreted as `false` will lead to these elements /// being masked out. fn filter(&self, batch: &Batch, filter: &BooleanVector) -> Result; + + /// Unselect deleted rows according to the [`OpType`](store_api::storage::OpType). + /// + /// # Panics + /// Panics if + /// - `batch` doesn't have a valid op type column. + /// - `selected.len()` is less than the number of rows. + fn unselect_deleted(&self, batch: &Batch, selected: &mut BitVec); } /// Reusable [Batch] builder. diff --git a/src/storage/src/read/dedup.rs b/src/storage/src/read/dedup.rs index 5d359df3d6..ab8485a910 100644 --- a/src/storage/src/read/dedup.rs +++ b/src/storage/src/read/dedup.rs @@ -66,8 +66,8 @@ impl DedupReader { .get_or_insert_with(Batch::default) .clone_from(&batch); // Use `clone_from` to reuse allocated memory if possible. - // TODO(yingwen): To support `DELETE`, we could find all rows whose op_types are equal - // to `OpType::Delete`, mark their `selected` to false, then filter the batch. + // Find all rows whose op_types are `OpType::Delete`, mark their `selected` to false. + self.schema.unselect_deleted(&batch, &mut self.selected); let filter = BooleanVector::from_iterator(self.selected.iter().by_vals()); // Filter duplicate rows. 
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 305f877f86..5c8f2cd851 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -24,8 +24,8 @@ use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ - AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, SequenceNumber, - WriteContext, WriteResponse, + AlterRequest, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext, + WriteResponse, }; use crate::error::{self, Error, Result}; @@ -91,7 +91,11 @@ impl Region for RegionImpl { } fn write_request(&self) -> Self::WriteRequest { - WriteBatch::new(self.in_memory_metadata().schema().clone()) + let metadata = self.inner.version_control().metadata(); + let user_schema = metadata.user_schema().clone(); + let row_key_end = metadata.schema().store_schema().row_key_end(); + + WriteBatch::new(user_schema, row_key_end) } async fn alter(&self, request: AlterRequest) -> Result<()> { diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index d14040ecdf..df8ef81ee6 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -31,7 +31,7 @@ use log_store::fs::noop::NoopLogStore; use object_store::backend::fs; use object_store::ObjectStore; use store_api::storage::{ - consts, Chunk, ChunkReader, ScanRequest, SequenceNumber, Snapshot, WriteRequest, + consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest, }; use tempdir::TempDir; @@ -124,6 +124,17 @@ impl TesterBase { pub fn committed_sequence(&self) -> SequenceNumber { self.region.committed_sequence() } + + /// Delete by keys (timestamp). + pub async fn delete(&self, keys: &[i64]) -> WriteResponse { + let keys: Vec = keys.iter().map(|v| (*v).into()).collect(); + // Build a batch without version. 
+ let mut batch = new_write_batch_for_test(false); + let keys = new_delete_data(&keys); + batch.delete(keys).unwrap(); + + self.region.write(&self.write_ctx, batch).await.unwrap() + } } pub type FileTesterBase = TesterBase; @@ -141,6 +152,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { ("v0", LogicalTypeId::Int64, true), ], Some(0), + 2, ) } else { write_batch_util::new_write_batch( @@ -153,6 +165,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { ("v0", LogicalTypeId::Int64, true), ], Some(0), + 1, ) } } @@ -173,6 +186,20 @@ fn new_put_data(data: &[(TimestampMillisecond, Option)]) -> HashMap HashMap { + let mut delete_data = HashMap::new(); + + let timestamps = + TimestampMillisecondVector::from_vec(keys.iter().map(|v| v.0.into()).collect()); + + delete_data.insert( + test_util::TIMESTAMP_NAME.to_string(), + Arc::new(timestamps) as VectorRef, + ); + + delete_data +} + fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option)>) { assert_eq!(2, chunk.columns.len()); diff --git a/src/storage/src/region/tests/basic.rs b/src/storage/src/region/tests/basic.rs index c5352669a5..c2ccc830c4 100644 --- a/src/storage/src/region/tests/basic.rs +++ b/src/storage/src/region/tests/basic.rs @@ -106,6 +106,10 @@ impl Tester { fn committed_sequence(&self) -> SequenceNumber { self.base().committed_sequence() } + + async fn delete(&self, keys: &[i64]) -> WriteResponse { + self.base().delete(keys).await + } } #[tokio::test] @@ -202,3 +206,64 @@ async fn test_scan_different_batch() { assert_eq!(data, output); } } + +#[tokio::test] +async fn test_put_delete_scan() { + let dir = TempDir::new("put-delete-scan").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let mut tester = Tester::new(REGION_NAME, store_dir).await; + + let data = vec![ + (1000, Some(100)), + (1001, Some(101)), + (1002, None), + (1003, None), + (1004, Some(104)), + ]; + + tester.put(&data).await; + + let keys = [1001, 1003]; + + tester.delete(&keys).await; + + let output = tester.full_scan().await; + let expect = vec![(1000, Some(100)), (1002, None), (1004, Some(104))]; + assert_eq!(expect, output); + + // Deletion is also persistent. + tester.try_reopen().await.unwrap(); + let output = tester.full_scan().await; + assert_eq!(expect, output); +} + +#[tokio::test] +async fn test_put_delete_absent_key() { + let dir = TempDir::new("put-delete-scan").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let mut tester = Tester::new(REGION_NAME, store_dir).await; + + let data = vec![ + (1000, Some(100)), + (1001, Some(101)), + (1002, None), + (1003, None), + (1004, Some(104)), + ]; + + tester.put(&data).await; + + // 999 and 1006 is absent. + let keys = [999, 1002, 1004, 1006]; + + tester.delete(&keys).await; + + let output = tester.full_scan().await; + let expect = vec![(1000, Some(100)), (1001, Some(101)), (1003, None)]; + assert_eq!(expect, output); + + // Deletion is also persistent. 
+ tester.try_reopen().await.unwrap(); + let output = tester.full_scan().await; + assert_eq!(expect, output); +} diff --git a/src/storage/src/region/tests/projection.rs b/src/storage/src/region/tests/projection.rs index 8a9536f5c2..837f418cdf 100644 --- a/src/storage/src/region/tests/projection.rs +++ b/src/storage/src/region/tests/projection.rs @@ -49,6 +49,7 @@ fn new_write_batch_for_test() -> WriteBatch { ("v1", LogicalTypeId::Int64, true), ], Some(1), + 2, ) } diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index f50d431433..26bf8632a8 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -18,9 +18,10 @@ use std::sync::Arc; use common_base::BitVec; use common_error::prelude::*; +use datatypes::prelude::ScalarVector; use datatypes::schema::{SchemaBuilder, SchemaRef}; -use datatypes::vectors::BooleanVector; -use store_api::storage::{Chunk, ColumnId}; +use datatypes::vectors::{BooleanVector, UInt8Vector}; +use store_api::storage::{Chunk, ColumnId, OpType}; use crate::error; use crate::metadata::{self, Result}; @@ -167,7 +168,9 @@ impl ProjectedSchema { /// Convert [Batch] into [Chunk]. /// /// This will remove all internal columns. The input `batch` should has the - /// same schema as `self.schema_to_read()`. + /// same schema as [`self.schema_to_read()`](ProjectedSchema::schema_to_read). + /// The output [Chunk] has the same schema as + /// [`self.projected_user_schema()`](ProjectedSchema::projected_user_schema). pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk { let columns = match &self.projection { Some(projection) => projection @@ -330,6 +333,29 @@ impl BatchOp for ProjectedSchema { Ok(Batch::new(columns)) } + + fn unselect_deleted(&self, batch: &Batch, selected: &mut BitVec) { + let op_types = batch.column(self.schema_to_read.op_type_index()); + // Safety: We expect the batch has the same schema as `self.schema_to_read`. The + // read procedure should guarantee this, otherwise this is a critical bug and it + // should be fine to panic. + let op_types = op_types + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!( + "Expect op_type (UInt8) column at index {}, given {:?}", + self.schema_to_read.op_type_index(), + op_types.data_type() + ); + }); + + for (i, op_type) in op_types.iter_data().enumerate() { + if op_type == Some(OpType::Delete.as_u8()) { + selected.set(i, false); + } + } + } } #[cfg(test)] @@ -526,4 +552,23 @@ mod tests { let expect: VectorRef = Arc::new(TimestampMillisecondVector::from_values([1000, 3000])); assert_eq!(expect, *res.column(0)); } + + #[test] + fn test_unselect_deleted() { + let schema = read_util::new_projected_schema(); + let batch = read_util::new_full_kv_batch(&[ + (100, 1, 1000, OpType::Put), + (101, 1, 999, OpType::Delete), + (102, 1, 1000, OpType::Put), + (103, 1, 999, OpType::Put), + (104, 1, 1000, OpType::Delete), + ]); + + let mut selected = BitVec::repeat(true, batch.num_rows()); + schema.unselect_deleted(&batch, &mut selected); + assert_eq!( + BitVec::from_iter([true, false, true, true, false]), + selected + ); + } } diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index 6dc86ef910..c91858d092 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -58,6 +58,7 @@ impl StoreSchema { self.schema.arrow_schema() } + // TODO(yingwen): Remove this method. 
pub fn batch_to_arrow_record_batch( &self, batch: &Batch, @@ -70,6 +71,14 @@ impl StoreSchema { .context(NewRecordBatchSnafu) } + /// Returns the ending index of row key columns. + /// + /// The ending index has the same value as the number of the row key columns. + #[inline] + pub fn row_key_end(&self) -> usize { + self.row_key_end + } + pub(crate) fn contains_column(&self, name: &str) -> bool { self.schema.column_schema_by_name(name).is_some() } @@ -166,11 +175,6 @@ impl StoreSchema { self.schema.num_columns() } - #[inline] - pub(crate) fn row_key_end(&self) -> usize { - self.row_key_end - } - #[inline] pub(crate) fn user_column_end(&self) -> usize { self.user_column_end diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 5bde4ac4e4..5d1531ab41 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -312,20 +312,19 @@ mod tests { // v1 assert_eq!( - &(Arc::new(UInt64Array::from(vec![1234, 1234, 1234, 1234, 1234, 1234])) - as Arc), + &(Arc::new(UInt64Array::from(vec![1234; 6])) as Arc), chunk.column(3) ); // sequence assert_eq!( - &(Arc::new(UInt64Array::from(vec![10, 10, 10, 10, 10, 10])) as Arc), + &(Arc::new(UInt64Array::from(vec![10; 6])) as Arc), chunk.column(4) ); // op_type assert_eq!( - &(Arc::new(UInt8Array::from(vec![0, 0, 0, 0, 0, 0])) as Arc), + &(Arc::new(UInt8Array::from(vec![1; 6])) as Arc), chunk.column(5) ); } diff --git a/src/storage/src/test_util/write_batch_util.rs b/src/storage/src/test_util/write_batch_util.rs index baace78039..41600612ef 100644 --- a/src/storage/src/test_util/write_batch_util.rs +++ b/src/storage/src/test_util/write_batch_util.rs @@ -15,8 +15,12 @@ use crate::test_util::schema_util::{self, ColumnDef}; use crate::write_batch::WriteBatch; -pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option) -> WriteBatch { +pub fn new_write_batch( + column_defs: &[ColumnDef], + timestamp_index: Option, + row_key_end: usize, +) -> WriteBatch { let schema = schema_util::new_schema_ref(column_defs, timestamp_index); - WriteBatch::new(schema) + WriteBatch::new(schema, row_key_end) } diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 575bd23808..83f54e293b 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -24,7 +24,10 @@ use store_api::logstore::{AppendResponse, LogStore}; use store_api::storage::{RegionId, SequenceNumber}; use crate::codec::{Decoder, Encoder}; -use crate::error::{self, Error, MarkWalStableSnafu, Result}; +use crate::error::{ + DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalStableSnafu, ReadWalSnafu, Result, + WalDataCorruptedSnafu, WriteWalSnafu, +}; use crate::proto::wal::{self, WalHeader}; use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder}; use crate::write_batch::Payload; @@ -114,7 +117,7 @@ impl Wal { encoder .encode(p, &mut buf) .map_err(BoxedError::new) - .context(error::WriteWalSnafu { + .context(WriteWalSnafu { region_id: self.region_id(), })?; } @@ -129,7 +132,7 @@ impl Wal { .read(&self.namespace, start_seq) .await .map_err(BoxedError::new) - .context(error::ReadWalSnafu { + .context(ReadWalSnafu { region_id: self.region_id(), })? // Handle the error when reading from the stream. 
@@ -155,7 +158,7 @@ impl Wal { .append(e) .await .map_err(BoxedError::new) - .context(error::WriteWalSnafu { + .context(WriteWalSnafu { region_id: self.region_id(), })?; @@ -174,7 +177,7 @@ impl Wal { ensure!( data_pos <= input.len(), - error::WalDataCorruptedSnafu { + WalDataCorruptedSnafu { region_id: self.region_id(), message: format!( "Not enough input buffer, expected data position={}, actual buffer length={}", @@ -192,7 +195,7 @@ impl Wal { let payload = decoder .decode(&input[data_pos..]) .map_err(BoxedError::new) - .context(error::ReadWalSnafu { + .context(ReadWalSnafu { region_id: self.region_id(), })?; @@ -209,7 +212,7 @@ impl Encoder for WalHeaderEncoder { fn encode(&self, item: &WalHeader, dst: &mut Vec<u8>) -> Result<()> { item.encode_length_delimited(dst) .map_err(|err| err.into()) - .context(error::EncodeWalHeaderSnafu) + .context(EncodeWalHeaderSnafu) } } @@ -222,12 +225,12 @@ impl Decoder for WalHeaderDecoder { fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> { let mut data_pos = prost::decode_length_delimiter(src) .map_err(|err| err.into()) - .context(error::DecodeWalHeaderSnafu)?; + .context(DecodeWalHeaderSnafu)?; data_pos += prost::length_delimiter_len(data_pos); let wal_header = WalHeader::decode_length_delimited(src) .map_err(|err| err.into()) - .context(error::DecodeWalHeaderSnafu)?; + .context(DecodeWalHeaderSnafu)?; Ok((data_pos, wal_header)) } diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 49eba13101..e43051470f 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -18,17 +18,20 @@ mod compat; use std::collections::HashMap; use common_recordbatch::RecordBatch; +use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::value::ValueRef; use datatypes::vectors::VectorRef; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{OpType, WriteRequest}; use crate::error::{ BatchMissingColumnSnafu, CreateDefaultSnafu, CreateRecordBatchSnafu, Error, HasNullSnafu, - LenNotEqualsSnafu, RequestTooLargeSnafu, Result, TypeMismatchSnafu, UnknownColumnSnafu, + MoreColumnThanExpectedSnafu, RequestTooLargeSnafu, Result, TypeMismatchSnafu, + UnequalLengthsSnafu, UnknownColumnSnafu, }; -/// Max number of updates of a write batch. +/// Max number of updates in a write batch. pub(crate) const MAX_BATCH_SIZE: usize = 1_000_000; /// Data of [WriteBatch]. @@ -77,6 +80,11 @@ pub struct WriteBatch { /// /// We use it to check whether this batch is too large. num_rows_to_mutate: usize, + /// The ending index of row key columns. + /// + /// The `WriteBatch` uses this index to locate all row key columns from + /// the schema. + row_key_end: usize, } impl WriteRequest for WriteBatch { @@ -98,14 +106,41 @@ impl WriteRequest for WriteBatch { Ok(()) } + + fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<()> { + let data = NameToVector::new(keys)?; + if data.is_empty() { + return Ok(()); + } + + let record_batch = self.process_delete_data(data)?; + + self.add_num_rows_to_mutate(record_batch.num_rows())?; + self.payload.mutations.push(Mutation { + op_type: OpType::Delete, + record_batch, + }); + + Ok(()) + } } // WriteBatch pub methods. impl WriteBatch { + /// Creates a new `WriteBatch`. + /// + /// The `schema` is the user schema of the region (no internal columns) and + /// the `row_key_end` is the ending index of row key columns. + /// + /// # Panics + /// Panics if `row_key_end > schema.num_columns()`. 
+ pub fn new(schema: SchemaRef, row_key_end: usize) -> Self { + assert!(row_key_end <= schema.num_columns()); + Self { payload: Payload::new(schema), num_rows_to_mutate: 0, + row_key_end, } } @@ -154,6 +189,46 @@ impl WriteBatch { RecordBatch::new(self.schema().clone(), columns).context(CreateRecordBatchSnafu) } + /// Validates `data` and converts it into a [RecordBatch]. + /// + /// It fills value columns by null, ignoring whether the column is nullable as the contents + /// of value columns won't be read. + fn process_delete_data(&self, data: NameToVector) -> Result { + // Ensure row key columns are provided. + for column_schema in self.row_key_column_schemas() { + ensure!( + data.0.contains_key(&column_schema.name), + BatchMissingColumnSnafu { + column: &column_schema.name, + } + ); + } + // Ensure only provides row key columns. + ensure!( + data.0.len() == self.row_key_column_schemas().len(), + MoreColumnThanExpectedSnafu + ); + + let num_rows = data.num_rows(); + let mut columns = Vec::with_capacity(self.schema().num_columns()); + for column_schema in self.schema().column_schemas() { + match data.0.get(&column_schema.name) { + Some(col) => { + validate_column(column_schema, col)?; + columns.push(col.clone()); + } + None => { + // Fills value columns by null, these columns are just placeholders to ensure + // the schema of the record batch is correct. + let col = new_column_with_null(&column_schema.data_type, num_rows); + columns.push(col); + } + } + } + + RecordBatch::new(self.schema().clone(), columns).context(CreateRecordBatchSnafu) + } + fn add_num_rows_to_mutate(&mut self, len: usize) -> Result<()> { let num_rows = self.num_rows_to_mutate + len; ensure!( @@ -163,6 +238,11 @@ impl WriteBatch { self.num_rows_to_mutate = num_rows; Ok(()) } + + /// Returns all row key columns in the schema. + fn row_key_column_schemas(&self) -> &[ColumnSchema] { + &self.payload.schema.column_schemas()[..self.row_key_end] + } } /// Returns the length of the first vector in `data`. @@ -218,6 +298,17 @@ pub(crate) fn new_column_with_default_value( Ok(vector) } +/// Creates a new column and fills it by null. +fn new_column_with_null(data_type: &ConcreteDataType, num_rows: usize) -> VectorRef { + // TODO(yingwen): Use `NullVector` once it supports setting logical type. + let mut mutable_vector = data_type.create_mutable_vector(num_rows); + for _ in 0..num_rows { + // Safety: push null is safe. + mutable_vector.push_value_ref(ValueRef::Null).unwrap(); + } + mutable_vector.to_vector() +} + /// Vectors in [NameToVector] have same length. /// /// MUST construct it via [`NameToVector::new()`] to ensure the vector lengths are validated. 
@@ -229,7 +320,7 @@ impl NameToVector { for (name, vector) in &data { ensure!( num_rows == vector.len(), - LenNotEqualsSnafu { + UnequalLengthsSnafu { name, expect: num_rows, given: vector.len(), @@ -264,6 +355,7 @@ pub(crate) fn new_test_batch() -> WriteBatch { ("v1", LogicalTypeId::Boolean, true), ], Some(2), + 3, ) } @@ -325,7 +417,6 @@ mod tests { put_data.insert("ts".to_string(), tsv); let mut batch = new_test_batch(); - assert!(batch.payload().is_empty()); batch.put(put_data).unwrap(); assert!(!batch.payload().is_empty()); @@ -352,7 +443,7 @@ mod tests { put_data.insert("k1".to_string(), boolv); let mut batch = - write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)], None); + write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)], None, 1); let err = batch.put(put_data).unwrap_err(); check_err(err, "Request is too large"); } @@ -433,4 +524,82 @@ mod tests { let err = batch.put(put_data).unwrap_err(); assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); } + + #[test] + fn test_put_empty() { + let mut batch = new_test_batch(); + batch.put(HashMap::new()).unwrap(); + assert!(batch.payload().is_empty()); + } + + #[test] + fn test_delete_empty() { + let mut batch = new_test_batch(); + batch.delete(HashMap::new()).unwrap(); + assert!(batch.payload().is_empty()); + } + + #[test] + fn test_write_batch_delete() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef; + let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef; + + let mut keys = HashMap::with_capacity(3); + keys.insert("k1".to_string(), intv.clone()); + keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv); + keys.insert("ts".to_string(), tsv); + + let mut batch = new_test_batch(); + batch.delete(keys).unwrap(); + + let record_batch = &batch.payload().mutations[0].record_batch; + assert_eq!(3, record_batch.num_rows()); + assert_eq!(4, record_batch.num_columns()); + let v1 = record_batch.column_by_name("v1").unwrap(); + assert!(v1.only_null()); + } + + #[test] + fn test_delete_missing_column() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef; + + let mut keys = HashMap::with_capacity(3); + keys.insert("k1".to_string(), intv.clone()); + keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv); + + let mut batch = new_test_batch(); + let err = batch.delete(keys).unwrap_err(); + check_err(err, "Missing column ts"); + } + + #[test] + fn test_delete_columns_more_than_row_key() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef; + let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef; + + let mut keys = HashMap::with_capacity(3); + keys.insert("k1".to_string(), intv.clone()); + keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv.clone()); + keys.insert("ts".to_string(), tsv); + keys.insert("v2".to_string(), intv); + + let mut batch = new_test_batch(); + let err = batch.delete(keys).unwrap_err(); + check_err(err, "More columns than expected"); + } + + #[test] + fn test_delete_type_mismatch() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef; + let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef; + + let mut keys = HashMap::with_capacity(3); + keys.insert("k1".to_string(), intv.clone()); + keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv); + keys.insert("ts".to_string(), boolv); + + let mut batch = new_test_batch(); + let err = batch.delete(keys).unwrap_err(); + check_err(err, "Type of 
column ts does not match"); + } } diff --git a/src/storage/src/write_batch/codec.rs b/src/storage/src/write_batch/codec.rs index 961d0d09ac..74ea9a50e2 100644 --- a/src/storage/src/write_batch/codec.rs +++ b/src/storage/src/write_batch/codec.rs @@ -89,9 +89,7 @@ impl<'a> Decoder for PayloadDecoder<'a> { let record_batch = RecordBatch::try_from_df_record_batch(schema.clone(), record_batch) .context(CreateRecordBatchSnafu)?; let op_type = match MutationType::from_i32(*mutation_type) { - Some(MutationType::Delete) => { - unimplemented!("delete mutation is not implemented") - } + Some(MutationType::Delete) => OpType::Delete, Some(MutationType::Put) => OpType::Put, None => { return BatchCorruptedSnafu { diff --git a/src/storage/src/write_batch/compat.rs b/src/storage/src/write_batch/compat.rs index b7089825d3..dbb2bff4de 100644 --- a/src/storage/src/write_batch/compat.rs +++ b/src/storage/src/write_batch/compat.rs @@ -110,6 +110,9 @@ mod tests { use super::*; use crate::error::Error; + // Test schema only has two row key columns: k0, ts. + const TEST_ROW_KEY_END: usize = 2; + fn new_test_schema_builder( v0_constraint: Option>, ) -> SchemaBuilder { @@ -158,7 +161,7 @@ mod tests { // Mutation doesn't check schema version, so we don't have to bump the version here. let schema = new_test_schema(Some(Some(ColumnDefaultConstraint::null_value()))); // Use WriteBatch to build a payload and its mutation. - let mut batch = WriteBatch::new(schema_old); + let mut batch = WriteBatch::new(schema_old, TEST_ROW_KEY_END); batch.put(put_data).unwrap(); let mutation = &mut batch.payload.mutations[0]; @@ -171,7 +174,7 @@ mod tests { #[test] fn test_write_batch_compat_write() { let schema_old = new_test_schema(None); - let mut batch = WriteBatch::new(schema_old); + let mut batch = WriteBatch::new(schema_old, TEST_ROW_KEY_END); let put_data = new_put_data(); batch.put(put_data).unwrap(); @@ -198,7 +201,7 @@ mod tests { .unwrap(), ); - let mut batch = WriteBatch::new(schema_new); + let mut batch = WriteBatch::new(schema_new, TEST_ROW_KEY_END); let err = batch.compat_write(&schema_old).unwrap_err(); assert!( matches!(err, Error::WriteToOldVersion { .. }), @@ -209,14 +212,14 @@ mod tests { #[test] fn test_write_batch_skip_compat() { let schema = new_test_schema(None); - let mut batch = WriteBatch::new(schema.clone()); + let mut batch = WriteBatch::new(schema.clone(), TEST_ROW_KEY_END); batch.compat_write(&schema).unwrap(); } #[test] fn test_write_batch_compat_columns_not_in_schema() { let schema_has_column = new_test_schema(Some(None)); - let mut batch = WriteBatch::new(schema_has_column); + let mut batch = WriteBatch::new(schema_has_column, TEST_ROW_KEY_END); let schema_no_column = Arc::new(new_test_schema_builder(None).version(1).build().unwrap()); let err = batch.compat_write(&schema_no_column).unwrap_err(); diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 7d9870f0c9..7df6c88be0 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -31,6 +31,11 @@ pub trait WriteRequest: Send { /// /// `data` is the columnar format of the data to put. fn put(&mut self, data: HashMap) -> Result<(), Self::Error>; + + /// Delete rows by `keys`. + /// + /// `keys` are the row keys, in columnar format, of the rows to delete. 
+ fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<(), Self::Error>; } #[derive(Default)] diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs index ecca8e4f64..ed8d3c4222 100644 --- a/src/store-api/src/storage/types.rs +++ b/src/store-api/src/storage/types.rs @@ -19,20 +19,27 @@ pub type SequenceNumber = u64; /// Operation type of the value to write to storage. +/// +/// The enum values are stored in the SST files so don't change +/// them if possible. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub enum OpType { + /// Delete operation. + Delete = 0, /// Put operation. - Put, + Put = 1, } impl OpType { + /// Cast the [OpType] to u8. + #[inline] pub fn as_u8(&self) -> u8 { *self as u8 } /// Minimal op type after casting to u8. pub const fn min_type() -> OpType { - OpType::Put + OpType::Delete } } @@ -42,7 +49,8 @@ mod tests { #[test] fn test_op_type() { - assert_eq!(0, OpType::Put.as_u8()); + assert_eq!(0, OpType::Delete.as_u8()); + assert_eq!(1, OpType::Put.as_u8()); assert_eq!(0, OpType::min_type().as_u8()); } }
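Reviewer note, not part of the patch: a minimal sketch of how a caller drives the new delete path end to end, adapted from TesterBase::delete and the write-batch tests above. The generic region handle, the helper name delete_two_rows, and the assumption that "ts" is the only row key column are illustrative, not taken from the patch.

use std::collections::HashMap;
use std::sync::Arc;

use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use store_api::storage::{Region, WriteContext, WriteRequest};

// Sketch: delete the rows keyed by timestamps 1001 and 1003, assuming the
// region's only row key column is named "ts".
async fn delete_two_rows<R: Region>(region: &R, ctx: &WriteContext) {
    let mut batch = region.write_request();

    // Only row key columns are supplied; WriteBatch fills the value columns
    // with null placeholders and tags the mutation with OpType::Delete.
    let mut keys = HashMap::new();
    keys.insert(
        "ts".to_string(),
        Arc::new(TimestampMillisecondVector::from_slice(&[1001, 1003])) as VectorRef,
    );
    batch.delete(keys).unwrap();

    // Deleted rows are dropped at read time via the unselect_deleted call
    // added to DedupReader above.
    region.write(ctx, batch).await.unwrap();
}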