Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-10 07:12:54 +00:00.
feat: Implement delete for the storage engine (#777)
* docs: Fix incorrect comment of Vector::only_null
* feat: Add delete to WriteRequest and WriteBatch
* feat: Filter deleted rows
* fix: Fix panic after reopening engine
  This is detected by adding a reopen step to the delete test for region.
* fix: Fix OpType::min_type()
* test: Add delete absent key test
* chore: Address CR comments
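In short, the storage engine's `WriteBatch` gains a delete request: callers pass the row key columns of the rows to remove, the batch records an `OpType::Delete` mutation, and readers filter those rows out during scans and after reopening the region. A condensed sketch of the new behaviour, paraphrasing the `test_put_delete_scan` test added near the bottom of this diff (the `tester` helper and its `put`/`delete`/`full_scan` methods come from that test module):

```rust
// Condensed sketch of test_put_delete_scan; `tester` wraps a storage Region.
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, None)];
tester.put(&data).await;

tester.delete(&[1001]).await; // delete by row key (timestamp)

let expect = vec![(1000, Some(100)), (1002, None)];
assert_eq!(expect, tester.full_scan().await);

// Deletion is also persistent across a reopen.
tester.try_reopen().await.unwrap();
assert_eq!(expect, tester.full_scan().await);
```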
@@ -111,7 +111,7 @@ pub trait Vector: Send + Sync + Serializable + Debug + VectorOp {
     /// Returns whether row is null.
     fn is_null(&self, row: usize) -> bool;
 
-    /// If the only value vector can contain is NULL.
+    /// If the vector only contains NULL.
     fn only_null(&self) -> bool {
         self.null_count() == self.len()
     }

@@ -27,8 +27,8 @@ use storage::metadata::{RegionMetaImpl, RegionMetadata};
 use storage::write_batch::WriteBatch;
 use store_api::storage::{
     AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse,
-    OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest,
-    ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
+    OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest, ScanResponse,
+    SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
 };
 
 pub type Result<T> = std::result::Result<T, MockError>;

@@ -173,7 +173,11 @@ impl Region for MockRegion {
     }
 
     fn write_request(&self) -> WriteBatch {
-        WriteBatch::new(self.in_memory_metadata().schema().clone())
+        let metadata = self.inner.metadata.load();
+        let user_schema = metadata.user_schema().clone();
+        let row_key_end = metadata.schema().store_schema().row_key_end();
+
+        WriteBatch::new(user_schema, row_key_end)
     }
 
     async fn alter(&self, request: AlterRequest) -> Result<()> {

@@ -43,6 +43,7 @@ pub fn new_test_batch() -> WriteBatch {
             ("10", LogicalTypeId::String, false),
         ],
         Some(2),
+        3,
     )
 }
 
@@ -16,8 +16,12 @@ use storage::write_batch::WriteBatch;
 
 use crate::memtable::util::schema_util::{self, ColumnDef};
 
-pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> WriteBatch {
+pub fn new_write_batch(
+    column_defs: &[ColumnDef],
+    timestamp_index: Option<usize>,
+    row_key_end: usize,
+) -> WriteBatch {
     let schema = schema_util::new_schema_ref(column_defs, timestamp_index);
 
-    WriteBatch::new(schema)
+    WriteBatch::new(schema, row_key_end)
 }

@@ -404,7 +404,7 @@ pub enum Error {
         expect,
         given
     ))]
-    LenNotEquals {
+    UnequalLengths {
         name: String,
         expect: usize,
         given: usize,

@@ -434,6 +434,9 @@ pub enum Error {
         backtrace: Backtrace,
         source: datatypes::error::Error,
     },
+
+    #[snafu(display("More columns than expected in the request"))]
+    MoreColumnThanExpected { backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;

@@ -455,7 +458,8 @@ impl ErrorExt for Error {
             | RequestTooLarge { .. }
             | TypeMismatch { .. }
             | HasNull { .. }
-            | LenNotEquals { .. } => StatusCode::InvalidArguments,
+            | UnequalLengths { .. }
+            | MoreColumnThanExpected { .. } => StatusCode::InvalidArguments,
 
             Utf8 { .. }
             | EncodeJson { .. }

@@ -143,6 +143,7 @@ mod tests {
                 ("value", LogicalTypeId::Int64, true),
             ],
             Some(0),
+            1,
         )
     }
 
@@ -583,7 +583,7 @@ fn test_memtable_projection() {
     let k1 = Arc::new(UInt64Vector::from_slice(&[0, 1, 2])) as VectorRef;
     let v0 = Arc::new(UInt64Vector::from_slice(&[10, 11, 12])) as VectorRef;
     let sequences = Arc::new(UInt64Vector::from_slice(&[9, 9, 9])) as VectorRef;
-    let op_types = Arc::new(UInt8Vector::from_slice(&[0, 0, 0])) as VectorRef;
+    let op_types = Arc::new(UInt8Vector::from_slice(&[1, 1, 1])) as VectorRef;
 
     assert_eq!(k0, *batch.column(0));
     assert_eq!(k1, *batch.column(1));

@@ -24,6 +24,7 @@ pub fn gen_mutation_types(payload: &Payload) -> Vec<i32> {
         .mutations
         .iter()
         .map(|m| match m.op_type {
+            OpType::Delete => MutationType::Delete.into(),
             OpType::Put => MutationType::Put.into(),
         })
         .collect::<Vec<_>>()

@@ -135,6 +135,14 @@ pub trait BatchOp {
     /// Note that the nulls of `filter` are interpreted as `false` will lead to these elements
     /// being masked out.
     fn filter(&self, batch: &Batch, filter: &BooleanVector) -> Result<Batch>;
+
+    /// Unselect deleted rows according to the [`OpType`](store_api::storage::OpType).
+    ///
+    /// # Panics
+    /// Panics if
+    /// - `batch` doesn't have a valid op type column.
+    /// - `selected.len()` is less than the number of rows.
+    fn unselect_deleted(&self, batch: &Batch, selected: &mut BitVec);
 }
 
 /// Reusable [Batch] builder.

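The `unselect_deleted` method introduced here walks the op type column of a batch and clears the selection bit of every row that was written by a delete; the reader then filters on the remaining bits. A minimal standalone sketch of that idea, using a plain `Vec<bool>` and raw `u8` op types instead of the crate's `BitVec`, `UInt8Vector` and `Batch` types (`OP_TYPE_DELETE` mirrors `OpType::Delete.as_u8()`, which is 0 after this commit):

```rust
// Sketch only: the "mark deleted rows as unselected, then filter" pattern.
const OP_TYPE_DELETE: u8 = 0;

fn unselect_deleted(op_types: &[u8], selected: &mut [bool]) {
    // Mirrors the documented panic condition: the selection bitmap must cover all rows.
    assert!(selected.len() >= op_types.len(), "selection bitmap too short");
    for (i, op) in op_types.iter().enumerate() {
        if *op == OP_TYPE_DELETE {
            selected[i] = false;
        }
    }
}

fn main() {
    // Five rows; the 2nd and 5th are delete markers, matching the
    // test_unselect_deleted case later in this diff.
    let op_types = [1, 0, 1, 1, 0];
    let mut selected = vec![true; op_types.len()];
    unselect_deleted(&op_types, &mut selected);
    assert_eq!(selected, [true, false, true, true, false]);
}
```

Rows whose bits stay set are then kept by the boolean filter built in the `DedupReader` hunk right below.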
@@ -66,8 +66,8 @@ impl<R> DedupReader<R> {
             .get_or_insert_with(Batch::default)
             .clone_from(&batch); // Use `clone_from` to reuse allocated memory if possible.
 
-        // TODO(yingwen): To support `DELETE`, we could find all rows whose op_types are equal
-        // to `OpType::Delete`, mark their `selected` to false, then filter the batch.
+        // Find all rows whose op_types are `OpType::Delete`, mark their `selected` to false.
+        self.schema.unselect_deleted(&batch, &mut self.selected);
 
         let filter = BooleanVector::from_iterator(self.selected.iter().by_vals());
         // Filter duplicate rows.

@@ -24,8 +24,8 @@ use snafu::ResultExt;
 use store_api::logstore::LogStore;
 use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
 use store_api::storage::{
-    AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, SequenceNumber,
-    WriteContext, WriteResponse,
+    AlterRequest, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext,
+    WriteResponse,
 };
 
 use crate::error::{self, Error, Result};

@@ -91,7 +91,11 @@ impl<S: LogStore> Region for RegionImpl<S> {
     }
 
     fn write_request(&self) -> Self::WriteRequest {
-        WriteBatch::new(self.in_memory_metadata().schema().clone())
+        let metadata = self.inner.version_control().metadata();
+        let user_schema = metadata.user_schema().clone();
+        let row_key_end = metadata.schema().store_schema().row_key_end();
+
+        WriteBatch::new(user_schema, row_key_end)
     }
 
     async fn alter(&self, request: AlterRequest) -> Result<()> {

@@ -31,7 +31,7 @@ use log_store::fs::noop::NoopLogStore;
 use object_store::backend::fs;
 use object_store::ObjectStore;
 use store_api::storage::{
-    consts, Chunk, ChunkReader, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
+    consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
 };
 use tempdir::TempDir;
 
@@ -124,6 +124,17 @@ impl<S: LogStore> TesterBase<S> {
     pub fn committed_sequence(&self) -> SequenceNumber {
         self.region.committed_sequence()
     }
+
+    /// Delete by keys (timestamp).
+    pub async fn delete(&self, keys: &[i64]) -> WriteResponse {
+        let keys: Vec<TimestampMillisecond> = keys.iter().map(|v| (*v).into()).collect();
+        // Build a batch without version.
+        let mut batch = new_write_batch_for_test(false);
+        let keys = new_delete_data(&keys);
+        batch.delete(keys).unwrap();
+
+        self.region.write(&self.write_ctx, batch).await.unwrap()
+    }
 }
 
 pub type FileTesterBase = TesterBase<LocalFileLogStore>;

@@ -141,6 +152,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
                 ("v0", LogicalTypeId::Int64, true),
             ],
             Some(0),
+            2,
         )
     } else {
         write_batch_util::new_write_batch(

@@ -153,6 +165,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
                 ("v0", LogicalTypeId::Int64, true),
             ],
             Some(0),
+            1,
         )
     }
 }

@@ -173,6 +186,20 @@ fn new_put_data(data: &[(TimestampMillisecond, Option<i64>)]) -> HashMap<String,
     put_data
 }
 
+fn new_delete_data(keys: &[TimestampMillisecond]) -> HashMap<String, VectorRef> {
+    let mut delete_data = HashMap::new();
+
+    let timestamps =
+        TimestampMillisecondVector::from_vec(keys.iter().map(|v| v.0.into()).collect());
+
+    delete_data.insert(
+        test_util::TIMESTAMP_NAME.to_string(),
+        Arc::new(timestamps) as VectorRef,
+    );
+
+    delete_data
+}
+
 fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
     assert_eq!(2, chunk.columns.len());
 
@@ -106,6 +106,10 @@ impl Tester {
     fn committed_sequence(&self) -> SequenceNumber {
         self.base().committed_sequence()
     }
+
+    async fn delete(&self, keys: &[i64]) -> WriteResponse {
+        self.base().delete(keys).await
+    }
 }
 
 #[tokio::test]

@@ -202,3 +206,64 @@ async fn test_scan_different_batch() {
         assert_eq!(data, output);
     }
 }
+
+#[tokio::test]
+async fn test_put_delete_scan() {
+    let dir = TempDir::new("put-delete-scan").unwrap();
+    let store_dir = dir.path().to_str().unwrap();
+    let mut tester = Tester::new(REGION_NAME, store_dir).await;
+
+    let data = vec![
+        (1000, Some(100)),
+        (1001, Some(101)),
+        (1002, None),
+        (1003, None),
+        (1004, Some(104)),
+    ];
+
+    tester.put(&data).await;
+
+    let keys = [1001, 1003];
+
+    tester.delete(&keys).await;
+
+    let output = tester.full_scan().await;
+    let expect = vec![(1000, Some(100)), (1002, None), (1004, Some(104))];
+    assert_eq!(expect, output);
+
+    // Deletion is also persistent.
+    tester.try_reopen().await.unwrap();
+    let output = tester.full_scan().await;
+    assert_eq!(expect, output);
+}
+
+#[tokio::test]
+async fn test_put_delete_absent_key() {
+    let dir = TempDir::new("put-delete-scan").unwrap();
+    let store_dir = dir.path().to_str().unwrap();
+    let mut tester = Tester::new(REGION_NAME, store_dir).await;
+
+    let data = vec![
+        (1000, Some(100)),
+        (1001, Some(101)),
+        (1002, None),
+        (1003, None),
+        (1004, Some(104)),
+    ];
+
+    tester.put(&data).await;
+
+    // 999 and 1006 is absent.
+    let keys = [999, 1002, 1004, 1006];
+
+    tester.delete(&keys).await;
+
+    let output = tester.full_scan().await;
+    let expect = vec![(1000, Some(100)), (1001, Some(101)), (1003, None)];
+    assert_eq!(expect, output);
+
+    // Deletion is also persistent.
+    tester.try_reopen().await.unwrap();
+    let output = tester.full_scan().await;
+    assert_eq!(expect, output);
+}

@@ -49,6 +49,7 @@ fn new_write_batch_for_test() -> WriteBatch {
             ("v1", LogicalTypeId::Int64, true),
         ],
         Some(1),
+        2,
     )
 }
 
@@ -18,9 +18,10 @@ use std::sync::Arc;
 
+use common_base::BitVec;
 use common_error::prelude::*;
 use datatypes::prelude::ScalarVector;
 use datatypes::schema::{SchemaBuilder, SchemaRef};
-use datatypes::vectors::BooleanVector;
-use store_api::storage::{Chunk, ColumnId};
+use datatypes::vectors::{BooleanVector, UInt8Vector};
+use store_api::storage::{Chunk, ColumnId, OpType};
 
 use crate::error;
 use crate::metadata::{self, Result};

@@ -167,7 +168,9 @@ impl ProjectedSchema {
     /// Convert [Batch] into [Chunk].
     ///
     /// This will remove all internal columns. The input `batch` should has the
-    /// same schema as `self.schema_to_read()`.
+    /// same schema as [`self.schema_to_read()`](ProjectedSchema::schema_to_read).
+    /// The output [Chunk] has the same schema as
+    /// [`self.projected_user_schema()`](ProjectedSchema::projected_user_schema).
     pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk {
         let columns = match &self.projection {
             Some(projection) => projection

@@ -330,6 +333,29 @@ impl BatchOp for ProjectedSchema {
 
         Ok(Batch::new(columns))
     }
+
+    fn unselect_deleted(&self, batch: &Batch, selected: &mut BitVec) {
+        let op_types = batch.column(self.schema_to_read.op_type_index());
+        // Safety: We expect the batch has the same schema as `self.schema_to_read`. The
+        // read procedure should guarantee this, otherwise this is a critical bug and it
+        // should be fine to panic.
+        let op_types = op_types
+            .as_any()
+            .downcast_ref::<UInt8Vector>()
+            .unwrap_or_else(|| {
+                panic!(
+                    "Expect op_type (UInt8) column at index {}, given {:?}",
+                    self.schema_to_read.op_type_index(),
+                    op_types.data_type()
+                );
+            });
+
+        for (i, op_type) in op_types.iter_data().enumerate() {
+            if op_type == Some(OpType::Delete.as_u8()) {
+                selected.set(i, false);
+            }
+        }
+    }
 }
 
 #[cfg(test)]

@@ -526,4 +552,23 @@ mod tests {
         let expect: VectorRef = Arc::new(TimestampMillisecondVector::from_values([1000, 3000]));
         assert_eq!(expect, *res.column(0));
     }
+
+    #[test]
+    fn test_unselect_deleted() {
+        let schema = read_util::new_projected_schema();
+        let batch = read_util::new_full_kv_batch(&[
+            (100, 1, 1000, OpType::Put),
+            (101, 1, 999, OpType::Delete),
+            (102, 1, 1000, OpType::Put),
+            (103, 1, 999, OpType::Put),
+            (104, 1, 1000, OpType::Delete),
+        ]);
+
+        let mut selected = BitVec::repeat(true, batch.num_rows());
+        schema.unselect_deleted(&batch, &mut selected);
+        assert_eq!(
+            BitVec::from_iter([true, false, true, true, false]),
+            selected
+        );
+    }
 }

@@ -58,6 +58,7 @@ impl StoreSchema {
         self.schema.arrow_schema()
     }
 
+    // TODO(yingwen): Remove this method.
     pub fn batch_to_arrow_record_batch(
         &self,
         batch: &Batch,

@@ -70,6 +71,14 @@ impl StoreSchema {
             .context(NewRecordBatchSnafu)
     }
+
+    /// Returns the ending index of row key columns.
+    ///
+    /// The ending index has the same value as the number of the row key columns.
+    #[inline]
+    pub fn row_key_end(&self) -> usize {
+        self.row_key_end
+    }
 
     pub(crate) fn contains_column(&self, name: &str) -> bool {
         self.schema.column_schema_by_name(name).is_some()
     }

@@ -166,11 +175,6 @@ impl StoreSchema {
         self.schema.num_columns()
     }
-
-    #[inline]
-    pub(crate) fn row_key_end(&self) -> usize {
-        self.row_key_end
-    }
 
     #[inline]
     pub(crate) fn user_column_end(&self) -> usize {
         self.user_column_end

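As a concrete example of the value this method exposes, the write-batch test schema used elsewhere in this commit has two row key columns (`k0` and `ts`) followed by value columns, so `row_key_end()` would return 2 there; the compat tests below pin this down as `TEST_ROW_KEY_END: usize = 2`, and the region `write_request()` hunks above pass the same value straight into `WriteBatch::new`.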
@@ -312,20 +312,19 @@ mod tests {
 
         // v1
         assert_eq!(
-            &(Arc::new(UInt64Array::from(vec![1234, 1234, 1234, 1234, 1234, 1234]))
-                as Arc<dyn Array>),
+            &(Arc::new(UInt64Array::from(vec![1234; 6])) as Arc<dyn Array>),
             chunk.column(3)
         );
 
         // sequence
         assert_eq!(
-            &(Arc::new(UInt64Array::from(vec![10, 10, 10, 10, 10, 10])) as Arc<dyn Array>),
+            &(Arc::new(UInt64Array::from(vec![10; 6])) as Arc<dyn Array>),
             chunk.column(4)
         );
 
         // op_type
         assert_eq!(
-            &(Arc::new(UInt8Array::from(vec![0, 0, 0, 0, 0, 0])) as Arc<dyn Array>),
+            &(Arc::new(UInt8Array::from(vec![1; 6])) as Arc<dyn Array>),
             chunk.column(5)
         );
     }

@@ -15,8 +15,12 @@
 use crate::test_util::schema_util::{self, ColumnDef};
 use crate::write_batch::WriteBatch;
 
-pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> WriteBatch {
+pub fn new_write_batch(
+    column_defs: &[ColumnDef],
+    timestamp_index: Option<usize>,
+    row_key_end: usize,
+) -> WriteBatch {
     let schema = schema_util::new_schema_ref(column_defs, timestamp_index);
 
-    WriteBatch::new(schema)
+    WriteBatch::new(schema, row_key_end)
 }

@@ -24,7 +24,10 @@ use store_api::logstore::{AppendResponse, LogStore};
 use store_api::storage::{RegionId, SequenceNumber};
 
 use crate::codec::{Decoder, Encoder};
-use crate::error::{self, Error, MarkWalStableSnafu, Result};
+use crate::error::{
+    DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalStableSnafu, ReadWalSnafu, Result,
+    WalDataCorruptedSnafu, WriteWalSnafu,
+};
 use crate::proto::wal::{self, WalHeader};
 use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder};
 use crate::write_batch::Payload;

@@ -114,7 +117,7 @@ impl<S: LogStore> Wal<S> {
             encoder
                 .encode(p, &mut buf)
                 .map_err(BoxedError::new)
-                .context(error::WriteWalSnafu {
+                .context(WriteWalSnafu {
                     region_id: self.region_id(),
                 })?;
         }

@@ -129,7 +132,7 @@ impl<S: LogStore> Wal<S> {
             .read(&self.namespace, start_seq)
             .await
             .map_err(BoxedError::new)
-            .context(error::ReadWalSnafu {
+            .context(ReadWalSnafu {
                 region_id: self.region_id(),
             })?
             // Handle the error when reading from the stream.

@@ -155,7 +158,7 @@ impl<S: LogStore> Wal<S> {
             .append(e)
             .await
             .map_err(BoxedError::new)
-            .context(error::WriteWalSnafu {
+            .context(WriteWalSnafu {
                 region_id: self.region_id(),
             })?;
 
@@ -174,7 +177,7 @@ impl<S: LogStore> Wal<S> {
 
         ensure!(
             data_pos <= input.len(),
-            error::WalDataCorruptedSnafu {
+            WalDataCorruptedSnafu {
                 region_id: self.region_id(),
                 message: format!(
                     "Not enough input buffer, expected data position={}, actual buffer length={}",

@@ -192,7 +195,7 @@ impl<S: LogStore> Wal<S> {
         let payload = decoder
             .decode(&input[data_pos..])
             .map_err(BoxedError::new)
-            .context(error::ReadWalSnafu {
+            .context(ReadWalSnafu {
                 region_id: self.region_id(),
             })?;

@@ -209,7 +212,7 @@ impl Encoder for WalHeaderEncoder {
     fn encode(&self, item: &WalHeader, dst: &mut Vec<u8>) -> Result<()> {
         item.encode_length_delimited(dst)
             .map_err(|err| err.into())
-            .context(error::EncodeWalHeaderSnafu)
+            .context(EncodeWalHeaderSnafu)
     }
 }

@@ -222,12 +225,12 @@ impl Decoder for WalHeaderDecoder {
     fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> {
         let mut data_pos = prost::decode_length_delimiter(src)
            .map_err(|err| err.into())
-            .context(error::DecodeWalHeaderSnafu)?;
+            .context(DecodeWalHeaderSnafu)?;
         data_pos += prost::length_delimiter_len(data_pos);
 
         let wal_header = WalHeader::decode_length_delimited(src)
             .map_err(|err| err.into())
-            .context(error::DecodeWalHeaderSnafu)?;
+            .context(DecodeWalHeaderSnafu)?;
 
         Ok((data_pos, wal_header))
     }

@@ -18,17 +18,20 @@ mod compat;
 use std::collections::HashMap;
 
 use common_recordbatch::RecordBatch;
+use datatypes::data_type::{ConcreteDataType, DataType};
 use datatypes::schema::{ColumnSchema, SchemaRef};
+use datatypes::value::ValueRef;
 use datatypes::vectors::VectorRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{OpType, WriteRequest};
 
 use crate::error::{
     BatchMissingColumnSnafu, CreateDefaultSnafu, CreateRecordBatchSnafu, Error, HasNullSnafu,
-    LenNotEqualsSnafu, RequestTooLargeSnafu, Result, TypeMismatchSnafu, UnknownColumnSnafu,
+    MoreColumnThanExpectedSnafu, RequestTooLargeSnafu, Result, TypeMismatchSnafu,
+    UnequalLengthsSnafu, UnknownColumnSnafu,
 };
 
-/// Max number of updates of a write batch.
+/// Max number of updates in a write batch.
 pub(crate) const MAX_BATCH_SIZE: usize = 1_000_000;
 
 /// Data of [WriteBatch].

@@ -77,6 +80,11 @@ pub struct WriteBatch {
     ///
     /// We use it to check whether this batch is too large.
     num_rows_to_mutate: usize,
+    /// The ending index of row key columns.
+    ///
+    /// The `WriteBatch` use this index to locate all row key columns from
+    /// the schema.
+    row_key_end: usize,
 }
 
 impl WriteRequest for WriteBatch {

@@ -98,14 +106,41 @@ impl WriteRequest for WriteBatch {
 
         Ok(())
     }
+
+    fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<()> {
+        let data = NameToVector::new(keys)?;
+        if data.is_empty() {
+            return Ok(());
+        }
+
+        let record_batch = self.process_delete_data(data)?;
+
+        self.add_num_rows_to_mutate(record_batch.num_rows())?;
+        self.payload.mutations.push(Mutation {
+            op_type: OpType::Delete,
+            record_batch,
+        });
+
+        Ok(())
+    }
 }
 
 // WriteBatch pub methods.
 impl WriteBatch {
-    pub fn new(schema: SchemaRef) -> Self {
+    /// Creates a new `WriteBatch`.
+    ///
+    /// The `schema` is the user schema of the region (no internal columns) and
+    /// the `row_key_end` is the ending index of row key columns.
+    ///
+    /// # Panics
+    /// Panics if `row_key_end <= schema.num_columns()`.
+    pub fn new(schema: SchemaRef, row_key_end: usize) -> Self {
+        assert!(row_key_end <= schema.num_columns());
+
         Self {
             payload: Payload::new(schema),
             num_rows_to_mutate: 0,
+            row_key_end,
         }
     }
 
@@ -154,6 +189,46 @@ impl WriteBatch {
         RecordBatch::new(self.schema().clone(), columns).context(CreateRecordBatchSnafu)
     }
+
+    /// Validates `data` and converts it into a [RecordBatch].
+    ///
+    /// It fills value columns by null, ignoring whether the column is nullable as the contents
+    /// of value columns won't be read.
+    fn process_delete_data(&self, data: NameToVector) -> Result<RecordBatch> {
+        // Ensure row key columns are provided.
+        for column_schema in self.row_key_column_schemas() {
+            ensure!(
+                data.0.contains_key(&column_schema.name),
+                BatchMissingColumnSnafu {
+                    column: &column_schema.name,
+                }
+            );
+        }
+        // Ensure only provides row key columns.
+        ensure!(
+            data.0.len() == self.row_key_column_schemas().len(),
+            MoreColumnThanExpectedSnafu
+        );
+
+        let num_rows = data.num_rows();
+        let mut columns = Vec::with_capacity(self.schema().num_columns());
+        for column_schema in self.schema().column_schemas() {
+            match data.0.get(&column_schema.name) {
+                Some(col) => {
+                    validate_column(column_schema, col)?;
+                    columns.push(col.clone());
+                }
+                None => {
+                    // Fills value columns by null, these columns are just placeholders to ensure
+                    // the schema of the record batch is correct.
+                    let col = new_column_with_null(&column_schema.data_type, num_rows);
+                    columns.push(col);
+                }
+            }
+        }
+
+        RecordBatch::new(self.schema().clone(), columns).context(CreateRecordBatchSnafu)
+    }
 
     fn add_num_rows_to_mutate(&mut self, len: usize) -> Result<()> {
         let num_rows = self.num_rows_to_mutate + len;
         ensure!(

@@ -163,6 +238,11 @@ impl WriteBatch {
         self.num_rows_to_mutate = num_rows;
         Ok(())
     }
+
+    /// Returns all row key columns in the schema.
+    fn row_key_column_schemas(&self) -> &[ColumnSchema] {
+        &self.payload.schema.column_schemas()[..self.row_key_end]
+    }
 }
 
 /// Returns the length of the first vector in `data`.

@@ -218,6 +298,17 @@ pub(crate) fn new_column_with_default_value(
     Ok(vector)
 }
 
+/// Creates a new column and fills it by null.
+fn new_column_with_null(data_type: &ConcreteDataType, num_rows: usize) -> VectorRef {
+    // TODO(yingwen): Use `NullVector` once it supports setting logical type.
+    let mut mutable_vector = data_type.create_mutable_vector(num_rows);
+    for _ in 0..num_rows {
+        // Safety: push null is safe.
+        mutable_vector.push_value_ref(ValueRef::Null).unwrap();
+    }
+    mutable_vector.to_vector()
+}
+
 /// Vectors in [NameToVector] have same length.
 ///
 /// MUST construct it via [`NameToVector::new()`] to ensure the vector lengths are validated.

@@ -229,7 +320,7 @@ impl NameToVector {
         for (name, vector) in &data {
             ensure!(
                 num_rows == vector.len(),
-                LenNotEqualsSnafu {
+                UnequalLengthsSnafu {
                     name,
                     expect: num_rows,
                     given: vector.len(),

@@ -264,6 +355,7 @@ pub(crate) fn new_test_batch() -> WriteBatch {
             ("v1", LogicalTypeId::Boolean, true),
         ],
         Some(2),
+        3,
     )
 }
 
@@ -325,7 +417,6 @@ mod tests {
         put_data.insert("ts".to_string(), tsv);
 
         let mut batch = new_test_batch();
-        assert!(batch.payload().is_empty());
         batch.put(put_data).unwrap();
         assert!(!batch.payload().is_empty());
 
@@ -352,7 +443,7 @@ mod tests {
         put_data.insert("k1".to_string(), boolv);
 
         let mut batch =
-            write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)], None);
+            write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)], None, 1);
         let err = batch.put(put_data).unwrap_err();
         check_err(err, "Request is too large");
     }

@@ -433,4 +524,82 @@ mod tests {
         let err = batch.put(put_data).unwrap_err();
         assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
     }
+
+    #[test]
+    fn test_put_empty() {
+        let mut batch = new_test_batch();
+        batch.put(HashMap::new()).unwrap();
+        assert!(batch.payload().is_empty());
+    }
+
+    #[test]
+    fn test_delete_empty() {
+        let mut batch = new_test_batch();
+        batch.delete(HashMap::new()).unwrap();
+        assert!(batch.payload().is_empty());
+    }
+
+    #[test]
+    fn test_write_batch_delete() {
+        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
+        let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef;
+
+        let mut keys = HashMap::with_capacity(3);
+        keys.insert("k1".to_string(), intv.clone());
+        keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
+        keys.insert("ts".to_string(), tsv);
+
+        let mut batch = new_test_batch();
+        batch.delete(keys).unwrap();
+
+        let record_batch = &batch.payload().mutations[0].record_batch;
+        assert_eq!(3, record_batch.num_rows());
+        assert_eq!(4, record_batch.num_columns());
+        let v1 = record_batch.column_by_name("v1").unwrap();
+        assert!(v1.only_null());
+    }
+
+    #[test]
+    fn test_delete_missing_column() {
+        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
+
+        let mut keys = HashMap::with_capacity(3);
+        keys.insert("k1".to_string(), intv.clone());
+        keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
+
+        let mut batch = new_test_batch();
+        let err = batch.delete(keys).unwrap_err();
+        check_err(err, "Missing column ts");
+    }
+
+    #[test]
+    fn test_delete_columns_more_than_row_key() {
+        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
+        let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef;
+
+        let mut keys = HashMap::with_capacity(3);
+        keys.insert("k1".to_string(), intv.clone());
+        keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv.clone());
+        keys.insert("ts".to_string(), tsv);
+        keys.insert("v2".to_string(), intv);
+
+        let mut batch = new_test_batch();
+        let err = batch.delete(keys).unwrap_err();
+        check_err(err, "More columns than expected");
+    }
+
+    #[test]
+    fn test_delete_type_mismatch() {
+        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
+        let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
+
+        let mut keys = HashMap::with_capacity(3);
+        keys.insert("k1".to_string(), intv.clone());
+        keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
+        keys.insert("ts".to_string(), boolv);
+
+        let mut batch = new_test_batch();
+        let err = batch.delete(keys).unwrap_err();
+        check_err(err, "Type of column ts does not match");
+    }
 }

@@ -89,9 +89,7 @@ impl<'a> Decoder for PayloadDecoder<'a> {
         let record_batch = RecordBatch::try_from_df_record_batch(schema.clone(), record_batch)
             .context(CreateRecordBatchSnafu)?;
         let op_type = match MutationType::from_i32(*mutation_type) {
-            Some(MutationType::Delete) => {
-                unimplemented!("delete mutation is not implemented")
-            }
+            Some(MutationType::Delete) => OpType::Delete,
             Some(MutationType::Put) => OpType::Put,
             None => {
                 return BatchCorruptedSnafu {

@@ -110,6 +110,9 @@ mod tests {
     use super::*;
     use crate::error::Error;
 
+    // Test schema only has two row key columns: k0, ts.
+    const TEST_ROW_KEY_END: usize = 2;
+
     fn new_test_schema_builder(
         v0_constraint: Option<Option<ColumnDefaultConstraint>>,
     ) -> SchemaBuilder {

@@ -158,7 +161,7 @@ mod tests {
         // Mutation doesn't check schema version, so we don't have to bump the version here.
         let schema = new_test_schema(Some(Some(ColumnDefaultConstraint::null_value())));
         // Use WriteBatch to build a payload and its mutation.
-        let mut batch = WriteBatch::new(schema_old);
+        let mut batch = WriteBatch::new(schema_old, TEST_ROW_KEY_END);
         batch.put(put_data).unwrap();
 
         let mutation = &mut batch.payload.mutations[0];

@@ -171,7 +174,7 @@ mod tests {
     #[test]
     fn test_write_batch_compat_write() {
         let schema_old = new_test_schema(None);
-        let mut batch = WriteBatch::new(schema_old);
+        let mut batch = WriteBatch::new(schema_old, TEST_ROW_KEY_END);
         let put_data = new_put_data();
         batch.put(put_data).unwrap();
 
@@ -198,7 +201,7 @@ mod tests {
                 .unwrap(),
         );
 
-        let mut batch = WriteBatch::new(schema_new);
+        let mut batch = WriteBatch::new(schema_new, TEST_ROW_KEY_END);
         let err = batch.compat_write(&schema_old).unwrap_err();
         assert!(
             matches!(err, Error::WriteToOldVersion { .. }),

@@ -209,14 +212,14 @@ mod tests {
     #[test]
     fn test_write_batch_skip_compat() {
         let schema = new_test_schema(None);
-        let mut batch = WriteBatch::new(schema.clone());
+        let mut batch = WriteBatch::new(schema.clone(), TEST_ROW_KEY_END);
         batch.compat_write(&schema).unwrap();
     }
 
     #[test]
     fn test_write_batch_compat_columns_not_in_schema() {
         let schema_has_column = new_test_schema(Some(None));
-        let mut batch = WriteBatch::new(schema_has_column);
+        let mut batch = WriteBatch::new(schema_has_column, TEST_ROW_KEY_END);
 
         let schema_no_column = Arc::new(new_test_schema_builder(None).version(1).build().unwrap());
         let err = batch.compat_write(&schema_no_column).unwrap_err();

@@ -31,6 +31,11 @@ pub trait WriteRequest: Send {
     ///
     /// `data` is the columnar format of the data to put.
     fn put(&mut self, data: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
+
+    /// Delete rows by `keys`.
+    ///
+    /// `keys` are the row keys, in columnar format, of the rows to delete.
+    fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
 }
 
 #[derive(Default)]

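As an illustration of the `keys` contract, here is a hedged sketch of building such a map for the write-batch test schema used earlier in this diff (row key columns `k1`, the version column and `ts`). The import paths mirror what the test code appears to use and are assumptions; the resulting map is what would be handed to `delete`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datatypes::vectors::{TimestampMillisecondVector, UInt64Vector, VectorRef};
use store_api::storage::consts;

// Every row key column must be present and nothing else may be passed; otherwise
// the batch fails with "Missing column ..." or "More columns than expected".
fn build_delete_keys() -> HashMap<String, VectorRef> {
    let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
    let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef;

    let mut keys = HashMap::with_capacity(3);
    keys.insert("k1".to_string(), intv.clone());
    keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
    keys.insert("ts".to_string(), tsv);
    keys
}
```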
@@ -19,20 +19,27 @@
 pub type SequenceNumber = u64;
 
+/// Operation type of the value to write to storage.
+///
+/// The enum values are stored in the SST files so don't change
+/// them if possible.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub enum OpType {
+    /// Delete operation.
+    Delete = 0,
     /// Put operation.
-    Put,
+    Put = 1,
 }
 
 impl OpType {
     /// Cast the [OpType] to u8.
     #[inline]
     pub fn as_u8(&self) -> u8 {
         *self as u8
     }
 
     /// Minimal op type after casting to u8.
     pub const fn min_type() -> OpType {
-        OpType::Put
+        OpType::Delete
     }
 }
 
@@ -42,7 +49,8 @@ mod tests {
 
     #[test]
     fn test_op_type() {
-        assert_eq!(0, OpType::Put.as_u8());
+        assert_eq!(0, OpType::Delete.as_u8());
+        assert_eq!(1, OpType::Put.as_u8());
        assert_eq!(0, OpType::min_type().as_u8());
     }
 }