refactor: Remove PutOperation and Simplify WriteRequest API (#775)

* chore: Remove unused MutationExtra

* refactor(storage): Refactor Mutation and Payload

Change Mutation from an enum to a struct that holds the op type and the
record batch, so the encoder no longer needs to convert a mutation into a
record batch. Payload is no longer an enum either: it simply holds the
WriteBatch data to be serialized to the WAL. The encoder and decoder now
work with the Payload instead of the WriteBatch, so the WriteBatch can
carry extra information that does not need to be stored in the WAL.

This commit also merges some variants of write_batch::Error into
storage::Error, since several of them denote the same error.
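
For orientation, here is a minimal sketch of the reshaped types and the
simplified write path, inferred from the diffs below; field visibility,
exact signatures, and the placeholder vector names are assumptions rather
than the real definitions:

    // Sketch only: Payload carries the data to be serialized to the WAL,
    // and Mutation is a plain struct (op type + record batch) instead of an enum.
    struct Payload {
        schema: SchemaRef,          // schema of the write batch
        mutations: Vec<Mutation>,   // one entry per mutation
    }

    struct Mutation {
        op_type: OpType,            // e.g. OpType::Put
        record_batch: RecordBatch,  // columns aligned to `schema`
    }

    // Callers now pass a plain column map to WriteRequest::put
    // instead of building a PutOperation. `ts_vector` and `value_vector`
    // stand in for any VectorRef values.
    let mut put_data: HashMap<String, VectorRef> = HashMap::with_capacity(2);
    put_data.insert("ts".to_string(), ts_vector);
    put_data.insert("v0".to_string(), value_vector);
    write_batch.put(put_data)?;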

* test(storage): Pass all tests in storage

* chore: Remove unused code, then format code

* test(storage): Fix test_put_unknown_column test

* style(storage): Fix clippy

* chore: Remove some unused code

* chore: Rebase upstream and fix clippy

* chore(storage): Remove unused code

* chore(storage): Update comments

* feat: Remove PayloadType from wal.proto

* chore: Address CR comments

* chore: Remove unused write_batch.proto
Yingwen committed 2022-12-26 13:11:24 +08:00 (committed by GitHub)
parent e85780b5e4, commit f8500e54c1
30 changed files with 671 additions and 1804 deletions

Cargo.lock (generated)

@@ -6784,6 +6784,7 @@ dependencies = [
"common-base",
"common-error",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-time",


@@ -153,13 +153,6 @@ pub enum Error {
table_name: String,
},
#[snafu(display("Columns {} not exist in table {}", column_names.join(","), table_name))]
ColumnsNotExist {
backtrace: Backtrace,
column_names: Vec<String>,
table_name: String,
},
#[snafu(display("Failed to alter table {}, source: {}", table_name, source))]
AlterTable {
table_name: String,
@@ -176,12 +169,6 @@ pub enum Error {
column_qualified_name: String,
},
#[snafu(display("Unsupported column default constraint, source: {}", source))]
UnsupportedDefaultConstraint {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to convert metadata from deserialized data, source: {}",
source
@@ -219,11 +206,8 @@ impl ErrorExt for Error {
| ProjectedColumnNotFound { .. }
| InvalidPrimaryKey { .. }
| MissingTimestampIndex { .. }
| UnsupportedDefaultConstraint { .. }
| TableNotFound { .. } => StatusCode::InvalidArguments,
ColumnsNotExist { .. } => StatusCode::TableColumnNotFound,
TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,


@@ -27,18 +27,16 @@ use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream};
use common_telemetry::logging;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::VectorRef;
use futures::task::{Context, Poll};
use futures::Stream;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, PutOperation, ReadContext, Region,
RegionMeta, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta,
ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::error::{Error as TableError, Result as TableResult};
use table::metadata::{
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
};
@@ -48,8 +46,8 @@ use table::table::Table;
use tokio::sync::Mutex;
use crate::error::{
self, ColumnsNotExistSnafu, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu,
TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu,
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
@@ -86,66 +84,17 @@ impl<R: Region> Table for MitoTable<R> {
let mut write_request = self.region.write_request();
let mut put_op = write_request.put_op();
let mut columns_values = request.columns_values;
let table_info = self.table_info();
let schema = self.schema();
let key_columns = table_info.meta.row_key_column_names();
let value_columns = table_info.meta.value_column_names();
let columns_values = request.columns_values;
// columns_values is not empty, it's safe to unwrap
let rows_num = columns_values.values().next().unwrap().len();
// Add row key columns
for name in key_columns {
let column_schema = schema
.column_schema_by_name(name)
.expect("column schema not found");
let vector = match columns_values.remove(name) {
Some(v) => v,
None => Self::try_get_column_default_constraint_vector(column_schema, rows_num)?,
};
put_op
.add_key_column(name, vector)
.map_err(TableError::new)?;
}
// Add value columns
for name in value_columns {
let column_schema = schema
.column_schema_by_name(name)
.expect("column schema not found");
let vector = match columns_values.remove(name) {
Some(v) => v,
None => Self::try_get_column_default_constraint_vector(column_schema, rows_num)?,
};
put_op
.add_value_column(name, vector)
.map_err(TableError::new)?;
}
ensure!(
columns_values.is_empty(),
ColumnsNotExistSnafu {
table_name: &table_info.name,
column_names: columns_values
.keys()
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>(),
}
);
logging::trace!(
"Insert into table {} with put_op: {:?}",
table_info.name,
put_op
"Insert into table {} with data: {:?}",
self.table_info().name,
columns_values
);
write_request.put(put_op).map_err(TableError::new)?;
write_request.put(columns_values).map_err(TableError::new)?;
let _resp = self
.region
@@ -375,21 +324,6 @@ impl<R: Region> MitoTable<R> {
Ok(MitoTable::new(table_info, region, manifest))
}
fn try_get_column_default_constraint_vector(
column_schema: &ColumnSchema,
rows_num: usize,
) -> TableResult<VectorRef> {
// TODO(dennis): when we support altering schema, we should check the schemas difference between table and region
let vector = column_schema
.create_default_vector(rows_num)
.context(UnsupportedDefaultConstraintSnafu)?
.context(MissingColumnSnafu {
name: &column_schema.name,
})?;
Ok(vector)
}
pub async fn open(
table_name: &str,
table_dir: &str,


@@ -24,7 +24,7 @@ use common_telemetry::logging;
use datatypes::prelude::{DataType, Value, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use storage::metadata::{RegionMetaImpl, RegionMetadata};
use storage::write_batch::{Mutation, WriteBatch};
use storage::write_batch::WriteBatch;
use store_api::storage::{
AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse,
OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest,
@@ -219,10 +219,10 @@ impl MockRegionInner {
let mut memtable = self.memtable.write().unwrap();
for Mutation::Put(put) in request.iter() {
for mutation in &request.payload().mutations {
for ColumnSchema { name, .. } in metadata.user_schema().column_schemas() {
let column = memtable.get_mut(name).unwrap();
if let Some(data) = put.column_by_name(name) {
if let Some(data) = mutation.record_batch.column_by_name(name) {
(0..data.len()).for_each(|i| column.push(data.get(i)));
}
}


@@ -13,6 +13,7 @@ bytes = "1.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }


@@ -33,25 +33,13 @@ rows | protobuf | arrow |
*/
fn encode_arrow(batch: &WriteBatch, dst: &mut Vec<u8>) {
let encoder = codec::WriteBatchArrowEncoder::new();
let result = encoder.encode(batch, dst);
assert!(result.is_ok());
}
fn encode_protobuf(batch: &WriteBatch, dst: &mut Vec<u8>) {
let encoder = codec::WriteBatchProtobufEncoder {};
let result = encoder.encode(batch, dst);
let encoder = codec::PayloadEncoder::new();
let result = encoder.encode(batch.payload(), dst);
assert!(result.is_ok());
}
fn decode_arrow(dst: &[u8], mutation_types: &[i32]) {
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types.to_vec());
let result = decoder.decode(dst);
assert!(result.is_ok());
}
fn decode_protobuf(dst: &[u8], mutation_types: &[i32]) {
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types.to_vec());
let decoder = codec::PayloadDecoder::new(mutation_types);
let result = decoder.decode(dst);
assert!(result.is_ok());
}
@@ -60,32 +48,16 @@ fn bench_wal_decode(c: &mut Criterion) {
let (batch_10, types_10) = gen_new_batch_and_types(1);
let (batch_100, types_100) = gen_new_batch_and_types(10);
let (batch_10000, types_10000) = gen_new_batch_and_types(100);
let mut dst_protobuf_10 = vec![];
let mut dst_protobuf_100 = vec![];
let mut dst_protobuf_10000 = vec![];
let mut dst_arrow_10 = vec![];
let mut dst_arrow_100 = vec![];
let mut dst_arrow_10000 = vec![];
encode_protobuf(&batch_10, &mut dst_protobuf_10);
encode_protobuf(&batch_100, &mut dst_protobuf_100);
encode_protobuf(&batch_10000, &mut dst_protobuf_10000);
encode_arrow(&batch_10, &mut dst_arrow_10);
encode_arrow(&batch_100, &mut dst_arrow_100);
encode_arrow(&batch_10000, &mut dst_arrow_10000);
let mut group = c.benchmark_group("wal_decode");
group.bench_function("protobuf_decode_with_10_num_rows", |b| {
b.iter(|| decode_protobuf(&dst_protobuf_10, &types_10))
});
group.bench_function("protobuf_decode_with_100_num_rows", |b| {
b.iter(|| decode_protobuf(&dst_protobuf_100, &types_100))
});
group.bench_function("protobuf_decode_with_10000_num_rows", |b| {
b.iter(|| decode_protobuf(&dst_protobuf_10000, &types_10000))
});
group.bench_function("arrow_decode_with_10_num_rows", |b| {
b.iter(|| decode_arrow(&dst_arrow_10, &types_10))
});


@@ -33,16 +33,9 @@ rows | protobuf | arrow |
*/
fn encode_arrow(batch: &WriteBatch) {
let encoder = codec::WriteBatchArrowEncoder::new();
let encoder = codec::PayloadEncoder::new();
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
assert!(result.is_ok());
}
fn encode_protobuf(batch: &WriteBatch) {
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
let result = encoder.encode(batch.payload(), &mut dst);
assert!(result.is_ok());
}
@@ -52,15 +45,6 @@ fn bench_wal_encode(c: &mut Criterion) {
let (batch_10000, _) = gen_new_batch_and_types(100);
let mut group = c.benchmark_group("wal_encode");
group.bench_function("protobuf_encode_with_10_num_rows", |b| {
b.iter(|| encode_protobuf(&batch_10))
});
group.bench_function("protobuf_encode_with_100_num_rows", |b| {
b.iter(|| encode_protobuf(&batch_100))
});
group.bench_function("protobuf_encode_with_10000_num_rows", |b| {
b.iter(|| encode_protobuf(&batch_10000))
});
group.bench_function("arrow_encode_with_10_num_rows", |b| {
b.iter(|| encode_arrow(&batch_10))
});


@@ -33,22 +33,12 @@ rows | protobuf | arrow |
*/
fn codec_arrow(batch: &WriteBatch, mutation_types: &[i32]) {
let encoder = codec::WriteBatchArrowEncoder::new();
let encoder = codec::PayloadEncoder::new();
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
let result = encoder.encode(batch.payload(), &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types.to_vec());
let result = decoder.decode(&dst);
assert!(result.is_ok());
}
fn codec_protobuf(batch: &WriteBatch, mutation_types: &[i32]) {
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types.to_vec());
let decoder = codec::PayloadDecoder::new(mutation_types);
let result = decoder.decode(&dst);
assert!(result.is_ok());
}
@@ -59,15 +49,6 @@ fn bench_wal_encode_decode(c: &mut Criterion) {
let (batch_10000, types_10000) = gen_new_batch_and_types(100);
let mut group = c.benchmark_group("wal_encode_decode");
group.bench_function("protobuf_encode_decode_with_10_num_rows", |b| {
b.iter(|| codec_protobuf(&batch_10, &types_10))
});
group.bench_function("protobuf_encode_decode_with_100_num_rows", |b| {
b.iter(|| codec_protobuf(&batch_100, &types_100))
});
group.bench_function("protobuf_encode_decode_with_10000_num_rows", |b| {
b.iter(|| codec_protobuf(&batch_10000, &types_10000))
});
group.bench_function("arrow_encode_decode_with_10_num_rows", |b| {
b.iter(|| codec_arrow(&batch_10, &types_10))
});


@@ -14,17 +14,18 @@
pub mod write_batch_util;
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{
BooleanVector, Float64Vector, StringVector, TimestampMillisecondVector, UInt64Vector,
BooleanVector, Float64Vector, StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef,
};
use rand::Rng;
use storage::proto;
use storage::write_batch::{PutData, WriteBatch};
use store_api::storage::{consts, PutOperation, WriteRequest};
use storage::write_batch::WriteBatch;
use store_api::storage::{consts, WriteRequest};
pub fn new_test_batch() -> WriteBatch {
write_batch_util::new_write_batch(
@@ -69,25 +70,25 @@ pub fn gen_new_batch_and_types(putdate_nums: usize) -> (WriteBatch, Vec<i32>) {
rng.fill(&mut boolvs[..]);
rng.fill(&mut tsvs[..]);
rng.fill(&mut fvs[..]);
let intv = Arc::new(UInt64Vector::from_slice(&intvs));
let boolv = Arc::new(BooleanVector::from(boolvs.to_vec()));
let tsv = Arc::new(TimestampMillisecondVector::from_values(tsvs));
let fvs = Arc::new(Float64Vector::from_slice(&fvs));
let svs = Arc::new(StringVector::from_slice(&svs));
let mut put_data = PutData::default();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv.clone()).unwrap();
put_data.add_key_column("4", fvs.clone()).unwrap();
put_data.add_key_column("5", fvs.clone()).unwrap();
put_data.add_key_column("6", fvs.clone()).unwrap();
put_data.add_key_column("7", fvs.clone()).unwrap();
put_data.add_key_column("8", fvs.clone()).unwrap();
put_data.add_key_column("9", fvs.clone()).unwrap();
put_data.add_key_column("10", svs.clone()).unwrap();
let intv = Arc::new(UInt64Vector::from_slice(&intvs)) as VectorRef;
let boolv = Arc::new(BooleanVector::from(boolvs.to_vec())) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_values(tsvs)) as VectorRef;
let fvs = Arc::new(Float64Vector::from_slice(&fvs)) as VectorRef;
let svs = Arc::new(StringVector::from_slice(&svs)) as VectorRef;
let mut put_data = HashMap::with_capacity(11);
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv.clone());
put_data.insert("4".to_string(), fvs.clone());
put_data.insert("5".to_string(), fvs.clone());
put_data.insert("6".to_string(), fvs.clone());
put_data.insert("7".to_string(), fvs.clone());
put_data.insert("8".to_string(), fvs.clone());
put_data.insert("9".to_string(), fvs.clone());
put_data.insert("10".to_string(), svs.clone());
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
let types = proto::wal::gen_mutation_types(batch.payload());
(batch, types)
}


@@ -14,6 +14,6 @@
fn main() {
tonic_build::configure()
.compile(&["proto/wal.proto", "proto/write_batch.proto"], &["."])
.compile(&["proto/wal.proto"], &["."])
.expect("compile proto");
}


@@ -3,23 +3,12 @@ syntax = "proto3";
package greptime.storage.wal.v1;
message WalHeader {
PayloadType payload_type = 1;
uint64 last_manifest_version = 2;
uint64 last_manifest_version = 1;
// Type of each mutation in payload, now only arrow payload uses this field.
repeated MutationType mutation_types = 3;
}
enum PayloadType {
NONE = 0;
WRITE_BATCH_ARROW = 1;
WRITE_BATCH_PROTO = 2;
}
message MutationExtra {
MutationType mutation_type = 1;
repeated MutationType mutation_types = 2;
}
enum MutationType {
PUT = 0;
DELETE = 1;
DELETE = 0;
PUT = 1;
}


@@ -1,95 +0,0 @@
syntax = "proto3";
package greptime.storage.write_batch.v1;
message WriteBatch {
Schema schema = 1;
repeated Mutation mutations = 2;
}
message Schema {
repeated ColumnSchema column_schemas = 1;
TimestampIndex timestamp_index = 2;
}
message TimestampIndex {
uint64 value = 1;
}
message ColumnSchema {
string name = 1;
DataType data_type = 2;
bool is_nullable = 3;
bool is_time_index = 4;
}
message Mutation {
oneof mutation {
Put put = 1;
Delete delete = 2;
}
}
message Put {
repeated Column columns = 1;
}
message Delete {
// TODO(zxy)
}
message Column {
Values values = 1;
bytes value_null_mask = 2;
uint64 num_rows = 3;
}
// TODO(jiachun): Enum might be insufficient to represent some composite data type such as list, struct.
// In the future, may be we can refer to https://github.com/apache/arrow/blob/master/format/Schema.fbs#L398
enum DataType {
NULL = 0;
BOOLEAN = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
UINT8 = 6;
UINT16 = 7;
UINT32 = 8;
UINT64 = 9;
FLOAT32 = 10;
FLOAT64 = 11;
STRING = 12;
BINARY = 13;
DATE = 14;
DATETIME = 15;
TIMESTAMP_SECOND = 16;
TIMESTAMP_MILLISECOND = 17;
TIMESTAMP_MICROSECOND = 18;
TIMESTAMP_NANOSECOND = 19;
}
message Values {
repeated int32 i8_values = 1;
repeated int32 i16_values = 2;
repeated int32 i32_values = 3;
repeated int64 i64_values = 4;
repeated uint32 u8_values = 5;
repeated uint32 u16_values = 6;
repeated uint32 u32_values = 7;
repeated uint64 u64_values = 8;
repeated float f32_values = 9;
repeated double f64_values = 10;
repeated bool bool_values = 11;
repeated bytes binary_values = 12;
repeated string string_values = 13;
repeated int32 date_values = 14;
repeated int64 datetime_values = 15;
repeated int64 ts_second_values = 16;
repeated int64 ts_millisecond_values = 17;
repeated int64 ts_microsecond_values = 18;
repeated int64 ts_nanosecond_values = 19;
}


@@ -17,7 +17,7 @@ use std::io::Error as IoError;
use std::str::Utf8Error;
use common_error::prelude::*;
use datatypes::arrow;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use serde_json::error::Error as JsonError;
use store_api::manifest::action::ProtocolVersion;
@@ -25,6 +25,7 @@ use store_api::manifest::ManifestVersion;
use store_api::storage::{RegionId, SequenceNumber};
use crate::metadata::Error as MetadataError;
use crate::write_batch;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -60,7 +61,7 @@ pub enum Error {
#[snafu(display("Failed to create RecordBatch from vectors, source: {}", source))]
NewRecordBatch {
backtrace: Backtrace,
source: arrow::error::ArrowError,
source: ArrowError,
},
#[snafu(display("Fail to read object from path: {}, source: {}", path, source))]
@@ -145,12 +146,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Invalid timestamp in write batch, source: {}", source))]
InvalidTimestamp {
#[snafu(backtrace)]
source: crate::write_batch::Error,
},
#[snafu(display("Task already cancelled"))]
Cancelled { backtrace: Backtrace },
@@ -293,13 +288,14 @@ pub enum Error {
},
#[snafu(display(
"Failed to add default value for column {}, source: {}",
column,
"Failed to create default value for column {}, source: {}",
name,
source
))]
AddDefault {
column: String,
source: crate::write_batch::Error,
CreateDefault {
name: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
@@ -366,6 +362,78 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Unknown column {}", name))]
UnknownColumn { name: String, backtrace: Backtrace },
#[snafu(display("Failed to create record batch for write batch, source:{}", source))]
CreateRecordBatch {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display(
"Request is too large, max is {}, current is {}",
write_batch::MAX_BATCH_SIZE,
num_rows
))]
RequestTooLarge {
num_rows: usize,
backtrace: Backtrace,
},
#[snafu(display(
"Type of column {} does not match type in schema, expect {:?}, given {:?}",
name,
expect,
given
))]
TypeMismatch {
name: String,
expect: ConcreteDataType,
given: ConcreteDataType,
backtrace: Backtrace,
},
#[snafu(display("Column {} is not null but input has null", name))]
HasNull { name: String, backtrace: Backtrace },
#[snafu(display(
"Length of column {} not equals to other columns, expect {}, given {}",
name,
expect,
given
))]
LenNotEquals {
name: String,
expect: usize,
given: usize,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode write batch, corrupted data {}", message))]
BatchCorrupted {
message: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode arrow data, source: {}", source))]
DecodeArrow {
backtrace: Backtrace,
source: ArrowError,
},
#[snafu(display("Failed to encode arrow data, source: {}", source))]
EncodeArrow {
backtrace: Backtrace,
source: ArrowError,
},
#[snafu(display("Failed to parse schema, source: {}", source))]
ParseSchema {
backtrace: Backtrace,
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -378,12 +446,16 @@ impl ErrorExt for Error {
InvalidScanIndex { .. }
| BatchMissingColumn { .. }
| BatchMissingTimestamp { .. }
| InvalidTimestamp { .. }
| InvalidProjection { .. }
| BuildBatch { .. }
| NotInSchemaToCompat { .. }
| WriteToOldVersion { .. }
| IllegalTimestampColumnType { .. } => StatusCode::InvalidArguments,
| IllegalTimestampColumnType { .. }
| CreateRecordBatch { .. }
| RequestTooLarge { .. }
| TypeMismatch { .. }
| HasNull { .. }
| LenNotEquals { .. } => StatusCode::InvalidArguments,
Utf8 { .. }
| EncodeJson { .. }
@@ -402,7 +474,11 @@ impl ErrorExt for Error {
| CompatRead { .. }
| CreateDefaultToRead { .. }
| NoDefaultToRead { .. }
| NewRecordBatch { .. } => StatusCode::Unexpected,
| NewRecordBatch { .. }
| BatchCorrupted { .. }
| DecodeArrow { .. }
| EncodeArrow { .. }
| ParseSchema { .. } => StatusCode::Unexpected,
FlushIo { .. }
| WriteParquet { .. }
@@ -420,11 +496,13 @@ impl ErrorExt for Error {
| InvalidRegionState { .. }
| ReadWal { .. } => StatusCode::StorageUnavailable,
UnknownColumn { .. } => StatusCode::TableColumnNotFound,
InvalidAlterRequest { source, .. }
| InvalidRegionDesc { source, .. }
| ConvertColumnSchema { source, .. } => source.status_code(),
PushBatch { source, .. } => source.status_code(),
AddDefault { source, .. } => source.status_code(),
CreateDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
MarkWalStable { source, .. } => source.status_code(),
}
@@ -441,9 +519,7 @@ impl ErrorExt for Error {
#[cfg(test)]
mod tests {
use common_error::prelude::StatusCode::*;
use datatypes::arrow::error::ArrowError;
use snafu::GenerateImplicitData;
use super::*;


@@ -12,14 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::vectors::VectorRef;
use snafu::OptionExt;
use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber};
use store_api::storage::{OpType, SequenceNumber};
use super::MemtableRef;
use crate::error::{self, Result};
use crate::error::Result;
use crate::memtable::KeyValues;
use crate::write_batch::{Mutation, PutData, WriteBatch};
use crate::write_batch::{Mutation, Payload};
/// Wraps logic of inserting key/values in [WriteBatch] to [Memtable].
pub struct Inserter {
@@ -37,21 +35,20 @@ impl Inserter {
}
}
// TODO(yingwen): Can we take the WriteBatch?
/// Insert write batch into memtable.
/// Insert write batch payload into memtable.
///
/// Won't do schema validation if not configured. Caller (mostly the [`RegionWriter`]) should ensure the
/// schemas of `memtable` are consistent with `batch`'s.
pub fn insert_memtable(&mut self, batch: &WriteBatch, memtable: &MemtableRef) -> Result<()> {
if batch.is_empty() {
/// schemas of `memtable` are consistent with `payload`'s.
pub fn insert_memtable(&mut self, payload: &Payload, memtable: &MemtableRef) -> Result<()> {
if payload.is_empty() {
return Ok(());
}
// This function only makes effect in debug mode.
validate_input_and_memtable_schemas(batch, memtable);
validate_input_and_memtable_schemas(payload, memtable);
// Enough to hold all key or value columns.
let total_column_num = batch.schema().num_columns();
let total_column_num = payload.schema.num_columns();
// Reusable KeyValues buffer.
let mut kvs = KeyValues {
sequence: self.sequence,
@@ -61,12 +58,8 @@ impl Inserter {
values: Vec::with_capacity(total_column_num),
};
for mutation in batch {
match mutation {
Mutation::Put(put_data) => {
self.write_one_mutation(put_data, memtable, &mut kvs)?;
}
}
for mutation in &payload.mutations {
self.write_one_mutation(mutation, memtable, &mut kvs)?;
}
Ok(())
@@ -74,21 +67,22 @@ impl Inserter {
fn write_one_mutation(
&mut self,
put_data: &PutData,
mutation: &Mutation,
memtable: &MemtableRef,
kvs: &mut KeyValues,
) -> Result<()> {
let schema = memtable.schema();
let num_rows = put_data.num_rows();
let num_rows = mutation.record_batch.num_rows();
kvs.reset(OpType::Put, self.index_in_batch);
kvs.reset(mutation.op_type, self.index_in_batch);
for key_col in schema.row_key_columns() {
clone_put_data_column_to(put_data, &key_col.desc, &mut kvs.keys)?;
for key_idx in schema.row_key_indices() {
kvs.keys.push(mutation.record_batch.column(key_idx).clone());
}
for value_col in schema.value_columns() {
clone_put_data_column_to(put_data, &value_col.desc, &mut kvs.values)?;
for value_idx in schema.value_indices() {
kvs.values
.push(mutation.record_batch.column(value_idx).clone());
}
memtable.write(kvs)?;
@@ -99,30 +93,20 @@ impl Inserter {
}
}
fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtable: &MemtableRef) {
fn validate_input_and_memtable_schemas(payload: &Payload, memtable: &MemtableRef) {
if cfg!(debug_assertions) {
let batch_schema = batch.schema();
let payload_schema = &payload.schema;
let memtable_schema = memtable.schema();
let user_schema = memtable_schema.user_schema();
debug_assert_eq!(batch_schema.version(), user_schema.version());
debug_assert_eq!(payload_schema.version(), user_schema.version());
// Only validate column schemas.
debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas());
debug_assert_eq!(
payload_schema.column_schemas(),
user_schema.column_schemas()
);
}
}
fn clone_put_data_column_to(
put_data: &PutData,
desc: &ColumnDescriptor,
target: &mut Vec<VectorRef>,
) -> Result<()> {
let vector = put_data
.column_by_name(&desc.name)
.context(error::BatchMissingColumnSnafu { column: &desc.name })?;
target.push(vector.clone());
Ok(())
}
/// Holds `start` and `end` indexes to get a slice `[start, end)` from the vector whose
/// timestamps belong to same time range at `range_index`.
#[derive(Debug, PartialEq)]
@@ -135,13 +119,14 @@ struct SliceIndex {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_time::timestamp::Timestamp;
use datatypes::type_id::LogicalTypeId;
use datatypes::value::Value;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use store_api::storage::{PutOperation, WriteRequest};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use store_api::storage::WriteRequest;
use super::*;
use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder};
@@ -149,6 +134,7 @@ mod tests {
use crate::schema::RegionSchemaRef;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::write_batch_util;
use crate::write_batch::WriteBatch;
fn new_test_write_batch() -> WriteBatch {
write_batch_util::new_write_batch(
@@ -172,11 +158,11 @@ mod tests {
}
fn put_batch(batch: &mut WriteBatch, data: &[(i64, Option<i64>)]) {
let mut put_data = PutData::with_num_columns(2);
let mut put_data = HashMap::with_capacity(2);
let ts = TimestampMillisecondVector::from_values(data.iter().map(|v| v.0));
put_data.add_key_column("ts", Arc::new(ts)).unwrap();
put_data.insert("ts".to_string(), Arc::new(ts) as VectorRef);
let value = Int64Vector::from(data.iter().map(|v| v.1).collect::<Vec<_>>());
put_data.add_value_column("value", Arc::new(value)).unwrap();
put_data.insert("value".to_string(), Arc::new(value) as VectorRef);
batch.put(put_data).unwrap();
}
@@ -232,7 +218,9 @@ mod tests {
],
);
inserter.insert_memtable(&batch, &mutable_memtable).unwrap();
inserter
.insert_memtable(batch.payload(), &mutable_memtable)
.unwrap();
check_memtable_content(
&mutable_memtable,
sequence,


@@ -13,4 +13,3 @@
// limitations under the License.
pub mod wal;
pub mod write_batch;


@@ -15,13 +15,16 @@
#![allow(clippy::all)]
tonic::include_proto!("greptime.storage.wal.v1");
use crate::write_batch::{Mutation, WriteBatch};
use store_api::storage::OpType;
pub fn gen_mutation_types(write_batch: &WriteBatch) -> Vec<i32> {
write_batch
use crate::write_batch::Payload;
pub fn gen_mutation_types(payload: &Payload) -> Vec<i32> {
payload
.mutations
.iter()
.map(|m| match m {
Mutation::Put(_) => MutationType::Put.into(),
.map(|m| match m.op_type {
OpType::Put => MutationType::Put.into(),
})
.collect::<Vec<_>>()
}


@@ -1,391 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(clippy::all)]
tonic::include_proto!("greptime.storage.write_batch.v1");
use std::sync::Arc;
use common_base::BitVec;
use common_error::prelude::*;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema;
use datatypes::types::TimestampType;
use datatypes::vectors::{
BinaryVector, BinaryVectorBuilder, BooleanVector, BooleanVectorBuilder, DateTimeVector,
DateTimeVectorBuilder, DateVector, DateVectorBuilder, Float32Vector, Float32VectorBuilder,
Float64Vector, Float64VectorBuilder, Int16Vector, Int16VectorBuilder, Int32Vector,
Int32VectorBuilder, Int64Vector, Int64VectorBuilder, Int8Vector, Int8VectorBuilder,
StringVector, StringVectorBuilder, TimestampMicrosecondVector,
TimestampMicrosecondVectorBuilder, TimestampMillisecondVector,
TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampNanosecondVectorBuilder,
TimestampSecondVector, TimestampSecondVectorBuilder, UInt16Vector, UInt16VectorBuilder,
UInt32Vector, UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector,
UInt8VectorBuilder, Vector, VectorRef,
};
use paste::paste;
use snafu::OptionExt;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String, backtrace: Backtrace },
#[snafu(display("Empty column values read"))]
EmptyColumnValues { backtrace: Backtrace },
#[snafu(display("Invalid data type: {}", data_type))]
InvalidDataType {
data_type: i32,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert schema, source: {}", source))]
ConvertSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl TimestampIndex {
pub fn new(value: u64) -> Self {
Self { value }
}
}
impl From<&schema::SchemaRef> for Schema {
fn from(schema: &schema::SchemaRef) -> Self {
let column_schemas = schema
.column_schemas()
.iter()
.map(|column_schema| column_schema.into())
.collect();
Schema {
column_schemas,
timestamp_index: schema
.timestamp_index()
.map(|index| TimestampIndex::new(index as u64)),
}
}
}
impl TryFrom<Schema> for schema::SchemaRef {
type Error = Error;
fn try_from(schema: Schema) -> Result<Self> {
let column_schemas = schema
.column_schemas
.iter()
.map(schema::ColumnSchema::try_from)
.collect::<Result<Vec<_>>>()?;
let schema = Arc::new(
schema::SchemaBuilder::try_from(column_schemas)
.context(ConvertSchemaSnafu)?
.build()
.context(ConvertSchemaSnafu)?,
);
Ok(schema)
}
}
impl From<&schema::ColumnSchema> for ColumnSchema {
fn from(cs: &schema::ColumnSchema) -> Self {
Self {
name: cs.name.clone(),
data_type: DataType::from(&cs.data_type).into(),
is_nullable: cs.is_nullable(),
is_time_index: cs.is_time_index(),
}
}
}
impl TryFrom<&ColumnSchema> for schema::ColumnSchema {
type Error = Error;
fn try_from(column_schema: &ColumnSchema) -> Result<Self> {
if let Some(data_type) = DataType::from_i32(column_schema.data_type) {
Ok(schema::ColumnSchema::new(
column_schema.name.clone(),
data_type.into(),
column_schema.is_nullable,
)
.with_time_index(column_schema.is_time_index))
} else {
InvalidDataTypeSnafu {
data_type: column_schema.data_type,
}
.fail()
}
}
}
impl From<&ConcreteDataType> for DataType {
fn from(data_type: &ConcreteDataType) -> Self {
match data_type {
ConcreteDataType::Boolean(_) => DataType::Boolean,
ConcreteDataType::Int8(_) => DataType::Int8,
ConcreteDataType::Int16(_) => DataType::Int16,
ConcreteDataType::Int32(_) => DataType::Int32,
ConcreteDataType::Int64(_) => DataType::Int64,
ConcreteDataType::UInt8(_) => DataType::Uint8,
ConcreteDataType::UInt16(_) => DataType::Uint16,
ConcreteDataType::UInt32(_) => DataType::Uint32,
ConcreteDataType::UInt64(_) => DataType::Uint64,
ConcreteDataType::Float32(_) => DataType::Float64,
ConcreteDataType::Float64(_) => DataType::Float64,
ConcreteDataType::String(_) => DataType::String,
ConcreteDataType::Null(_) => DataType::Null,
ConcreteDataType::Binary(_) => DataType::Binary,
ConcreteDataType::Timestamp(unit) => match unit {
TimestampType::Second(_) => DataType::TimestampSecond,
TimestampType::Millisecond(_) => DataType::TimestampMillisecond,
TimestampType::Microsecond(_) => DataType::TimestampMicrosecond,
TimestampType::Nanosecond(_) => DataType::TimestampNanosecond,
},
ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
| ConcreteDataType::List(_) => {
// TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc.
unimplemented!("data type {:?} is not supported", data_type)
}
}
}
}
impl From<DataType> for ConcreteDataType {
fn from(data_type: DataType) -> Self {
match data_type {
DataType::Boolean => ConcreteDataType::boolean_datatype(),
DataType::Int8 => ConcreteDataType::int8_datatype(),
DataType::Int16 => ConcreteDataType::int16_datatype(),
DataType::Int32 => ConcreteDataType::int32_datatype(),
DataType::Int64 => ConcreteDataType::int64_datatype(),
DataType::Uint8 => ConcreteDataType::uint8_datatype(),
DataType::Uint16 => ConcreteDataType::uint16_datatype(),
DataType::Uint32 => ConcreteDataType::uint32_datatype(),
DataType::Uint64 => ConcreteDataType::uint64_datatype(),
DataType::Float32 => ConcreteDataType::float32_datatype(),
DataType::Float64 => ConcreteDataType::float64_datatype(),
DataType::String => ConcreteDataType::string_datatype(),
DataType::Binary => ConcreteDataType::binary_datatype(),
DataType::Null => ConcreteDataType::null_datatype(),
DataType::Date => ConcreteDataType::date_datatype(),
DataType::Datetime => ConcreteDataType::datetime_datatype(),
DataType::TimestampSecond => ConcreteDataType::timestamp_second_datatype(),
DataType::TimestampMillisecond => ConcreteDataType::timestamp_millisecond_datatype(),
DataType::TimestampMicrosecond => ConcreteDataType::timestamp_microsecond_datatype(),
DataType::TimestampNanosecond => ConcreteDataType::timestamp_nanosecond_datatype(),
}
}
}
#[macro_export]
macro_rules! gen_columns {
($key: tt, $vec_ty: ty, $vari: ident, $cast: expr) => {
paste! {
pub fn [<gen_columns_ $key>](vector: &VectorRef) -> Result<Column> {
let mut column = Column::default();
let mut values = Values::default();
let vector_ref =
vector
.as_any()
.downcast_ref::<$vec_ty>()
.with_context(|| ConversionSnafu {
from: std::format!("{:?}", vector.as_ref().data_type()),
})?;
let mut bits: Option<BitVec> = None;
vector_ref
.iter_data()
.enumerate()
.for_each(|(i, value)| match value {
Some($vari) => values.[<$key _values>].push($cast),
None => {
if (bits.is_none()) {
bits = Some(BitVec::repeat(false, vector_ref.len()));
}
bits.as_mut().map(|x| x.set(i, true));
}
});
let null_mask = if let Some(bits) = bits {
bits.into_vec()
} else {
Default::default()
};
column.values = Some(values);
column.value_null_mask = null_mask;
column.num_rows = vector_ref.len() as u64;
Ok(column)
}
}
};
}
gen_columns!(i8, Int8Vector, v, v as i32);
gen_columns!(i16, Int16Vector, v, v as i32);
gen_columns!(i32, Int32Vector, v, v as i32);
gen_columns!(i64, Int64Vector, v, v as i64);
gen_columns!(u8, UInt8Vector, v, v as u32);
gen_columns!(u16, UInt16Vector, v, v as u32);
gen_columns!(u32, UInt32Vector, v, v as u32);
gen_columns!(u64, UInt64Vector, v, v as u64);
gen_columns!(f32, Float32Vector, v, v);
gen_columns!(f64, Float64Vector, v, v);
gen_columns!(bool, BooleanVector, v, v);
gen_columns!(binary, BinaryVector, v, v.to_vec());
gen_columns!(string, StringVector, v, v.to_string());
gen_columns!(date, DateVector, v, v.val());
gen_columns!(datetime, DateTimeVector, v, v.val());
gen_columns!(ts_second, TimestampSecondVector, v, v.into());
gen_columns!(ts_millisecond, TimestampMillisecondVector, v, v.into());
gen_columns!(ts_microsecond, TimestampMicrosecondVector, v, v.into());
gen_columns!(ts_nanosecond, TimestampNanosecondVector, v, v.into());
#[macro_export]
macro_rules! gen_put_data {
($key: tt, $builder_type: ty, $vari: ident, $cast: expr) => {
paste! {
pub fn [<gen_put_data_ $key>](column: Column) -> Result<VectorRef> {
let values = column.values.context(EmptyColumnValuesSnafu {})?;
let mut vector_iter = values.[<$key _values>].iter();
let num_rows = column.num_rows as usize;
let mut builder = <$builder_type>::with_capacity(num_rows);
if column.value_null_mask.is_empty() {
(0..num_rows)
.for_each(|_| builder.push(vector_iter.next().map(|$vari| $cast)));
} else {
BitVec::from_vec(column.value_null_mask)
.into_iter()
.take(num_rows)
.for_each(|is_null| {
if is_null {
builder.push(None);
} else {
builder.push(vector_iter.next().map(|$vari| $cast));
}
});
}
Ok(Arc::new(builder.finish()))
}
}
};
}
gen_put_data!(i8, Int8VectorBuilder, v, *v as i8);
gen_put_data!(i16, Int16VectorBuilder, v, *v as i16);
gen_put_data!(i32, Int32VectorBuilder, v, *v);
gen_put_data!(i64, Int64VectorBuilder, v, *v);
gen_put_data!(u8, UInt8VectorBuilder, v, *v as u8);
gen_put_data!(u16, UInt16VectorBuilder, v, *v as u16);
gen_put_data!(u32, UInt32VectorBuilder, v, *v as u32);
gen_put_data!(u64, UInt64VectorBuilder, v, *v as u64);
gen_put_data!(f32, Float32VectorBuilder, v, *v as f32);
gen_put_data!(f64, Float64VectorBuilder, v, *v as f64);
gen_put_data!(bool, BooleanVectorBuilder, v, *v);
gen_put_data!(binary, BinaryVectorBuilder, v, v.as_slice());
gen_put_data!(string, StringVectorBuilder, v, v.as_str());
gen_put_data!(date, DateVectorBuilder, v, (*v).into());
gen_put_data!(datetime, DateTimeVectorBuilder, v, (*v).into());
gen_put_data!(ts_second, TimestampSecondVectorBuilder, v, (*v).into());
gen_put_data!(
ts_millisecond,
TimestampMillisecondVectorBuilder,
v,
(*v).into()
);
gen_put_data!(
ts_microsecond,
TimestampMicrosecondVectorBuilder,
v,
(*v).into()
);
gen_put_data!(
ts_nanosecond,
TimestampNanosecondVectorBuilder,
v,
(*v).into()
);
pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
let data_type = vector.data_type();
match data_type {
ConcreteDataType::Boolean(_) => gen_columns_bool(vector),
ConcreteDataType::Int8(_) => gen_columns_i8(vector),
ConcreteDataType::Int16(_) => gen_columns_i16(vector),
ConcreteDataType::Int32(_) => gen_columns_i32(vector),
ConcreteDataType::Int64(_) => gen_columns_i64(vector),
ConcreteDataType::UInt8(_) => gen_columns_u8(vector),
ConcreteDataType::UInt16(_) => gen_columns_u16(vector),
ConcreteDataType::UInt32(_) => gen_columns_u32(vector),
ConcreteDataType::UInt64(_) => gen_columns_u64(vector),
ConcreteDataType::Float32(_) => gen_columns_f32(vector),
ConcreteDataType::Float64(_) => gen_columns_f64(vector),
ConcreteDataType::Binary(_) => gen_columns_binary(vector),
ConcreteDataType::String(_) => gen_columns_string(vector),
ConcreteDataType::Date(_) => gen_columns_date(vector),
ConcreteDataType::DateTime(_) => gen_columns_datetime(vector),
ConcreteDataType::Timestamp(t) => match t {
TimestampType::Second(_) => gen_columns_ts_second(vector),
TimestampType::Millisecond(_) => gen_columns_ts_millisecond(vector),
TimestampType::Microsecond(_) => gen_columns_ts_microsecond(vector),
TimestampType::Nanosecond(_) => gen_columns_ts_nanosecond(vector),
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => {
// TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
unimplemented!("data type {:?} is not supported", data_type)
}
}
}
pub fn gen_put_data_vector(data_type: ConcreteDataType, column: Column) -> Result<VectorRef> {
match data_type {
ConcreteDataType::Boolean(_) => gen_put_data_bool(column),
ConcreteDataType::Int8(_) => gen_put_data_i8(column),
ConcreteDataType::Int16(_) => gen_put_data_i16(column),
ConcreteDataType::Int32(_) => gen_put_data_i32(column),
ConcreteDataType::Int64(_) => gen_put_data_i64(column),
ConcreteDataType::UInt8(_) => gen_put_data_u8(column),
ConcreteDataType::UInt16(_) => gen_put_data_u16(column),
ConcreteDataType::UInt32(_) => gen_put_data_u32(column),
ConcreteDataType::UInt64(_) => gen_put_data_u64(column),
ConcreteDataType::Float32(_) => gen_put_data_f32(column),
ConcreteDataType::Float64(_) => gen_put_data_f64(column),
ConcreteDataType::Binary(_) => gen_put_data_binary(column),
ConcreteDataType::String(_) => gen_put_data_string(column),
ConcreteDataType::Date(_) => gen_put_data_date(column),
ConcreteDataType::DateTime(_) => gen_put_data_datetime(column),
ConcreteDataType::Timestamp(t) => match t {
TimestampType::Second(_) => gen_put_data_ts_second(column),
TimestampType::Millisecond(_) => gen_put_data_ts_millisecond(column),
TimestampType::Microsecond(_) => gen_put_data_ts_microsecond(column),
TimestampType::Nanosecond(_) => gen_put_data_ts_nanosecond(column),
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => {
// TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
unimplemented!("data type {:?} is not supported", data_type)
}
}
}


@@ -19,17 +19,19 @@ mod basic;
mod flush;
mod projection;
use std::collections::HashMap;
use common_telemetry::logging;
use datatypes::prelude::{ScalarVector, WrapperType};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::fs::log::LocalFileLogStore;
use log_store::fs::noop::NoopLogStore;
use object_store::backend::fs;
use object_store::ObjectStore;
use store_api::storage::{
consts, Chunk, ChunkReader, PutOperation, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
consts, Chunk, ChunkReader, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
};
use tempdir::TempDir;
@@ -39,7 +41,6 @@ use crate::manifest::test_utils::*;
use crate::memtable::DefaultMemtableBuilder;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::{self, config_util, schema_util, write_batch_util};
use crate::write_batch::PutData;
/// Create metadata of a region with schema: (timestamp, v0).
pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata {
@@ -156,17 +157,18 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
}
}
fn new_put_data(data: &[(TimestampMillisecond, Option<i64>)]) -> PutData {
let mut put_data = PutData::with_num_columns(2);
fn new_put_data(data: &[(TimestampMillisecond, Option<i64>)]) -> HashMap<String, VectorRef> {
let mut put_data = HashMap::with_capacity(2);
let timestamps =
TimestampMillisecondVector::from_vec(data.iter().map(|v| v.0.into()).collect());
let values = Int64Vector::from_owned_iterator(data.iter().map(|kv| kv.1));
put_data
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps))
.unwrap();
put_data.add_value_column("v0", Arc::new(values)).unwrap();
put_data.insert(
test_util::TIMESTAMP_NAME.to_string(),
Arc::new(timestamps) as VectorRef,
);
put_data.insert("v0".to_string(), Arc::new(values) as VectorRef);
put_data
}


@@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use datatypes::prelude::*;
use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::fs::log::LocalFileLogStore;
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
ColumnDescriptorBuilder, ColumnId, PutOperation, Region, RegionMeta, ScanRequest, SchemaRef,
Snapshot, WriteRequest, WriteResponse,
ColumnDescriptorBuilder, ColumnId, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
WriteRequest, WriteResponse,
};
use tempdir::TempDir;
@@ -31,7 +31,6 @@ use crate::region::{OpenOptions, RawRegionMetadata, RegionImpl, RegionMetadata};
use crate::test_util;
use crate::test_util::config_util;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::write_batch::PutData;
const REGION_NAME: &str = "region-alter-0";
@@ -69,8 +68,8 @@ impl DataRow {
}
}
fn new_put_data(data: &[DataRow]) -> PutData {
let mut put_data = PutData::with_num_columns(4);
fn new_put_data(data: &[DataRow]) -> HashMap<String, VectorRef> {
let mut put_data = HashMap::with_capacity(4);
let keys = Int64Vector::from(data.iter().map(|v| v.key).collect::<Vec<_>>());
let timestamps = TimestampMillisecondVector::from(
data.iter()
@@ -80,13 +79,14 @@ fn new_put_data(data: &[DataRow]) -> PutData {
let values1 = Int64Vector::from(data.iter().map(|kv| kv.v0).collect::<Vec<_>>());
let values2 = Int64Vector::from(data.iter().map(|kv| kv.v1).collect::<Vec<_>>());
put_data.add_key_column("k0", Arc::new(keys)).unwrap();
put_data
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps))
.unwrap();
put_data.insert("k0".to_string(), Arc::new(keys) as VectorRef);
put_data.insert(
test_util::TIMESTAMP_NAME.to_string(),
Arc::new(timestamps) as VectorRef,
);
put_data.add_value_column("v0", Arc::new(values1)).unwrap();
put_data.add_value_column("v1", Arc::new(values2)).unwrap();
put_data.insert("v0".to_string(), Arc::new(values1) as VectorRef);
put_data.insert("v1".to_string(), Arc::new(values2) as VectorRef);
put_data
}


@@ -12,23 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::fs::log::LocalFileLogStore;
use store_api::logstore::LogStore;
use store_api::storage::{
Chunk, ChunkReader, PutOperation, ReadContext, Region, ScanRequest, Snapshot, WriteContext,
WriteRequest,
Chunk, ChunkReader, ReadContext, Region, ScanRequest, Snapshot, WriteContext, WriteRequest,
};
use tempdir::TempDir;
use crate::region::{RegionImpl, RegionMetadata};
use crate::test_util::{self, config_util, descriptor_util, write_batch_util};
use crate::write_batch::{PutData, WriteBatch};
use crate::write_batch::WriteBatch;
/// Create metadata with schema (k0, timestamp, v0, v1)
fn new_metadata(region_name: &str) -> RegionMetadata {
@@ -60,20 +60,31 @@ fn new_write_batch_for_test() -> WriteBatch {
/// v0: [initial_value, ...., initial_value]
/// v1: [initial_value, ..., initial_value + len - 1]
/// ```
fn new_put_data(len: usize, key_start: i64, ts_start: i64, initial_value: i64) -> PutData {
let mut put_data = PutData::with_num_columns(4);
fn new_put_data(
len: usize,
key_start: i64,
ts_start: i64,
initial_value: i64,
) -> HashMap<String, VectorRef> {
let mut put_data = HashMap::with_capacity(4);
let k0 = Int64Vector::from_values((0..len).map(|v| key_start + v as i64));
let ts = TimestampMillisecondVector::from_values((0..len).map(|v| ts_start + v as i64));
let v0 = Int64Vector::from_values(std::iter::repeat(initial_value).take(len));
let v1 = Int64Vector::from_values((0..len).map(|v| initial_value + v as i64));
let k0 = Arc::new(Int64Vector::from_values(
(0..len).map(|v| key_start + v as i64),
)) as VectorRef;
let ts = Arc::new(TimestampMillisecondVector::from_values(
(0..len).map(|v| ts_start + v as i64),
)) as VectorRef;
let v0 = Arc::new(Int64Vector::from_values(
std::iter::repeat(initial_value).take(len),
)) as VectorRef;
let v1 = Arc::new(Int64Vector::from_values(
(0..len).map(|v| initial_value + v as i64),
)) as VectorRef;
put_data.add_key_column("k0", Arc::new(k0)).unwrap();
put_data
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(ts))
.unwrap();
put_data.add_value_column("v0", Arc::new(v0)).unwrap();
put_data.add_value_column("v1", Arc::new(v1)).unwrap();
put_data.insert("k0".to_string(), k0);
put_data.insert(test_util::TIMESTAMP_NAME.to_string(), ts);
put_data.insert("v0".to_string(), v0);
put_data.insert("v1".to_string(), v1);
put_data
}


@@ -35,7 +35,7 @@ use crate::region::{RecoverdMetadata, RecoveredMetadataMap, RegionManifest, Shar
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
use crate::version::{VersionControl, VersionControlRef, VersionEdit};
use crate::wal::{Payload, Wal};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;
pub type RegionWriterRef = Arc<RegionWriter>;
@@ -216,8 +216,7 @@ impl RegionWriter {
version_control.set_committed_sequence(next_sequence);
let header = WalHeader::with_last_manifest_version(manifest_version);
wal.write_to_wal(next_sequence, header, Payload::None)
.await?;
wal.write_to_wal(next_sequence, header, None).await?;
Ok(())
}
@@ -311,16 +310,12 @@ impl WriterInner {
let wal_header = WalHeader::with_last_manifest_version(version.manifest_version());
writer_ctx
.wal
.write_to_wal(
next_sequence,
wal_header,
Payload::WriteBatchArrow(&request),
)
.write_to_wal(next_sequence, wal_header, Some(request.payload()))
.await?;
// Insert batch into memtable.
let mut inserter = Inserter::new(next_sequence);
inserter.insert_memtable(&request, version.mutable_memtable())?;
inserter.insert_memtable(request.payload(), version.mutable_memtable())?;
// Update committed_sequence to make current batch visible. The `&mut self` of WriterInner
// guarantees the writer is exclusive.
@@ -350,7 +345,7 @@ impl WriterInner {
// Read starts from the first entry after last flushed entry, so the start sequence
// should be flushed_sequence + 1.
let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence + 1).await?;
while let Some((req_sequence, _header, request)) = stream.try_next().await? {
while let Some((req_sequence, _header, payload)) = stream.try_next().await? {
while let Some((sequence_before_alter, _)) = next_apply_metadata {
// There might be multiple metadata changes to be applied, so a loop is necessary.
if req_sequence > sequence_before_alter {
@@ -370,7 +365,7 @@ impl WriterInner {
}
}
if let Some(request) = request {
if let Some(payload) = payload {
num_requests += 1;
// Note that memtables of `Version` may be updated during replay.
let version = version_control.current();
@@ -398,7 +393,7 @@ impl WriterInner {
// TODO(yingwen): Trigger flush if the size of memtables reach the flush threshold to avoid
// out of memory during replay, but we need to do it carefully to avoid dead lock.
let mut inserter = Inserter::new(last_sequence);
inserter.insert_memtable(&request, version.mutable_memtable())?;
inserter.insert_memtable(&payload, version.mutable_memtable())?;
}
}


@@ -121,6 +121,11 @@ impl RegionSchema {
self.store_schema.row_key_indices()
}
#[inline]
pub(crate) fn value_indices(&self) -> impl Iterator<Item = usize> {
self.store_schema.value_indices()
}
#[inline]
pub(crate) fn column_metadata(&self, idx: usize) -> &ColumnMetadata {
self.columns.column_metadata(idx)


@@ -151,6 +151,11 @@ impl StoreSchema {
0..self.row_key_end
}
#[inline]
pub(crate) fn value_indices(&self) -> impl Iterator<Item = usize> {
self.row_key_end..self.user_column_end
}
#[inline]
pub(crate) fn column_name(&self, idx: usize) -> &str {
&self.schema.column_schemas()[idx].name
@@ -288,6 +293,8 @@ mod tests {
assert_eq!(4, store_schema.op_type_index());
let row_key_indices: Vec<_> = store_schema.row_key_indices().collect();
assert_eq!([0, 1], &row_key_indices[..]);
let value_indices: Vec<_> = store_schema.value_indices().collect();
assert_eq!([2], &value_indices[..]);
// Test batch and chunk conversion.
let batch = tests::new_batch();


@@ -25,12 +25,9 @@ use store_api::storage::{RegionId, SequenceNumber};
use crate::codec::{Decoder, Encoder};
use crate::error::{self, Error, MarkWalStableSnafu, Result};
use crate::proto::wal::{self, PayloadType, WalHeader};
use crate::write_batch::codec::{
WriteBatchArrowDecoder, WriteBatchArrowEncoder, WriteBatchProtobufDecoder,
WriteBatchProtobufEncoder,
};
use crate::write_batch::WriteBatch;
use crate::proto::wal::{self, WalHeader};
use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder};
use crate::write_batch::Payload;
#[derive(Debug)]
pub struct Wal<S: LogStore> {
@@ -39,9 +36,8 @@ pub struct Wal<S: LogStore> {
store: Arc<S>,
}
pub type WriteBatchStream<'a> = Pin<
Box<dyn Stream<Item = Result<(SequenceNumber, WalHeader, Option<WriteBatch>)>> + Send + 'a>,
>;
pub type PayloadStream<'a> =
Pin<Box<dyn Stream<Item = Result<(SequenceNumber, WalHeader, Option<Payload>)>> + Send + 'a>>;
// Wal should be cheap to clone, so avoid holding things like String, Vec.
impl<S: LogStore> Clone for Wal<S> {
@@ -85,12 +81,12 @@ impl<S: LogStore> Wal<S> {
///
/// ```text
/// | |
/// |--------------------------> Header Len <-----------------------------| Arrow/Protobuf/... encoded
/// |--------------------------> Header Len <-----------------------------| Arrow IPC format
/// | |
/// v v
/// +---------------------+----------------------------------------------------+--------------+-------------+--------------+
/// | | Header | | | |
/// | Header Len(varint) | (last_manifest_version + mutation_types + ...) | Data Chunk0 | Data Chunk1 | ... |
/// | Header Len(varint) | (last_manifest_version + mutation_types + ...) | Payload 0 | Payload 1 | ... |
/// | | | | | |
/// +---------------------+----------------------------------------------------+--------------+-------------+--------------+
/// ```
@@ -99,35 +95,24 @@ impl<S: LogStore> Wal<S> {
&self,
seq: SequenceNumber,
mut header: WalHeader,
payload: Payload<'_>,
payload: Option<&Payload>,
) -> Result<(u64, usize)> {
header.payload_type = payload.payload_type();
if let Payload::WriteBatchArrow(batch) = payload {
header.mutation_types = wal::gen_mutation_types(batch);
if let Some(p) = payload {
header.mutation_types = wal::gen_mutation_types(p);
}
let mut buf = vec![];
// header
// Encode header
let wal_header_encoder = WalHeaderEncoder {};
wal_header_encoder.encode(&header, &mut buf)?;
if let Payload::WriteBatchArrow(batch) = payload {
// entry
let encoder = WriteBatchArrowEncoder::new();
// Encode payload
if let Some(p) = payload {
let encoder = PayloadEncoder::new();
// TODO(jiachun): provide some way to compute data size before encode, so we can preallocate an exactly sized buf.
encoder
.encode(batch, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
region_id: self.region_id(),
})?;
} else if let Payload::WriteBatchProto(batch) = payload {
// entry
let encoder = WriteBatchProtobufEncoder {};
// TODO(jiachun): provide some way to compute data size before encode, so we can preallocate an exactly sized buf.
encoder
.encode(batch, &mut buf)
.encode(p, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
region_id: self.region_id(),
@@ -138,7 +123,7 @@ impl<S: LogStore> Wal<S> {
self.write(seq, &buf).await
}
pub async fn read_from_wal(&self, start_seq: SequenceNumber) -> Result<WriteBatchStream<'_>> {
pub async fn read_from_wal(&self, start_seq: SequenceNumber) -> Result<PayloadStream<'_>> {
let stream = self
.store
.read(&self.namespace, start_seq)
@@ -180,12 +165,12 @@ impl<S: LogStore> Wal<S> {
fn decode_entry<E: Entry>(
&self,
entry: E,
) -> Result<(SequenceNumber, WalHeader, Option<WriteBatch>)> {
) -> Result<(SequenceNumber, WalHeader, Option<Payload>)> {
let seq_num = entry.id();
let input = entry.data();
let wal_header_decoder = WalHeaderDecoder {};
let (data_pos, mut header) = wal_header_decoder.decode(input)?;
let (data_pos, header) = wal_header_decoder.decode(input)?;
ensure!(
data_pos <= input.len(),
@@ -199,55 +184,19 @@ impl<S: LogStore> Wal<S> {
}
);
match PayloadType::from_i32(header.payload_type) {
Some(PayloadType::None) => Ok((seq_num, header, None)),
Some(PayloadType::WriteBatchArrow) => {
let mutation_types = std::mem::take(&mut header.mutation_types);
let decoder = WriteBatchArrowDecoder::new(mutation_types);
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
region_id: self.region_id(),
})?;
if header.mutation_types.is_empty() {
return Ok((seq_num, header, None));
}
Ok((seq_num, header, Some(write_batch)))
}
Some(PayloadType::WriteBatchProto) => {
let mutation_types = std::mem::take(&mut header.mutation_types);
let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
region_id: self.region_id(),
})?;
Ok((seq_num, header, Some(write_batch)))
}
_ => error::WalDataCorruptedSnafu {
let decoder = PayloadDecoder::new(&header.mutation_types);
let payload = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
region_id: self.region_id(),
message: format!("invalid payload type={}", header.payload_type),
}
.fail(),
}
}
}
})?;
pub enum Payload<'a> {
None, // only header
WriteBatchArrow(&'a WriteBatch),
#[allow(dead_code)]
WriteBatchProto(&'a WriteBatch),
}
impl<'a> Payload<'a> {
pub fn payload_type(&self) -> i32 {
match self {
Payload::None => PayloadType::None.into(),
Payload::WriteBatchArrow(_) => PayloadType::WriteBatchArrow.into(),
Payload::WriteBatchProto(_) => PayloadType::WriteBatchProto.into(),
}
Ok((seq_num, header, Some(payload)))
}
}
@@ -314,7 +263,7 @@ mod tests {
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new(0, Arc::new(log_store));
let header = WalHeader::with_last_manifest_version(111);
let (seq_num, _) = wal.write_to_wal(3, header, Payload::None).await?;
let (seq_num, _) = wal.write_to_wal(3, header, None).await?;
assert_eq!(3, seq_num);
@@ -335,7 +284,6 @@ mod tests {
#[test]
pub fn test_wal_header_codec() {
let wal_header = WalHeader {
payload_type: 1,
last_manifest_version: 99999999,
mutation_types: vec![],
};
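
A minimal usage sketch of the new WAL API above, assembled only from the signatures shown in this diff. Illustrative, not part of the change: `wal`, `batch`, `manifest_version`, `next_seq` and `start_seq` are assumed to be in scope, and errors are propagated with `?`.

use futures::StreamExt; // assumption: the returned PayloadStream is consumed via StreamExt::next

// Write one entry: the header plus this batch's payload (`None` writes a header-only entry).
let header = WalHeader::with_last_manifest_version(manifest_version);
let (_seq, _entry_size) = wal
    .write_to_wal(next_seq, header, Some(batch.payload()))
    .await?;

// Replay entries starting from `start_seq`.
let mut stream = wal.read_from_wal(start_seq).await?;
while let Some(entry) = stream.next().await {
    let (_seq_num, _header, maybe_payload) = entry?;
    if let Some(payload) = maybe_payload {
        // Each mutation carries an `OpType` and the `RecordBatch` decoded from the entry.
        assert!(!payload.mutations.is_empty());
    }
}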

View File

@@ -15,420 +15,162 @@
pub mod codec;
mod compat;
use std::any::Any;
use std::collections::{BTreeSet, HashMap};
use std::slice;
use std::time::Duration;
use std::collections::HashMap;
use common_error::prelude::*;
use common_time::timestamp_millis::BucketAligned;
use common_time::RangeMillis;
use datatypes::arrow::error::ArrowError;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::{ScalarVector, Value};
use common_recordbatch::RecordBatch;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use prost::{DecodeError, EncodeError};
use datatypes::vectors::VectorRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{consts, PutOperation, WriteRequest};
use store_api::storage::{OpType, WriteRequest};
use crate::proto;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Duplicate column {} in same request", name))]
DuplicateColumn { name: String, backtrace: Backtrace },
#[snafu(display("Missing column {} in request", name))]
MissingColumn { name: String, backtrace: Backtrace },
#[snafu(display(
"Type of column {} does not match type in schema, expect {:?}, given {:?}",
name,
expect,
given
))]
TypeMismatch {
name: String,
expect: ConcreteDataType,
given: ConcreteDataType,
backtrace: Backtrace,
},
#[snafu(display("Column {} is not null but input has null", name))]
HasNull { name: String, backtrace: Backtrace },
#[snafu(display("Unknown column {}", name))]
UnknownColumn { name: String, backtrace: Backtrace },
#[snafu(display(
"Length of column {} not equals to other columns, expect {}, given {}",
name,
expect,
given
))]
LenNotEquals {
name: String,
expect: usize,
given: usize,
backtrace: Backtrace,
},
#[snafu(display(
"Request is too large, max is {}, current is {}",
MAX_BATCH_SIZE,
num_rows
))]
RequestTooLarge {
num_rows: usize,
backtrace: Backtrace,
},
#[snafu(display("Cannot align timestamp: {}", ts))]
TimestampOverflow { ts: i64 },
#[snafu(display("Failed to encode, source: {}", source))]
EncodeArrow {
backtrace: Backtrace,
source: ArrowError,
},
#[snafu(display("Failed to decode, source: {}", source))]
DecodeArrow {
backtrace: Backtrace,
source: ArrowError,
},
#[snafu(display("Failed to encode into protobuf, source: {}", source))]
EncodeProtobuf {
backtrace: Backtrace,
source: EncodeError,
},
#[snafu(display("Failed to decode from protobuf, source: {}", source))]
DecodeProtobuf {
backtrace: Backtrace,
source: DecodeError,
},
#[snafu(display("Failed to parse schema, source: {}", source))]
ParseSchema {
backtrace: Backtrace,
source: datatypes::error::Error,
},
#[snafu(display("Failed to decode, corrupted data {}", message))]
DataCorrupted {
message: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode vector, source {}", source))]
DecodeVector {
backtrace: Backtrace,
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert into protobuf struct, source {}", source))]
ToProtobuf {
source: proto::write_batch::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert from protobuf struct, source {}", source))]
FromProtobuf {
source: proto::write_batch::Error,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to create default value for column {}, source: {}",
name,
source
))]
CreateDefault {
name: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
use crate::error::{
BatchMissingColumnSnafu, CreateDefaultSnafu, CreateRecordBatchSnafu, Error, HasNullSnafu,
LenNotEqualsSnafu, RequestTooLargeSnafu, Result, TypeMismatchSnafu, UnknownColumnSnafu,
};
/// Max number of updates of a write batch.
const MAX_BATCH_SIZE: usize = 1_000_000;
pub(crate) const MAX_BATCH_SIZE: usize = 1_000_000;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
StatusCode::InvalidArguments
/// Data of [WriteBatch].
///
/// We serialize this struct to the WAL instead of the whole `WriteBatch` to avoid
/// storing unnecessary information.
#[derive(Debug, PartialEq)]
pub struct Payload {
/// Schema of the payload.
///
/// This schema doesn't contain internal columns.
pub schema: SchemaRef,
pub mutations: Vec<Mutation>,
}
impl Payload {
/// Creates a new payload with given `schema`.
fn new(schema: SchemaRef) -> Payload {
Payload {
schema,
mutations: Vec::new(),
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
/// Returns true if there is no mutation in the payload.
#[inline]
pub fn is_empty(&self) -> bool {
self.mutations.is_empty()
}
}
fn as_any(&self) -> &dyn Any {
self
}
/// A write operation to the region.
#[derive(Debug, PartialEq)]
pub struct Mutation {
/// Type of the mutation.
pub op_type: OpType,
/// Data of the mutation.
pub record_batch: RecordBatch,
}
/// Implementation of [WriteRequest].
#[derive(Debug)]
pub struct WriteBatch {
schema: SchemaRef,
mutations: Vec<Mutation>,
num_rows: usize,
payload: Payload,
/// Number of rows this batch needs to mutate (put, delete, etc.).
///
/// We use it to check whether this batch is too large.
num_rows_to_mutate: usize,
}
impl WriteRequest for WriteBatch {
type Error = Error;
type PutOp = PutData;
fn put(&mut self, mut data: PutData) -> Result<()> {
fn put(&mut self, data: HashMap<String, VectorRef>) -> Result<()> {
let data = NameToVector::new(data)?;
if data.is_empty() {
return Ok(());
}
self.preprocess_put_data(&mut data)?;
let record_batch = self.process_put_data(data)?;
self.add_num_rows(data.num_rows())?;
self.mutations.push(Mutation::Put(data));
self.add_num_rows_to_mutate(record_batch.num_rows())?;
self.payload.mutations.push(Mutation {
op_type: OpType::Put,
record_batch,
});
Ok(())
}
/// Aligns timestamps in write batch specified by schema to durations.
///
/// A negative timestamp means "before Unix epoch".
/// Valid timestamp range is `[i64::MIN + duration, i64::MAX-(i64::MAX%duration))`.
fn time_ranges(&self, duration: Duration) -> Result<Vec<RangeMillis>> {
let ts_col_name = match self.schema.timestamp_column() {
None => {
// write batch does not have a timestamp column
return Ok(Vec::new());
}
Some(ts_col) => &ts_col.name,
};
let durations_millis = duration.as_millis() as i64;
let mut aligned_timestamps: BTreeSet<i64> = BTreeSet::new();
for m in &self.mutations {
match m {
Mutation::Put(put_data) => {
let column = put_data
.column_by_name(ts_col_name)
.unwrap_or_else(|| panic!("Cannot find column by name: {ts_col_name}"));
if column.is_const() {
let ts = match column.get(0) {
Value::Timestamp(ts) => ts,
_ => unreachable!(),
};
let aligned = align_timestamp(ts.value(), durations_millis)
.context(TimestampOverflowSnafu { ts: ts.value() })?;
aligned_timestamps.insert(aligned);
} else {
match column.data_type() {
ConcreteDataType::Timestamp(_) => {
let ts_vector = column
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
for ts in ts_vector.iter_data().flatten() {
let aligned = align_timestamp(ts.into(), durations_millis)
.context(TimestampOverflowSnafu { ts: i64::from(ts) })?;
aligned_timestamps.insert(aligned);
}
}
ConcreteDataType::Int64(_) => {
let ts_vector =
column.as_any().downcast_ref::<Int64Vector>().unwrap();
for ts in ts_vector.iter_data().flatten() {
let aligned = align_timestamp(ts, durations_millis)
.context(TimestampOverflowSnafu { ts })?;
aligned_timestamps.insert(aligned);
}
}
_ => unreachable!(),
}
}
}
}
}
let ranges = aligned_timestamps
.iter()
.map(|t| RangeMillis::new(*t, *t + durations_millis).unwrap())
.collect::<Vec<_>>();
Ok(ranges)
}
fn put_op(&self) -> Self::PutOp {
PutData::new()
}
fn put_op_with_columns(num_columns: usize) -> Self::PutOp {
PutData::with_num_columns(num_columns)
}
}
/// Aligns timestamp to nearest time interval.
/// Negative ts means a timestamp before Unix epoch.
/// If arithmetic overflows, this function returns None.
/// So timestamp within `[i64::MIN, i64::MIN + duration)` or
/// `[i64::MAX-(i64::MAX%duration), i64::MAX]` is not a valid input.
fn align_timestamp(ts: i64, duration: i64) -> Option<i64> {
let aligned = ts.align_by_bucket(duration)?.as_i64();
// Also ensure end timestamp won't overflow.
aligned.checked_add(duration)?;
Some(aligned)
}
// WriteBatch pub methods.
impl WriteBatch {
pub fn new(schema: SchemaRef) -> Self {
Self {
schema,
mutations: Vec::new(),
num_rows: 0,
payload: Payload::new(schema),
num_rows_to_mutate: 0,
}
}
#[inline]
pub fn schema(&self) -> &SchemaRef {
&self.schema
&self.payload.schema
}
pub fn iter(&self) -> slice::Iter<'_, Mutation> {
self.mutations.iter()
}
pub fn is_empty(&self) -> bool {
self.mutations.is_empty()
#[inline]
pub fn payload(&self) -> &Payload {
&self.payload
}
}
/// Enum to wrap different operations.
pub enum Mutation {
Put(PutData),
}
impl WriteBatch {
/// Validates `data` and converts it into a [RecordBatch].
///
/// It fills missing columns with the schema's default values.
fn process_put_data(&self, data: NameToVector) -> Result<RecordBatch> {
let num_rows = data.num_rows();
let mut columns = Vec::with_capacity(self.schema().num_columns());
#[derive(Default, Debug)]
pub struct PutData {
columns: HashMap<String, VectorRef>,
}
impl PutData {
pub(crate) fn new() -> PutData {
PutData::default()
}
pub(crate) fn with_num_columns(num_columns: usize) -> PutData {
PutData {
columns: HashMap::with_capacity(num_columns),
}
}
fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> {
ensure!(
!self.columns.contains_key(name),
DuplicateColumnSnafu { name }
);
if let Some(col) = self.columns.values().next() {
ensure!(
col.len() == vector.len(),
LenNotEqualsSnafu {
name,
expect: col.len(),
given: vector.len(),
for column_schema in self.schema().column_schemas() {
match data.0.get(&column_schema.name) {
Some(col) => {
validate_column(column_schema, col)?;
columns.push(col.clone());
}
None => {
// If the column is not provided, fill it with its default value.
let col = new_column_with_default_value(column_schema, num_rows)?;
columns.push(col);
}
}
}
// Check that all columns in data also exist in the schema, which means we
// are not inserting unknown columns.
for name in data.0.keys() {
ensure!(
self.schema().contains_column(name),
UnknownColumnSnafu { name }
);
}
self.columns.insert(name.to_string(), vector);
RecordBatch::new(self.schema().clone(), columns).context(CreateRecordBatchSnafu)
}
fn add_num_rows_to_mutate(&mut self, len: usize) -> Result<()> {
let num_rows = self.num_rows_to_mutate + len;
ensure!(
num_rows <= MAX_BATCH_SIZE,
RequestTooLargeSnafu { num_rows }
);
self.num_rows_to_mutate = num_rows;
Ok(())
}
/// Add columns by its default value.
fn add_default_by_name(&mut self, column_schema: &ColumnSchema) -> Result<()> {
let num_rows = self.num_rows();
// If column is not provided, fills it by default value.
let vector = column_schema
.create_default_vector(num_rows)
.context(CreateDefaultSnafu {
name: &column_schema.name,
})?
.context(MissingColumnSnafu {
name: &column_schema.name,
})?;
validate_column(column_schema, &vector)?;
self.add_column_by_name(&column_schema.name, vector)
}
}
impl PutOperation for PutData {
type Error = Error;
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<()> {
self.add_column_by_name(name, vector)
}
fn add_version_column(&mut self, vector: VectorRef) -> Result<()> {
self.add_column_by_name(consts::VERSION_COLUMN_NAME, vector)
}
fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<()> {
self.add_column_by_name(name, vector)
}
}
// PutData pub methods.
impl PutData {
pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> {
self.columns.get(name)
}
/// Returns number of columns in data.
pub fn num_columns(&self) -> usize {
self.columns.len()
}
/// Returns number of rows in data.
pub fn num_rows(&self) -> usize {
self.columns
.values()
.next()
.map(|col| col.len())
.unwrap_or(0)
}
/// Returns true if no rows in data.
///
/// `PutData` with empty column will also be considered as empty.
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
/// Returns slice of [PutData] in range `[start, end)`.
///
/// # Panics
/// Panics if `start > end`.
pub fn slice(&self, start: usize, end: usize) -> PutData {
assert!(start <= end);
let columns = self
.columns
.iter()
.map(|(k, v)| (k.clone(), v.slice(start, end - start)))
.collect();
PutData { columns }
}
/// Returns the length of the first vector in `data`.
fn first_vector_len(data: &HashMap<String, VectorRef>) -> usize {
data.values().next().map(|col| col.len()).unwrap_or(0)
}
/// Checks whether `col` matches given `column_schema`.
fn validate_column(column_schema: &ColumnSchema, col: &VectorRef) -> Result<()> {
if !col.data_type().is_null() {
// This allows us to use NullVector for columns that only have null values.
@@ -454,55 +196,63 @@ fn validate_column(column_schema: &ColumnSchema, col: &VectorRef) -> Result<()>
Ok(())
}
impl WriteBatch {
/// Validate [PutData] and fill missing columns by default value.
fn preprocess_put_data(&self, data: &mut PutData) -> Result<()> {
for column_schema in self.schema.column_schemas() {
match data.column_by_name(&column_schema.name) {
Some(col) => {
validate_column(column_schema, col)?;
}
None => {
// If column is not provided, fills it by default value.
data.add_default_by_name(column_schema)?;
}
}
}
/// Creates a new column and fills it with the default value.
///
/// `num_rows` MUST be greater than 0. This function will also validate the schema.
pub(crate) fn new_column_with_default_value(
column_schema: &ColumnSchema,
num_rows: usize,
) -> Result<VectorRef> {
// If the column is not provided, fill it with its default value.
let vector = column_schema
.create_default_vector(num_rows)
.context(CreateDefaultSnafu {
name: &column_schema.name,
})?
.context(BatchMissingColumnSnafu {
column: &column_schema.name,
})?;
// Check all columns in data also exists in schema.
for name in data.columns.keys() {
validate_column(column_schema, &vector)?;
Ok(vector)
}
/// Vectors in [NameToVector] have the same length.
///
/// MUST construct it via [`NameToVector::new()`] to ensure the vector lengths are validated.
struct NameToVector(HashMap<String, VectorRef>);
impl NameToVector {
fn new(data: HashMap<String, VectorRef>) -> Result<NameToVector> {
let num_rows = first_vector_len(&data);
for (name, vector) in &data {
ensure!(
self.schema.column_schema_by_name(name).is_some(),
UnknownColumnSnafu { name }
num_rows == vector.len(),
LenNotEqualsSnafu {
name,
expect: num_rows,
given: vector.len(),
}
);
}
Ok(())
Ok(NameToVector(data))
}
fn add_num_rows(&mut self, len: usize) -> Result<()> {
let num_rows = self.num_rows + len;
ensure!(
num_rows <= MAX_BATCH_SIZE,
RequestTooLargeSnafu { num_rows }
);
self.num_rows = num_rows;
Ok(())
fn num_rows(&self) -> usize {
first_vector_len(&self.0)
}
}
impl<'a> IntoIterator for &'a WriteBatch {
type Item = &'a Mutation;
type IntoIter = slice::Iter<'a, Mutation>;
fn into_iter(self) -> slice::Iter<'a, Mutation> {
self.iter()
fn is_empty(&self) -> bool {
self.num_rows() == 0
}
}
#[cfg(test)]
pub(crate) fn new_test_batch() -> WriteBatch {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::consts;
use crate::test_util::write_batch_util;
@@ -522,251 +272,165 @@ mod tests {
use std::iter;
use std::sync::Arc;
use common_error::prelude::*;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{
BooleanVector, ConstantVector, Int32Vector, Int64Vector, TimestampMillisecondVector,
UInt64Vector,
BooleanVector, Int32Vector, Int64Vector, TimestampMillisecondVector, UInt64Vector,
};
use store_api::storage::consts;
use super::*;
use crate::test_util::write_batch_util;
#[test]
fn test_put_data_basic() {
let mut put_data = PutData::new();
assert!(put_data.is_empty());
fn test_name_to_vector_basic() {
let columns = NameToVector::new(HashMap::new()).unwrap();
assert!(columns.is_empty());
let vector1 = Arc::new(Int32Vector::from_slice(&[1, 2, 3, 4, 5]));
let vector2 = Arc::new(UInt64Vector::from_slice(&[0, 2, 4, 6, 8]));
let vector1 = Arc::new(Int32Vector::from_slice(&[1, 2, 3, 4, 5])) as VectorRef;
let vector2 = Arc::new(UInt64Vector::from_slice(&[0, 2, 4, 6, 8])) as VectorRef;
put_data.add_key_column("k1", vector1.clone()).unwrap();
put_data.add_version_column(vector2).unwrap();
put_data.add_value_column("v1", vector1).unwrap();
let mut put_data = HashMap::with_capacity(3);
put_data.insert("k1".to_string(), vector1.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), vector2);
put_data.insert("v1".to_string(), vector1);
assert_eq!(5, put_data.num_rows());
assert!(!put_data.is_empty());
assert!(put_data.column_by_name("no such column").is_none());
assert!(put_data.column_by_name("k1").is_some());
assert!(put_data.column_by_name("v1").is_some());
assert!(put_data
.column_by_name(consts::VERSION_COLUMN_NAME)
.is_some());
let columns = NameToVector::new(put_data).unwrap();
assert_eq!(5, columns.num_rows());
assert!(!columns.is_empty());
}
#[test]
fn test_put_data_empty_vector() {
let mut put_data = PutData::with_num_columns(1);
assert!(put_data.is_empty());
fn test_name_to_vector_empty_vector() {
let vector1 = Arc::new(Int32Vector::from_slice(&[])) as VectorRef;
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), vector1);
let vector1 = Arc::new(Int32Vector::from_slice(&[]));
put_data.add_key_column("k1", vector1).unwrap();
assert_eq!(0, put_data.num_rows());
assert!(put_data.is_empty());
let columns = NameToVector::new(put_data).unwrap();
assert_eq!(0, columns.num_rows());
assert!(columns.is_empty());
}
#[test]
fn test_write_batch_put() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut put_data = HashMap::with_capacity(4);
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv);
let mut batch = new_test_batch();
assert!(batch.is_empty());
assert!(batch.payload().is_empty());
batch.put(put_data).unwrap();
assert!(!batch.is_empty());
assert!(!batch.payload().is_empty());
let mut iter = batch.iter();
let Mutation::Put(put_data) = iter.next().unwrap();
assert_eq!(3, put_data.num_rows());
let mutation = &batch.payload().mutations[0];
assert_eq!(3, mutation.record_batch.num_rows());
}
fn check_err(err: Error, msg: &str) {
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert!(err.backtrace_opt().is_some());
assert!(err.to_string().contains(msg));
assert!(
err.to_string().contains(msg),
"<{err}> does not contain {msg}",
);
}
#[test]
fn test_write_batch_too_large() {
let boolv = Arc::new(BooleanVector::from_iterator(
iter::repeat(true).take(MAX_BATCH_SIZE + 1),
));
)) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", boolv).unwrap();
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), boolv);
let mut batch =
write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)], None);
let err = batch.put(put_data).err().unwrap();
let err = batch.put(put_data).unwrap_err();
check_err(err, "Request is too large");
}
#[test]
fn test_put_data_duplicate() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
let err = put_data.add_key_column("k1", intv).err().unwrap();
check_err(err, "Duplicate column k1");
}
#[test]
fn test_put_data_different_len() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![true, false]));
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0])) as VectorRef;
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv).unwrap();
let err = put_data.add_value_column("v1", boolv).err().unwrap();
check_err(err, "Length of column v1 not equals");
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv.clone());
put_data.insert("ts".to_string(), tsv);
let mut batch = new_test_batch();
let err = batch.put(put_data).unwrap_err();
check_err(err, "not equals to other columns");
}
#[test]
fn test_put_type_mismatch() {
let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));
let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let tsv = Arc::new(Int64Vector::from_slice(&[0, 0, 0])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv);
let mut batch = new_test_batch();
let err = batch.put(put_data).err().unwrap();
let err = batch.put(put_data).unwrap_err();
check_err(err, "Type of column k1 does not match");
}
#[test]
fn test_put_type_has_null() {
let intv = Arc::new(UInt64Vector::from(vec![Some(1), None, Some(3)]));
let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
let intv = Arc::new(UInt64Vector::from(vec![Some(1), None, Some(3)])) as VectorRef;
let tsv = Arc::new(Int64Vector::from_slice(&[0, 0, 0])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv);
put_data.insert("ts".to_string(), tsv);
let mut batch = new_test_batch();
let err = batch.put(put_data).err().unwrap();
let err = batch.put(put_data).unwrap_err();
check_err(err, "Column k1 is not null");
}
#[test]
fn test_put_missing_column() {
let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));
let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let tsv = Arc::new(Int64Vector::from_slice(&[0, 0, 0])) as VectorRef;
let mut put_data = HashMap::new();
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv);
let mut put_data = PutData::new();
put_data.add_key_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut batch = new_test_batch();
let err = batch.put(put_data).err().unwrap();
let err = batch.put(put_data).unwrap_err();
check_err(err, "Missing column k1");
}
#[test]
fn test_put_unknown_column() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));
let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef;
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv.clone()).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
put_data.add_value_column("v2", boolv).unwrap();
let mut batch = new_test_batch();
let err = batch.put(put_data).err().unwrap();
check_err(err, "Unknown column v2");
}
#[test]
fn test_align_timestamp() {
let duration_millis = 20;
let ts = [-21, -20, -19, -1, 0, 5, 15, 19, 20, 21];
let res = ts.map(|t| align_timestamp(t, duration_millis));
assert_eq!(res, [-40, -20, -20, -20, 0, 0, 0, 0, 20, 20].map(Some));
}
#[test]
fn test_align_timestamp_overflow() {
assert_eq!(Some(i64::MIN), align_timestamp(i64::MIN, 1));
assert_eq!(Some(-9223372036854775808), align_timestamp(i64::MIN, 2));
assert_eq!(
Some(((i64::MIN + 20) / 20 - 1) * 20),
align_timestamp(i64::MIN + 20, 20)
);
assert_eq!(None, align_timestamp(i64::MAX - (i64::MAX % 23), 23));
assert_eq!(
Some(9223372036854775780),
align_timestamp(i64::MAX / 20 * 20 - 1, 20)
);
}
#[test]
fn test_write_batch_time_range() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![
-21, -20, -1, 0, 1, 20,
]));
let boolv = Arc::new(BooleanVector::from(vec![
true, false, true, false, false, false,
]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv.clone());
put_data.insert("ts".to_string(), tsv);
put_data.insert("v2".to_string(), boolv);
let mut batch = new_test_batch();
batch.put(put_data).unwrap();
let duration_millis = 20i64;
let ranges = batch
.time_ranges(Duration::from_millis(duration_millis as u64))
.unwrap();
assert_eq!(
[-40, -20, 0, 20].map(|v| RangeMillis::new(v, v + duration_millis).unwrap()),
ranges.as_slice()
)
}
#[test]
fn test_write_batch_time_range_const_vector() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(ConstantVector::new(
Arc::new(TimestampMillisecondVector::from_vec(vec![20])),
6,
));
let boolv = Arc::new(BooleanVector::from(vec![
true, false, true, false, false, false,
]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut batch = new_test_batch();
batch.put(put_data).unwrap();
let duration_millis = 20i64;
let ranges = batch
.time_ranges(Duration::from_millis(duration_millis as u64))
.unwrap();
assert_eq!(
[20].map(|v| RangeMillis::new(v, v + duration_millis).unwrap()),
ranges.as_slice()
)
let err = batch.put(put_data).unwrap_err();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
}
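
To make the reworked `WriteRequest::put` contract concrete, here is a sketch that mirrors `test_write_batch_put` above: `put` now takes a plain `HashMap<String, VectorRef>` instead of a `PutData`. Illustrative only; `new_test_batch` is the test helper from this file (real callers build the batch from the region schema) and errors are propagated with `?`.

use std::collections::HashMap;
use std::sync::Arc;

use datatypes::vectors::{BooleanVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
use store_api::storage::{consts, WriteRequest};

let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_slice(&[0, 0, 0])) as VectorRef;

let mut data = HashMap::new();
data.insert("k1".to_string(), intv.clone());
data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
data.insert("v1".to_string(), boolv);
data.insert("ts".to_string(), tsv);

// `put` validates types and lengths against the schema, fills missing columns with
// their default values, and appends a `Mutation { op_type: OpType::Put, record_batch }`.
let mut batch = new_test_batch();
batch.put(data)?;
assert_eq!(1, batch.payload().mutations.len());
assert_eq!(3, batch.payload().mutations[0].record_batch.num_rows());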

View File

@@ -15,298 +15,150 @@
use std::io::Cursor;
use std::sync::Arc;
use common_recordbatch::RecordBatch;
use datatypes::arrow::ipc::reader::StreamReader;
use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;
use datatypes::schema::Schema;
use snafu::{ensure, ResultExt};
use store_api::storage::OpType;
use crate::codec::{Decoder, Encoder};
use crate::proto::wal::MutationType;
use crate::proto::write_batch::{self, gen_columns, gen_put_data_vector};
use crate::write_batch::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeProtobufSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
EncodeProtobufSnafu, Error as WriteBatchError, FromProtobufSnafu, MissingColumnSnafu, Mutation,
ParseSchemaSnafu, PutData, Result, ToProtobufSnafu, WriteBatch,
use crate::error::{
BatchCorruptedSnafu, CreateRecordBatchSnafu, DecodeArrowSnafu, EncodeArrowSnafu, Error,
ParseSchemaSnafu, Result,
};
use crate::proto::wal::MutationType;
use crate::write_batch::{Mutation, Payload};
// TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost,
// CPU consumption, etc
#[derive(Default)]
pub struct WriteBatchArrowEncoder {}
pub struct PayloadEncoder {}
impl WriteBatchArrowEncoder {
impl PayloadEncoder {
pub fn new() -> Self {
Self::default()
}
}
impl Encoder for WriteBatchArrowEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
impl Encoder for PayloadEncoder {
type Item = Payload;
type Error = Error;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let item_schema = item.schema();
let arrow_schema = item_schema.arrow_schema();
fn encode(&self, item: &Payload, dst: &mut Vec<u8>) -> Result<()> {
let arrow_schema = item.schema.arrow_schema();
let opts = IpcWriteOptions::default();
let mut writer = StreamWriter::try_new_with_options(dst, arrow_schema, opts)
.context(EncodeArrowSnafu)?;
for mutation in item.iter() {
let rb = match mutation {
Mutation::Put(put) => {
let arrays = item_schema
.column_schemas()
.iter()
.map(|column_schema| {
let vector = put.column_by_name(&column_schema.name).context(
MissingColumnSnafu {
name: &column_schema.name,
},
)?;
Ok(vector.to_arrow_array())
})
.collect::<Result<Vec<_>>>()?;
RecordBatch::try_new(arrow_schema.clone(), arrays).context(EncodeArrowSnafu)?
}
};
writer.write(&rb).context(EncodeArrowSnafu)?;
for mutation in &item.mutations {
let record_batch = mutation.record_batch.df_record_batch();
writer.write(record_batch).context(EncodeArrowSnafu)?;
}
writer.finish().context(EncodeArrowSnafu)?;
Ok(())
}
}
pub struct WriteBatchArrowDecoder {
mutation_types: Vec<i32>,
pub struct PayloadDecoder<'a> {
mutation_types: &'a [i32],
}
impl WriteBatchArrowDecoder {
pub fn new(mutation_types: Vec<i32>) -> Self {
impl<'a> PayloadDecoder<'a> {
pub fn new(mutation_types: &'a [i32]) -> Self {
Self { mutation_types }
}
}
impl Decoder for WriteBatchArrowDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
impl<'a> Decoder for PayloadDecoder<'a> {
type Item = Payload;
type Error = Error;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
fn decode(&self, src: &[u8]) -> Result<Payload> {
let reader = Cursor::new(src);
let mut reader = StreamReader::try_new(reader, None).context(DecodeArrowSnafu)?;
let arrow_schema = reader.schema();
let mut chunks = Vec::with_capacity(self.mutation_types.len());
for maybe_record_batch in reader.by_ref() {
let record_batch = maybe_record_batch.context(DecodeArrowSnafu)?;
chunks.push(record_batch);
// We could let the decoder take a schema as input, so we wouldn't
// need to rebuild the schema here.
let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut mutations = Vec::with_capacity(self.mutation_types.len());
for (record_batch, mutation_type) in reader.by_ref().zip(self.mutation_types) {
let record_batch = record_batch.context(DecodeArrowSnafu)?;
let record_batch = RecordBatch::try_from_df_record_batch(schema.clone(), record_batch)
.context(CreateRecordBatchSnafu)?;
let op_type = match MutationType::from_i32(*mutation_type) {
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
Some(MutationType::Put) => OpType::Put,
None => {
return BatchCorruptedSnafu {
message: format!("Unexpceted mutation type: {mutation_type}"),
}
.fail()
}
};
mutations.push(Mutation {
op_type,
record_batch,
});
}
// Check that the IPC stream was fully consumed (the reader reached the end marker).
ensure!(
reader.is_finished(),
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
BatchCorruptedSnafu {
message: "The num of data chunks is different than expected."
}
);
ensure!(
chunks.len() == self.mutation_types.len(),
DataCorruptedSnafu {
mutations.len() == self.mutation_types.len(),
BatchCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
chunks.len()
mutations.len()
)
}
);
let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut write_batch = WriteBatch::new(schema.clone());
for (mutation_type, record_batch) in self.mutation_types.iter().zip(chunks.into_iter()) {
match MutationType::from_i32(*mutation_type) {
Some(MutationType::Put) => {
let mut put_data = PutData::with_num_columns(schema.num_columns());
for (column_schema, array) in schema
.column_schemas()
.iter()
.zip(record_batch.columns().iter())
{
let vector = Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
put_data.add_column_by_name(&column_schema.name, vector)?;
}
write_batch.put(put_data)?;
}
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
_ => {
return DataCorruptedSnafu {
message: format!("Unexpected mutation type: {mutation_type}"),
}
.fail()
}
}
}
Ok(write_batch)
}
}
pub struct WriteBatchProtobufEncoder {}
impl Encoder for WriteBatchProtobufEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().into();
let mutations = item
.iter()
.map(|mtn| match mtn {
Mutation::Put(put_data) => item
.schema()
.column_schemas()
.iter()
.map(|cs| {
let vector = put_data
.column_by_name(&cs.name)
.context(MissingColumnSnafu { name: &cs.name })?;
gen_columns(vector).context(ToProtobufSnafu)
})
.collect::<Result<Vec<_>>>(),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
columns,
})),
})
.collect();
let write_batch = write_batch::WriteBatch {
schema: Some(schema),
mutations,
};
write_batch.encode(dst).context(EncodeProtobufSnafu)
}
}
pub struct WriteBatchProtobufDecoder {
mutation_types: Vec<i32>,
}
impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
impl Decoder for WriteBatchProtobufDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;
let schema = write_batch.schema.context(DataCorruptedSnafu {
message: "schema required",
})?;
let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
ensure!(
write_batch.mutations.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
write_batch.mutations.len()
)
}
);
let mutations = write_batch
.mutations
.into_iter()
.map(|mtn| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let mut put_data = PutData::with_num_columns(put.columns.len());
let res = schema
.column_schemas()
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column)
.map(|vector| (name, vector))
.context(FromProtobufSnafu)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();
res.map(|_| Mutation::Put(put_data))
}
Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
_ => DataCorruptedSnafu {
message: "invalid mutation type",
}
.fail(),
})
.collect::<Result<Vec<_>>>()?;
let mut write_batch = WriteBatch::new(schema);
mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;
Ok(write_batch)
Ok(Payload { schema, mutations })
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::vectors::{BooleanVector, TimestampMillisecondVector, UInt64Vector};
use store_api::storage::PutOperation;
use datatypes::vectors::{BooleanVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
use store_api::storage::{consts, WriteRequest};
use super::*;
use crate::write_batch::WriteBatch;
use crate::{proto, write_batch};
fn gen_new_batch_and_types() -> (WriteBatch, Vec<i32>) {
let mut batch = write_batch::new_test_batch();
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![i, i, i]));
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
let boolv =
Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![i, i, i])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv);
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
let types = proto::wal::gen_mutation_types(batch.payload());
(batch, types)
}
@@ -315,32 +167,15 @@ mod tests {
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = WriteBatchArrowEncoder::new();
let encoder = PayloadEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
let result = encoder.encode(batch.payload(), &mut dst);
assert!(result.is_ok());
let decoder = WriteBatchArrowDecoder::new(mutation_types);
let decoder = PayloadDecoder::new(&mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
let payload = result?;
assert_eq!(*batch.payload(), payload);
Ok(())
}
@@ -348,18 +183,18 @@ mod tests {
fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec<i32>) {
let mut batch = write_batch::new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0])) as VectorRef;
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut put_data = HashMap::with_capacity(3);
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("ts".to_string(), tsv);
batch.put(put_data).unwrap();
}
let types = proto::wal::gen_mutation_types(&batch);
let types = proto::wal::gen_mutation_types(batch.payload());
(batch, types)
}
@@ -368,31 +203,15 @@ mod tests {
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = WriteBatchArrowEncoder::new();
let encoder = PayloadEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
let result = encoder.encode(batch.payload(), &mut dst);
assert!(result.is_ok());
let decoder = WriteBatchArrowDecoder::new(mutation_types);
let decoder = PayloadDecoder::new(&mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = WriteBatchProtobufEncoder {};
let mut dst = vec![];
encoder.encode(&batch, &mut dst).unwrap();
let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
let payload = result?;
assert_eq!(*batch.payload(), payload);
Ok(())
}
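
For orientation, the Arrow IPC round trip exercised by `test_codec_arrow` above reduces to the sketch below. Illustrative only: `batch` is a populated `WriteBatch` as produced by `gen_new_batch_and_types`, and errors are propagated with `?`.

use crate::codec::{Decoder, Encoder};
use crate::proto;

// Encode the payload (schema + mutations) as a single Arrow IPC stream.
let encoder = PayloadEncoder::new();
let mut buf = vec![];
encoder.encode(batch.payload(), &mut buf)?;

// The mutation types travel in the WAL header rather than in the IPC stream, so the
// decoder needs them to reattach an `OpType` to each decoded record batch.
let mutation_types = proto::wal::gen_mutation_types(batch.payload());
let decoder = PayloadDecoder::new(&mutation_types);
let decoded = decoder.decode(&buf)?;
assert_eq!(*batch.payload(), decoded);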

View File

@@ -12,20 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_recordbatch::RecordBatch;
use datatypes::schema::{ColumnSchema, SchemaRef};
use snafu::{ensure, ResultExt};
use crate::error::{self, Result};
use crate::schema::compat::CompatWrite;
use crate::write_batch::{Mutation, PutData, WriteBatch};
use crate::write_batch::{self, Mutation, WriteBatch};
impl CompatWrite for WriteBatch {
fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()> {
let (data_version, schema_version) = (dest_schema.version(), self.schema.version());
let data_version = dest_schema.version();
let schema_version = self.schema().version();
// Fast path, nothing to do if schema version of the write batch is equal to version
// of destination.
if data_version == schema_version {
debug_assert_eq!(dest_schema.column_schemas(), self.schema.column_schemas());
debug_assert_eq!(dest_schema.column_schemas(), self.schema().column_schemas());
return Ok(());
}
@@ -39,7 +41,7 @@ impl CompatWrite for WriteBatch {
);
// For columns not in the schema, return an error instead of discarding the column silently.
let column_not_in = column_not_in_schema(dest_schema, self.schema.column_schemas());
let column_not_in = column_not_in_schema(dest_schema, self.schema().column_schemas());
ensure!(
column_not_in.is_none(),
error::NotInSchemaToCompatSnafu {
@@ -48,37 +50,39 @@ impl CompatWrite for WriteBatch {
}
);
for m in &mut self.mutations {
match m {
Mutation::Put(put_data) => {
put_data.compat_write(dest_schema)?;
}
}
for mutation in &mut self.payload.mutations {
mutation.compat_write(dest_schema)?;
}
// Change schema to `dest_schema`.
self.schema = dest_schema.clone();
self.payload.schema = dest_schema.clone();
Ok(())
}
}
impl CompatWrite for PutData {
impl CompatWrite for Mutation {
fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()> {
if self.is_empty() {
if self.record_batch.num_rows() == 0 {
return Ok(());
}
let num_rows = self.record_batch.num_rows();
let mut columns = Vec::with_capacity(dest_schema.num_columns());
for column_schema in dest_schema.column_schemas() {
if self.column_by_name(&column_schema.name).is_none() {
if let Some(vector) = self.record_batch.column_by_name(&column_schema.name) {
columns.push(vector.clone());
} else {
// We need to fill the column with null or its default value.
self.add_default_by_name(column_schema)
.context(error::AddDefaultSnafu {
column: &column_schema.name,
})?;
let vector = write_batch::new_column_with_default_value(column_schema, num_rows)?;
columns.push(vector);
}
}
// Using dest schema to build RecordBatch.
self.record_batch = RecordBatch::new(dest_schema.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(())
}
}
@@ -95,12 +99,13 @@ fn column_not_in_schema(schema: &SchemaRef, column_schemas: &[ColumnSchema]) ->
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, SchemaBuilder};
use datatypes::vectors::{Int32Vector, TimestampMillisecondVector};
use store_api::storage::{PutOperation, WriteRequest};
use datatypes::vectors::{Int32Vector, TimestampMillisecondVector, VectorRef};
use store_api::storage::WriteRequest;
use super::*;
use crate::error::Error;
@@ -135,23 +140,31 @@ mod tests {
Arc::new(schema)
}
fn new_put_data() -> PutData {
let mut put_data = PutData::new();
let k0 = Arc::new(Int32Vector::from_slice(&[1, 2, 3]));
let ts = Arc::new(TimestampMillisecondVector::from_values([11, 12, 13]));
fn new_put_data() -> HashMap<String, VectorRef> {
let mut put_data = HashMap::new();
let k0 = Arc::new(Int32Vector::from_slice(&[1, 2, 3])) as VectorRef;
let ts = Arc::new(TimestampMillisecondVector::from_values([11, 12, 13])) as VectorRef;
put_data.add_key_column("k0", k0).unwrap();
put_data.add_key_column("ts", ts).unwrap();
put_data.insert("k0".to_string(), k0);
put_data.insert("ts".to_string(), ts);
put_data
}
#[test]
fn test_put_data_compat_write() {
let mut put_data = new_put_data();
fn test_mutation_compat_write() {
let put_data = new_put_data();
let schema_old = new_test_schema(None);
// Mutation doesn't check schema version, so we don't have to bump the version here.
let schema = new_test_schema(Some(Some(ColumnDefaultConstraint::null_value())));
put_data.compat_write(&schema).unwrap();
let v0 = put_data.column_by_name("v0").unwrap();
// Use WriteBatch to build a payload and its mutation.
let mut batch = WriteBatch::new(schema_old);
batch.put(put_data).unwrap();
let mutation = &mut batch.payload.mutations[0];
mutation.compat_write(&schema).unwrap();
let v0 = mutation.record_batch.column_by_name("v0").unwrap();
assert!(v0.only_null());
}
@@ -170,8 +183,9 @@ mod tests {
);
batch.compat_write(&schema_new).unwrap();
assert_eq!(schema_new, *batch.schema());
let Mutation::Put(put_data) = batch.iter().next().unwrap();
put_data.column_by_name("v0").unwrap();
let mutation = &batch.payload().mutations[0];
mutation.record_batch.column_by_name("v0").unwrap();
}
#[test]

View File

@@ -36,7 +36,7 @@ pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine}
pub use self::metadata::RegionMeta;
pub use self::region::{Region, WriteContext};
pub use self::requests::{
AddColumn, AlterOperation, AlterRequest, GetRequest, PutOperation, ScanRequest, WriteRequest,
AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
};
pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
pub use self::snapshot::{ReadContext, Snapshot};

View File

@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::time::Duration;
use std::collections::{HashMap, HashSet};
use common_error::ext::ErrorExt;
use common_query::logical_plan::Expr;
use common_time::RangeMillis;
use datatypes::vectors::VectorRef;
use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber};
@@ -28,35 +26,11 @@ use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber};
/// the request follows the same schema restriction.
pub trait WriteRequest: Send {
type Error: ErrorExt + Send + Sync;
type PutOp: PutOperation;
/// Add put operation to the request.
fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>;
/// Returns all possible time ranges that contain the timestamp in this batch.
///
/// Each time range is aligned to given `duration`.
fn time_ranges(&self, duration: Duration) -> Result<Vec<RangeMillis>, Self::Error>;
/// Create a new put operation.
fn put_op(&self) -> Self::PutOp;
/// Create a new put operation with capacity reserved for `num_columns`.
fn put_op_with_columns(num_columns: usize) -> Self::PutOp;
}
/// Put multiple rows.
pub trait PutOperation: Send + std::fmt::Debug {
type Error: ErrorExt + Send + Sync;
/// Put data to the key column.
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
/// Put data to the version column.
fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>;
/// Put data to the value column.
fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
/// `data` is the columnar format of the data to put.
fn put(&mut self, data: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
}
#[derive(Default)]

View File

@@ -39,9 +39,6 @@ pub enum InnerError {
backtrace: Backtrace,
},
#[snafu(display("Missing column when insert, column: {}", name))]
MissingColumn { name: String, backtrace: Backtrace },
#[snafu(display("Poll stream failed, source: {}", source))]
PollStream {
source: ArrowError,
@@ -119,9 +116,9 @@ impl ErrorExt for InnerError {
| InnerError::PollStream { .. }
| InnerError::SchemaConversion { .. }
| InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery,
InnerError::MissingColumn { .. }
| InnerError::RemoveColumnInIndex { .. }
| InnerError::BuildColumnDescriptor { .. } => StatusCode::InvalidArguments,
InnerError::RemoveColumnInIndex { .. } | InnerError::BuildColumnDescriptor { .. } => {
StatusCode::InvalidArguments
}
InnerError::TablesRecordBatch { .. } => StatusCode::Unexpected,
InnerError::ColumnExists { .. } => StatusCode::TableColumnExists,
InnerError::SchemaBuild { source, .. } => source.status_code(),
@@ -166,12 +163,16 @@ mod tests {
Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)?
}
fn throw_missing_column_inner() -> std::result::Result<(), InnerError> {
MissingColumnSnafu { name: "test" }.fail()
fn throw_column_exists_inner() -> std::result::Result<(), InnerError> {
ColumnExistsSnafu {
column_name: "col",
table_name: "test",
}
.fail()
}
fn throw_missing_column() -> Result<()> {
Ok(throw_missing_column_inner()?)
Ok(throw_column_exists_inner()?)
}
fn throw_arrow() -> Result<()> {
@@ -186,7 +187,7 @@ mod tests {
let err = throw_missing_column().err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert_eq!(StatusCode::TableColumnExists, err.status_code());
let err = throw_arrow().err().unwrap();
assert!(err.backtrace_opt().is_some());
@@ -195,15 +196,15 @@ mod tests {
#[test]
fn test_into_record_batch_error() {
let err = throw_missing_column_inner().err().unwrap();
let err = throw_column_exists_inner().err().unwrap();
let err: RecordBatchError = err.into();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert_eq!(StatusCode::TableColumnExists, err.status_code());
}
#[test]
fn test_into_df_error() {
let err = throw_missing_column_inner().err().unwrap();
let err = throw_column_exists_inner().err().unwrap();
let err: DataFusionError = err.into();
assert!(matches!(err, DataFusionError::External(_)));
}