refactor: Extract mito codec part into a new crate (#6307)

* chore: add a new crate mito-codec

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: port necessary mods for primary key codec

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use codec utils in mito-codec

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove unused mods

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove Partition::is_partition_column()

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove duplicated test utils

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove unused comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fix is_partition_column check

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-06-13 15:14:29 +08:00
committed by GitHub
parent 505bf25505
commit eaf1e1198f
53 changed files with 587 additions and 211 deletions

25
Cargo.lock generated
View File

@@ -7248,6 +7248,7 @@ dependencies = [
"humantime-serde",
"itertools 0.14.0",
"lazy_static",
"mito-codec",
"mito2",
"mur3",
"object-store",
@@ -7313,6 +7314,29 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "mito-codec"
version = "0.15.0"
dependencies = [
"api",
"bytes",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-recordbatch",
"common-telemetry",
"common-time",
"datafusion-common",
"datafusion-expr",
"datatypes",
"memcomparable",
"paste",
"serde",
"snafu 0.8.5",
"store-api",
]
[[package]]
name = "mito2"
version = "0.15.0"
@@ -7355,6 +7379,7 @@ dependencies = [
"lazy_static",
"log-store",
"memcomparable",
"mito-codec",
"moka",
"object-store",
"parquet",

View File

@@ -49,6 +49,7 @@ members = [
"src/meta-client",
"src/meta-srv",
"src/metric-engine",
"src/mito-codec",
"src/mito2",
"src/object-store",
"src/operator",
@@ -274,6 +275,7 @@ log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
metric-engine = { path = "src/metric-engine" }
mito-codec = { path = "src/mito-codec" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }

View File

@@ -27,6 +27,7 @@ futures-util.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static = "1.4"
mito-codec.workspace = true
mito2.workspace = true
mur3 = "0.1"
object-store.workspace = true

View File

@@ -115,7 +115,7 @@ pub enum Error {
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito2::error::Error,
source: mito_codec::error::Error,
#[snafu(implicit)]
location: Location,
},

View File

@@ -18,7 +18,7 @@ use std::hash::Hash;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use datatypes::value::ValueRef;
use mito2::row_converter::SparsePrimaryKeyCodec;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;

30
src/mito-codec/Cargo.toml Normal file
View File

@@ -0,0 +1,30 @@
[package]
name = "mito-codec"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
api.workspace = true
bytes.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
memcomparable = "0.2"
paste.workspace = true
serde.workspace = true
snafu.workspace = true
store-api.workspace = true
[dev-dependencies]
datafusion-common.workspace = true
datafusion-expr.workspace = true

View File

@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
/// Error definitions for mito encoding.
///
/// Every variant carries the [`Location`] at which it was created; the
/// location is filled in implicitly by snafu.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
/// A row value does not match the data type of the field it is encoded for.
/// Wraps the underlying `datatypes` error.
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch {
// Box the source to reduce the size of the error.
#[snafu(source(from(datatypes::error::Error, Box::new)))]
source: Box<datatypes::error::Error>,
#[snafu(implicit)]
location: Location,
},
/// The `memcomparable` serializer failed while serializing a field.
#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
/// The given data type cannot be serialized or deserialized by the codec.
#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
/// The `memcomparable` deserializer failed while deserializing a field.
#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
/// The requested operation is not supported; `err_msg` describes it.
#[snafu(display("Operation not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
/// A null value was passed to an encoder that rejects nulls.
#[snafu(display("Encode null value"))]
IndexEncodeNull {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
FieldTypeMismatch { source, .. } => source.status_code(),
SerializeField { .. } | DeserializeField { .. } | IndexEncodeNull { .. } => {
StatusCode::InvalidArguments
}
NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Index codec utilities.
use std::collections::HashMap;
use std::sync::Arc;
@@ -47,7 +49,7 @@ impl IndexValueCodec {
) -> Result<()> {
ensure!(!value.is_null(), IndexEncodeNullSnafu);
if matches!(field.data_type, ConcreteDataType::String(_)) {
if matches!(field.data_type(), ConcreteDataType::String(_)) {
let value = value
.as_string()
.context(FieldTypeMismatchSnafu)?

View File

@@ -31,7 +31,7 @@ pub struct KeyValues {
///
/// This mutation must be a valid mutation and rows in the mutation
/// must not be `None`.
pub(crate) mutation: Mutation,
pub mutation: Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
/// Primary key encoding hint.
@@ -333,8 +333,7 @@ mod tests {
use api::v1::{self, ColumnDataType, SemanticType};
use super::*;
use crate::test_util::i64_value;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
use crate::test_util::{i64_value, TestRegionMetadataBuilder};
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;

24
src/mito-codec/src/lib.rs Normal file
View File

@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Codec utilities for the Mito protocol.
pub mod error;
pub mod index;
pub mod key_values;
pub mod primary_key_filter;
pub mod row_converter;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -19,12 +19,17 @@ use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::Value;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::partition_tree::partition::Partition;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
/// Tells whether `name` is the partition column for metrics in the memtable.
///
/// The check is a plain comparison against `DATA_SCHEMA_TABLE_ID_COLUMN_NAME`.
pub fn is_partition_column(name: &str) -> bool {
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME == name
}
#[derive(Clone)]
struct PrimaryKeyFilterInner {
metadata: RegionMetadataRef,
@@ -42,7 +47,7 @@ impl PrimaryKeyFilterInner {
let mut result = true;
for filter in self.filters.iter() {
if Partition::is_partition_column(filter.column_name()) {
if is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
@@ -149,9 +154,8 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use datafusion::logical_expr::BinaryExpr;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{Expr, Operator};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;

View File

@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod dense;
pub mod sparse;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::key_values::KeyValue;
/// Extension trait for codecs that encode a row of values into bytes.
pub trait PrimaryKeyCodecExt {
    /// Encodes the values of `row` into the provided `buffer`.
    ///
    /// # Note
    /// The caller must make sure that `row` yields exactly as many values as
    /// the codec has fields.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>;

    /// Encodes the values of `row` into a newly created byte vector.
    ///
    /// # Note
    /// The caller must make sure that `row` yields exactly as many values as
    /// the codec has fields.
    fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut encoded = Vec::new();
        // Hand the filled vector back only when encoding succeeded.
        self.encode_to_vec(row, &mut encoded).map(|()| encoded)
    }
}
/// Filter that is evaluated against primary keys given as bytes.
///
/// Implementations must be `Send + Sync`. `matches` takes `&mut self`, so an
/// implementation is allowed to update internal state while checking a key.
pub trait PrimaryKeyFilter: Send + Sync {
/// Returns true if the primary key matches the filter.
///
/// `pk` is the primary key as raw bytes.
fn matches(&mut self, pk: &[u8]) -> bool;
}
/// Composite values decoded from primary key bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
/// Values kept as a list of `(column id, value)` pairs.
Dense(Vec<(ColumnId, Value)>),
/// Values kept in a [`SparseValues`] container.
Sparse(SparseValues),
}
impl CompositeValues {
    /// Extends the composite values with the given values.
    ///
    /// For [`CompositeValues::Dense`] the pairs are appended in the order
    /// given. For [`CompositeValues::Sparse`] every pair is passed to
    /// `SparseValues::insert` with its column id.
    pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
        match self {
            CompositeValues::Dense(dense_values) => {
                // Clones each pair and grows the vector once instead of per push.
                dense_values.extend_from_slice(values);
            }
            CompositeValues::Sparse(sparse_values) => {
                for (column_id, value) in values {
                    sparse_values.insert(*column_id, value.clone());
                }
            }
        }
    }
}
#[cfg(any(test, feature = "testing"))]
impl CompositeValues {
    /// Unwraps the sparse values.
    ///
    /// # Panics
    /// Panics if the values are not [`CompositeValues::Sparse`].
    pub fn into_sparse(self) -> SparseValues {
        let CompositeValues::Sparse(sparse) = self else {
            panic!("CompositeValues is not sparse");
        };
        sparse
    }

    /// Unwraps the dense values and drops their column ids.
    ///
    /// # Panics
    /// Panics if the values are not [`CompositeValues::Dense`].
    pub fn into_dense(self) -> Vec<Value> {
        let CompositeValues::Dense(pairs) = self else {
            panic!("CompositeValues is not dense");
        };
        pairs.into_iter().map(|(_, value)| value).collect()
    }
}
/// Codec that converts primary keys to and from bytes.
///
/// Implementations must be `Send + Sync + Debug`.
pub trait PrimaryKeyCodec: Send + Sync + Debug {
/// Encodes a key value to bytes.
///
/// The output goes to the caller-provided `buffer`.
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes owned `(column id, value)` pairs to bytes.
///
/// The output goes to the caller-provided `buffer`.
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes `(column id, value reference)` pairs to bytes.
///
/// The output goes to the caller-provided `buffer`.
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()>;
/// Returns the number of fields in the primary key.
///
/// NOTE(review): when this is `None` is not visible here; presumably for
/// codecs without a fixed number of fields. Confirm against the implementations.
fn num_fields(&self) -> Option<usize>;
/// Returns a primary key filter built from the region metadata and the
/// given filters.
fn primary_key_filter(
&self,
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter>;
/// Returns the estimated size of the primary key.
///
/// The default implementation returns `None`.
fn estimated_size(&self) -> Option<usize> {
None
}
/// Returns the encoding type of the primary key.
fn encoding(&self) -> PrimaryKeyEncoding;
/// Decodes the primary key from the given bytes.
///
/// Returns a [`CompositeValues`] that follows the primary key ordering.
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
/// Decode the leftmost value from bytes.
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
/// Creates the primary key codec that matches a region's metadata.
///
/// Each primary key column contributes its column id together with a
/// [`SortField`] of the column's data type. The kind of codec is chosen by
/// the region's primary key encoding.
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
    let encoding = region_metadata.primary_key_encoding;
    let sort_fields = region_metadata.primary_key_columns().map(|column| {
        let data_type = column.column_schema.data_type.clone();
        (column.column_id, SortField::new(data_type))
    });
    build_primary_key_codec_with_fields(encoding, sort_fields)
}
/// Builds a primary key codec from an encoding and the primary key fields.
///
/// `fields` yields the `(column id, sort field)` pairs of the primary key.
/// They are collected and handed to the dense or the sparse codec, whichever
/// `encoding` selects.
pub fn build_primary_key_codec_with_fields(
encoding: PrimaryKeyEncoding,
fields: impl Iterator<Item = (ColumnId, SortField)>,
) -> Arc<dyn PrimaryKeyCodec> {
match encoding {
PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
PrimaryKeyEncoding::Sparse => {
Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
}
}
}

View File

@@ -35,15 +35,16 @@ use store_api::storage::ColumnId;
use crate::error::{
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::DensePrimaryKeyFilter;
use crate::key_values::KeyValue;
use crate::primary_key_filter::DensePrimaryKeyFilter;
use crate::row_converter::{
CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
};
/// Field to serialize and deserialize value in memcomparable format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
pub(crate) data_type: ConcreteDataType,
data_type: ConcreteDataType,
}
impl SortField {
@@ -51,6 +52,11 @@ impl SortField {
Self { data_type }
}
/// Returns the data type of the field.
///
/// The type is handed out by reference; clone it when an owned value is needed.
pub fn data_type(&self) -> &ConcreteDataType {
&self.data_type
}
pub fn estimated_size(&self) -> usize {
match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
@@ -75,10 +81,9 @@ impl SortField {
| ConcreteDataType::Dictionary(_) => 0,
}
}
}
impl SortField {
pub(crate) fn serialize(
/// Serialize a value to the serializer.
pub fn serialize(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
@@ -163,7 +168,8 @@ impl SortField {
Ok(())
}
pub(crate) fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
/// Deserialize a value from the deserializer.
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
macro_rules! deserialize_and_build_value {
(
$self: ident;
@@ -525,7 +531,7 @@ mod tests {
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
decoded.push(value);
}
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
assert_eq!(decoded, row);
}
}

View File

@@ -27,8 +27,8 @@ use store_api::storage::consts::ReservedColumnId;
use store_api::storage::ColumnId;
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
use crate::key_values::KeyValue;
use crate::primary_key_filter::SparsePrimaryKeyFilter;
use crate::row_converter::dense::SortField;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
@@ -205,7 +205,7 @@ impl SparsePrimaryKeyCodec {
}
/// Returns the offset of the given column id in the given primary key.
pub(crate) fn has_column(
pub fn has_column(
&self,
pk: &[u8],
offsets_map: &mut HashMap<u32, usize>,
@@ -233,12 +233,7 @@ impl SparsePrimaryKeyCodec {
}
/// Decode value at `offset` in `pk`.
pub(crate) fn decode_value_at(
&self,
pk: &[u8],
offset: usize,
column_id: ColumnId,
) -> Result<Value> {
pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
let mut deserializer = Deserializer::new(pk);
deserializer.advance(offset);
// Safety: checked by `has_column`
@@ -300,6 +295,40 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
}
}
/// Field with column id.
///
/// Pairs a [`SortField`] with the id of the column it belongs to.
pub struct FieldWithId {
/// Field used to serialize the column's value.
pub field: SortField,
/// Id of the column.
pub column_id: ColumnId,
}
/// A special encoder for memtable.
///
/// For every non-null value of a row it writes the column id followed by the
/// serialized value.
pub struct SparseEncoder {
/// Fields of the row, paired in order with the values passed to the encoder.
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
pub fn new(fields: Vec<FieldWithId>) -> Self {
Self { fields }
}
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to create a [RegionMetadata](store_api::metadata::RegionMetadata).
//! Test utilities for mito codec.
use api::greptime_proto::v1;
use api::v1::value::ValueData;
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -105,3 +107,10 @@ impl TestRegionMetadataBuilder {
builder.build().unwrap()
}
}
/// Builds a [`v1::Value`] that holds the given `i64`.
pub fn i64_value(data: i64) -> v1::Value {
    let value_data = Some(ValueData::I64Value(data));
    v1::Value { value_data }
}

View File

@@ -48,6 +48,7 @@ itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
memcomparable = "0.2"
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
@@ -82,6 +83,7 @@ common-test-util.workspace = true
criterion = "0.4"
dotenv.workspace = true
log-store.workspace = true
mito-codec = { workspace = true, features = ["testing"] }
object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
rstest.workspace = true

View File

@@ -25,8 +25,8 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::row_converter::DensePrimaryKeyCodec;
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use mito_codec::row_converter::DensePrimaryKeyCodec;
use rand::rngs::ThreadRng;
use rand::seq::IndexedRandom;
use rand::Rng;

View File

@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Unexpected data type"))]
DataTypeMismatch {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
@@ -291,35 +298,6 @@ pub enum Error {
#[snafu(display("Failed to write region"))]
WriteGroup { source: Arc<Error> },
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch { source: datatypes::error::Error },
#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
InvalidParquet {
file: String,
@@ -1028,6 +1006,20 @@ pub enum Error {
location: Location,
source: common_grpc::Error,
},
#[snafu(display("Failed to encode"))]
Encode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
#[snafu(display("Failed to decode"))]
Decode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1052,6 +1044,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
DataTypeMismatch { source, .. } => source.status_code(),
OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
source.status_code()
@@ -1095,7 +1088,6 @@ impl ErrorExt for Error {
| BiErrors { .. }
| StopScheduler { .. }
| ComputeVector { .. }
| SerializeField { .. }
| EncodeMemtable { .. }
| CreateDir { .. }
| ReadDataPart { .. }
@@ -1107,9 +1099,7 @@ impl ErrorExt for Error {
WriteParquet { .. } => StatusCode::StorageUnavailable,
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
@@ -1181,7 +1171,9 @@ impl ErrorExt for Error {
ScanSeries { source, .. } => source.status_code(),
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
ConvertBulkWalEntry { source, .. } => source.status_code(),
Encode { source, .. } | Decode { source, .. } => source.status_code(),
}
}

View File

@@ -41,7 +41,6 @@ pub mod read;
pub mod region;
mod region_write_ctx;
pub mod request;
pub mod row_converter;
pub mod schedule;
pub mod sst;
mod time_provider;

View File

@@ -21,6 +21,8 @@ use std::sync::Arc;
pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -29,8 +31,6 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -42,7 +42,6 @@ use crate::sst::file::FileTimeRange;
mod builder;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;

View File

@@ -16,13 +16,13 @@
use std::sync::{Arc, RwLock};
use mito_codec::key_values::KeyValue;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart};
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,

View File

@@ -17,12 +17,12 @@
use std::collections::VecDeque;
use std::sync::Arc;
use mito_codec::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;

View File

@@ -38,6 +38,8 @@ use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValuesRef};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
@@ -47,13 +49,12 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;
use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::error::{
self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -354,7 +355,9 @@ fn mutations_to_record_batch(
for row in key_values.iter() {
pk_buffer.clear();
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
pk_encoder
.encode_to_vec(row.primary_keys(), &mut pk_buffer)
.context(EncodeSnafu)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());

View File

@@ -19,7 +19,6 @@ mod dedup;
mod dict;
mod merger;
mod partition;
mod primary_key_filter;
mod shard;
mod shard_builder;
mod tree;
@@ -29,7 +28,8 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -38,7 +38,6 @@ use table::predicate::Predicate;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
@@ -47,7 +46,6 @@ use crate::memtable::{
PredicateGroup,
};
use crate::region::options::MergeMode;
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -368,11 +366,11 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use mito_codec::row_converter::DensePrimaryKeyCodec;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::DensePrimaryKeyCodec;
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};

View File

@@ -33,6 +33,7 @@ use datatypes::vectors::{
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8Vector, UInt8VectorBuilder,
};
use mito_codec::key_values::KeyValue;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -44,7 +45,6 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::partition_tree::PkIndex;
use crate::metrics::{

View File

@@ -22,13 +22,16 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::error::{EncodeSnafu, Result};
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
@@ -39,7 +42,6 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
/// Key of a partition.
pub type PartitionKey = u32;
@@ -91,7 +93,9 @@ impl Partition {
// `primary_key` is sparse, re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_key_value(&key_value, primary_key)?;
row_codec
.encode_key_value(&key_value, primary_key)
.context(EncodeSnafu)?;
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
@@ -304,11 +308,6 @@ impl Partition {
.map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
.unwrap_or(false)
}
/// Returns true if this is a partition column.
pub(crate) fn is_partition_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}
}
pub(crate) struct PartitionStats {
@@ -446,7 +445,7 @@ impl ReadPartitionContext {
fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
for filter in filters {
// We already pruned partitions before so we skip the partition column.
if Partition::is_partition_column(filter.column_name()) {
if is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = metadata.column_by_name(filter.column_name()) else {

View File

@@ -17,10 +17,11 @@
use std::cmp::Ordering;
use std::time::{Duration, Instant};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::PrimaryKeyFilter;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
};
@@ -29,7 +30,6 @@ use crate::memtable::partition_tree::merger::{Merger, Node};
use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Shard stores data related to the same key dictionary.
pub struct Shard {

View File

@@ -18,10 +18,11 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::PrimaryKeyFilter;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
@@ -30,7 +31,6 @@ use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Builder to write keys and data to a shard that the key dictionary
/// is still active.

View File

@@ -23,8 +23,10 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
@@ -32,10 +34,9 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
@@ -46,7 +47,6 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField};
/// The partition tree.
pub struct PartitionTree {
@@ -73,15 +73,15 @@ impl PartitionTree {
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let sparse_encoder = SparseEncoder {
fields: metadata
let sparse_encoder = SparseEncoder::new(
metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
};
);
let is_partitioned = Partition::has_multi_partitions(&metadata);
let mut config = config.clone();
if config.merge_mode == MergeMode::LastNonNull {
@@ -129,7 +129,8 @@ impl PartitionTree {
} else {
// For compatibility, use the sparse encoder for dense primary key.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), buffer)?;
.encode_to_vec(kv.primary_keys(), buffer)
.context(EncodeSnafu)?;
}
Ok(())
}
@@ -166,7 +167,9 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
}
// Write rows with
@@ -208,7 +211,9 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
}
// Write rows with
@@ -415,7 +420,7 @@ impl PartitionTree {
for (key, partition) in partitions.iter() {
let mut is_needed = true;
for filter in filters {
if !Partition::is_partition_column(filter.column_name()) {
if !is_partition_column(filter.column_name()) {
continue;
}
@@ -436,34 +441,6 @@ impl PartitionTree {
}
}
/// A primary key column's [`SortField`] paired with the id of that column.
struct FieldWithId {
/// Field used to serialize a value of this column.
field: SortField,
/// Id of the column; it is serialized in front of each encoded value.
column_id: ColumnId,
}
/// Encoder that writes a row of primary key values as a sequence of
/// `(column id, value)` entries, skipping null values.
struct SparseEncoder {
/// Fields of the primary key columns, matched positionally against the
/// values of the row being encoded.
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
    /// Serializes the non-null values of `row` into `buffer`.
    ///
    /// Values are paired positionally with `self.fields`. For every value
    /// that is not null, the column id is written first, followed by the
    /// value itself encoded through the column's sort field. Null values
    /// produce no output at all.
    ///
    /// # Errors
    ///
    /// Returns an error if serializing a column id or a value fails.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut serializer = Serializer::new(buffer);
        row.zip(self.fields.iter())
            // Nulls are simply omitted from the sparse encoding.
            .filter(|(value, _)| !value.is_null())
            .try_for_each(|(value, entry)| -> Result<()> {
                entry
                    .column_id
                    .serialize(&mut serializer)
                    .context(SerializeFieldSnafu)?;
                entry.field.serialize(&mut serializer, &value)?;
                Ok(())
            })
    }
}
#[derive(Default)]
struct TreeIterMetrics {
iter_elapsed: Duration,

View File

@@ -19,6 +19,7 @@ use std::sync::{Arc, RwLock};
use api::v1::OpType;
use datatypes::vectors::Helper;
use mito_codec::key_values::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -26,7 +27,6 @@ use table::predicate::Predicate;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::{Series, Values};
use crate::memtable::{

View File

@@ -29,6 +29,7 @@ use datatypes::arrow::array::{
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use mito_codec::key_values::KeyValue;
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -36,7 +37,6 @@ use store_api::metadata::RegionMetadataRef;
use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};

View File

@@ -35,17 +35,19 @@ use datatypes::vectors::{
Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::error::{
self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
@@ -57,7 +59,6 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;
@@ -176,7 +177,10 @@ impl TimeSeriesMemtable {
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let primary_key_encoded = self
.row_codec
.encode(kv.primary_keys())
.context(EncodeSnafu)?;
let (key_allocated, value_allocated) =
self.series_set.push_to_series(primary_key_encoded, &kv);
@@ -1107,11 +1111,11 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::{OrderedFloat, Value};
use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
use mito_codec::row_converter::SortField;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::SortField;
use crate::test_util::column_metadata_to_column_schema;
fn schema_for_test() -> RegionMetadataRef {

View File

@@ -50,16 +50,17 @@ use datatypes::vectors::{
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::prune::PruneReader;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -612,7 +613,7 @@ impl Batch {
column_id: ColumnId,
) -> Result<Option<&Value>> {
if self.pk_values.is_none() {
self.pk_values = Some(codec.decode(&self.primary_key)?);
self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
}
let pk_values = self.pk_values.as_ref().unwrap();
@@ -1026,12 +1027,12 @@ pub(crate) struct ScannerMetrics {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::error::Error;
use crate::row_converter::{self, build_primary_key_codec_with_fields};
use crate::test_util::new_batch_builder;
fn new_batch(

View File

@@ -20,17 +20,17 @@ use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
@@ -155,7 +155,9 @@ impl CompatPrimaryKey {
batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
);
buffer.extend_from_slice(batch.primary_key());
self.converter.encode_values(&self.values, &mut buffer)?;
self.converter
.encode_values(&self.values, &mut buffer)
.context(EncodeSnafu)?;
batch.set_primary_key(buffer);
@@ -405,7 +407,10 @@ impl RewritePrimaryKey {
let values = if let Some(pk_values) = batch.pk_values() {
pk_values
} else {
let new_pk_values = self.original.decode(batch.primary_key())?;
let new_pk_values = self
.original
.decode(batch.primary_key())
.context(DecodeSnafu)?;
batch.set_pk_values(new_pk_values);
// Safety: We ensure pk_values is not None.
batch.pk_values().as_ref().unwrap()
@@ -416,7 +421,9 @@ impl RewritePrimaryKey {
);
match values {
CompositeValues::Dense(values) => {
self.new.encode_values(values.as_slice(), &mut buffer)?;
self.new
.encode_values(values.as_slice(), &mut buffer)
.context(EncodeSnafu)?;
}
CompositeValues::Sparse(values) => {
let values = self
@@ -427,7 +434,9 @@ impl RewritePrimaryKey {
(*id, value.as_value_ref())
})
.collect::<Vec<_>>();
self.new.encode_value_refs(&values, &mut buffer)?;
self.new
.encode_value_refs(&values, &mut buffer)
.context(EncodeSnafu)?;
}
}
batch.set_primary_key(buffer);
@@ -445,12 +454,14 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].

View File

@@ -26,6 +26,7 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
@@ -33,7 +34,6 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
@@ -320,12 +320,12 @@ mod tests {
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::util::pretty;
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use mito_codec::test_util::TestRegionMetadataBuilder;
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn new_batch(
ts_start: i64,

View File

@@ -922,11 +922,12 @@ mod tests {
use api::v1::{Row, SemanticType};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use mito_codec::test_util::i64_value;
use store_api::metadata::RegionMetadataBuilder;
use super::*;
use crate::error::Error;
use crate::test_util::{i64_value, ts_ms_value};
use crate::test_util::ts_ms_value;
fn new_column_schema(
name: &str,

View File

@@ -20,13 +20,13 @@ use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
use mito_codec::key_values::KeyValue;
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
/// Row value encoder/decoder.
pub trait PrimaryKeyCodecExt {

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub(crate) mod bloom_filter;
mod codec;
pub(crate) mod fulltext_index;
mod indexer;
pub mod intermediate;

View File

@@ -22,6 +22,8 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::applier::InListPredicate;
use index::Bytes;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
@@ -30,10 +32,8 @@ use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub struct BloomFilterIndexApplierBuilder<'a> {
@@ -322,7 +322,8 @@ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
.context(EncodeSnafu)?;
Ok(bytes)
}

View File

@@ -19,6 +19,8 @@ use std::sync::Arc;
use common_telemetry::{debug, warn};
use datatypes::schema::SkippingIndexType;
use index::bloom_filter::creator::BloomFilterCreator;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -26,14 +28,12 @@ use store_api::storage::ColumnId;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::intermediate::{
IntermediateLocation, IntermediateManager, TempFileProvider,
};
@@ -210,7 +210,8 @@ impl BloomFilterIndexer {
v.as_value_ref(),
field,
&mut buf,
)?;
)
.context(EncodeSnafu)?;
Ok(buf)
})
.transpose()?;
@@ -234,11 +235,8 @@ impl BloomFilterIndexer {
let elems = (!value.is_null())
.then(|| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(
value,
&sort_field,
&mut buf,
)?;
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
.context(EncodeSnafu)?;
Ok(buf)
})
.transpose()?;
@@ -353,6 +351,7 @@ pub(crate) mod tests {
use datatypes::value::ValueRef;
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
@@ -362,7 +361,6 @@ pub(crate) mod tests {
use super::*;
use crate::access_layer::FilePathProvider;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub fn mock_object_store() -> ObjectStore {

View File

@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::error::{
CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu,
FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
};
use crate::read::Batch;
@@ -259,7 +259,7 @@ impl SingleCreator {
let data = data.get_ref(i);
let text = data
.as_string()
.context(FieldTypeMismatchSnafu)?
.context(DataTypeMismatchSnafu)?
.unwrap_or_default();
self.inner.push_text(text).await?;
}

View File

@@ -27,6 +27,8 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
@@ -35,9 +37,9 @@ use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::inverted_index::InvertedIndexCacheRef;
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::codec::IndexValueCodec;
use crate::error::{
BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
};
use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -230,7 +232,8 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
.context(EncodeSnafu)?;
Ok(bytes)
}
}

View File

@@ -194,7 +194,7 @@ mod tests {
};
let res = builder.collect_between(&between);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -264,7 +264,7 @@ mod tests {
);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -226,7 +226,7 @@ mod tests {
);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -167,7 +167,7 @@ mod tests {
};
let res = builder.collect_inlist(&in_list);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -22,6 +22,8 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -30,13 +32,11 @@ use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
BiErrorsSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::intermediate::{
IntermediateLocation, IntermediateManager, TempFileProvider,
};
@@ -205,7 +205,8 @@ impl InvertedIndexer {
v.as_value_ref(),
field,
&mut self.value_buf,
)?;
)
.context(EncodeSnafu)?;
Ok(self.value_buf.as_slice())
})
.transpose()?;
@@ -238,7 +239,8 @@ impl InvertedIndexer {
value,
&sort_field,
&mut self.value_buf,
)?;
)
.context(EncodeSnafu)?;
self.index_creator
.push_with_name(col_id_str, Some(&self.value_buf))
.await
@@ -334,6 +336,7 @@ mod tests {
use datatypes::value::ValueRef;
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use futures::future::BoxFuture;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCache;
@@ -346,7 +349,6 @@ mod tests {
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;

View File

@@ -22,18 +22,19 @@ use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;
use crate::error::{
DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result,
StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
@@ -270,7 +271,11 @@ impl RangeBase {
let pk_values = if let Some(pk_values) = input.pk_values() {
pk_values
} else {
input.set_pk_values(self.codec.decode(input.primary_key())?);
input.set_pk_values(
self.codec
.decode(input.primary_key())
.context(DecodeSnafu)?,
);
input.pk_values().unwrap()
};
let pk_value = match pk_values {
@@ -284,12 +289,12 @@ impl RangeBase {
v[pk_index]
.1
.try_to_scalar_value(filter_ctx.data_type())
.context(FieldTypeMismatchSnafu)?
.context(DataTypeMismatchSnafu)?
}
CompositeValues::Sparse(v) => {
let v = v.get_or_null(filter_ctx.column_id());
v.try_to_scalar_value(filter_ctx.data_type())
.context(FieldTypeMismatchSnafu)?
.context(DataTypeMismatchSnafu)?
}
};
if filter

View File

@@ -38,6 +38,7 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
@@ -48,7 +49,6 @@ use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;

View File

@@ -26,6 +26,7 @@ use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
@@ -48,7 +49,6 @@ use crate::metrics::{
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::build_primary_key_codec;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;

View File

@@ -16,7 +16,6 @@
pub mod batch_util;
pub mod memtable_util;
pub mod meta_util;
pub mod scheduler_util;
pub mod sst_util;
pub mod version_util;
@@ -841,13 +840,6 @@ impl CreateRequestBuilder {
}
}
/// Wraps `data` into a [`v1::Value`] carrying an `I64Value` payload.
pub(crate) fn i64_value(data: i64) -> v1::Value {
    let value_data = Some(ValueData::I64Value(data));
    v1::Value { value_data }
}
/// Creates value for timestamp millis.
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
v1::Value {

View File

@@ -25,20 +25,20 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::TimestampMillisecondVector;
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
MemtableRef, MemtableStats,
};
use crate::read::scan_region::PredicateGroup;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
/// Empty memtable for test.
#[derive(Debug, Default)]

View File

@@ -22,6 +22,7 @@ use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Arra
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
@@ -29,7 +30,6 @@ use store_api::metadata::{
use store_api::storage::RegionId;
use crate::read::{Batch, BatchBuilder, Source};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};