From eaf1e1198f8ce6cfaad2043cb61a32b435458a13 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 13 Jun 2025 15:14:29 +0800 Subject: [PATCH] refactor: Extract mito codec part into a new crate (#6307) * chore: add a new crate mito-codec Signed-off-by: evenyag * feat: port necessary mods for primary key codec Signed-off-by: evenyag * refactor: use codec utils in mito-codec Signed-off-by: evenyag * refactor: remove unused mods Signed-off-by: evenyag * style: fix clippy Signed-off-by: evenyag * refactor: remove Partition::is_partition_column() Signed-off-by: evenyag * refactor: remove duplicated test utils Signed-off-by: evenyag * chore: remove unused comment Signed-off-by: evenyag * fix: fix is_partition_column check Signed-off-by: evenyag --------- Signed-off-by: evenyag --- Cargo.lock | 25 +++ Cargo.toml | 2 + src/metric-engine/Cargo.toml | 1 + src/metric-engine/src/error.rs | 2 +- src/metric-engine/src/row_modifier.rs | 2 +- src/mito-codec/Cargo.toml | 30 ++++ src/mito-codec/src/error.rs | 95 ++++++++++ .../codec.rs => mito-codec/src/index.rs} | 4 +- .../memtable => mito-codec/src}/key_values.rs | 5 +- src/mito-codec/src/lib.rs | 24 +++ .../src}/primary_key_filter.rs | 12 +- src/mito-codec/src/row_converter.rs | 164 ++++++++++++++++++ .../src/row_converter/dense.rs | 22 ++- .../src/row_converter/sparse.rs | 47 ++++- .../src/test_util.rs} | 11 +- src/mito2/Cargo.toml | 2 + src/mito2/benches/memtable_bench.rs | 2 +- src/mito2/src/error.rs | 60 +++---- src/mito2/src/lib.rs | 1 - src/mito2/src/memtable.rs | 5 +- src/mito2/src/memtable/bulk.rs | 2 +- src/mito2/src/memtable/bulk/context.rs | 2 +- src/mito2/src/memtable/bulk/part.rs | 13 +- src/mito2/src/memtable/partition_tree.rs | 8 +- src/mito2/src/memtable/partition_tree/data.rs | 2 +- .../src/memtable/partition_tree/partition.rs | 19 +- .../src/memtable/partition_tree/shard.rs | 4 +- .../memtable/partition_tree/shard_builder.rs | 4 +- src/mito2/src/memtable/partition_tree/tree.rs | 57 ++---- .../src/memtable/simple_bulk_memtable.rs | 2 +- src/mito2/src/memtable/time_partition.rs | 2 +- src/mito2/src/memtable/time_series.rs | 16 +- src/mito2/src/read.rs | 9 +- src/mito2/src/read/compat.rs | 31 ++-- src/mito2/src/read/projection.rs | 6 +- src/mito2/src/request.rs | 3 +- src/mito2/src/row_converter.rs | 2 +- src/mito2/src/sst/index.rs | 1 - .../sst/index/bloom_filter/applier/builder.rs | 9 +- .../src/sst/index/bloom_filter/creator.rs | 20 +-- .../src/sst/index/fulltext_index/creator.rs | 4 +- .../index/inverted_index/applier/builder.rs | 11 +- .../inverted_index/applier/builder/between.rs | 2 +- .../applier/builder/comparison.rs | 2 +- .../inverted_index/applier/builder/eq_list.rs | 2 +- .../inverted_index/applier/builder/in_list.rs | 2 +- .../src/sst/index/inverted_index/creator.rs | 14 +- src/mito2/src/sst/parquet/file_range.rs | 15 +- src/mito2/src/sst/parquet/format.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 2 +- src/mito2/src/test_util.rs | 8 - src/mito2/src/test_util/memtable_util.rs | 4 +- src/mito2/src/test_util/sst_util.rs | 2 +- 53 files changed, 587 insertions(+), 211 deletions(-) create mode 100644 src/mito-codec/Cargo.toml create mode 100644 src/mito-codec/src/error.rs rename src/{mito2/src/sst/index/codec.rs => mito-codec/src/index.rs} (98%) rename src/{mito2/src/memtable => mito-codec/src}/key_values.rs (99%) create mode 100644 src/mito-codec/src/lib.rs rename src/{mito2/src/memtable/partition_tree => mito-codec/src}/primary_key_filter.rs (96%) create mode 100644 src/mito-codec/src/row_converter.rs rename src/{mito2 => 
mito-codec}/src/row_converter/dense.rs (97%) rename src/{mito2 => mito-codec}/src/row_converter/sparse.rs (94%) rename src/{mito2/src/test_util/meta_util.rs => mito-codec/src/test_util.rs} (93%) diff --git a/Cargo.lock b/Cargo.lock index b1b017128c..792907d9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7248,6 +7248,7 @@ dependencies = [ "humantime-serde", "itertools 0.14.0", "lazy_static", + "mito-codec", "mito2", "mur3", "object-store", @@ -7313,6 +7314,29 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mito-codec" +version = "0.15.0" +dependencies = [ + "api", + "bytes", + "common-base", + "common-decimal", + "common-error", + "common-macro", + "common-recordbatch", + "common-telemetry", + "common-time", + "datafusion-common", + "datafusion-expr", + "datatypes", + "memcomparable", + "paste", + "serde", + "snafu 0.8.5", + "store-api", +] + [[package]] name = "mito2" version = "0.15.0" @@ -7355,6 +7379,7 @@ dependencies = [ "lazy_static", "log-store", "memcomparable", + "mito-codec", "moka", "object-store", "parquet", diff --git a/Cargo.toml b/Cargo.toml index 5b85de7f26..f27d1fa586 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ members = [ "src/meta-client", "src/meta-srv", "src/metric-engine", + "src/mito-codec", "src/mito2", "src/object-store", "src/operator", @@ -274,6 +275,7 @@ log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } metric-engine = { path = "src/metric-engine" } +mito-codec = { path = "src/mito-codec" } mito2 = { path = "src/mito2" } object-store = { path = "src/object-store" } operator = { path = "src/operator" } diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 9e7a5b8545..0306a38ade 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -27,6 +27,7 @@ futures-util.workspace = true humantime-serde.workspace = true itertools.workspace = true lazy_static = "1.4" +mito-codec.workspace = true mito2.workspace = true mur3 = "0.1" object-store.workspace = true diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 1b390a45ba..0ffd3c8bc6 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -115,7 +115,7 @@ pub enum Error { #[snafu(display("Failed to encode primary key"))] EncodePrimaryKey { - source: mito2::error::Error, + source: mito_codec::error::Error, #[snafu(implicit)] location: Location, }, diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs index 2af296cb95..56618ded8d 100644 --- a/src/metric-engine/src/row_modifier.rs +++ b/src/metric-engine/src/row_modifier.rs @@ -18,7 +18,7 @@ use std::hash::Hash; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use datatypes::value::ValueRef; -use mito2::row_converter::SparsePrimaryKeyCodec; +use mito_codec::row_converter::SparsePrimaryKeyCodec; use smallvec::SmallVec; use snafu::ResultExt; use store_api::codec::PrimaryKeyEncoding; diff --git a/src/mito-codec/Cargo.toml b/src/mito-codec/Cargo.toml new file mode 100644 index 0000000000..3fddf7c145 --- /dev/null +++ b/src/mito-codec/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "mito-codec" +version.workspace = true +edition.workspace = true +license.workspace = true + +[features] +default = [] +testing = [] + +[dependencies] +api.workspace = true +bytes.workspace = true +common-base.workspace = true +common-decimal.workspace = true +common-error.workspace = true 
+common-macro.workspace = true +common-recordbatch.workspace = true +common-telemetry.workspace = true +common-time.workspace = true +datatypes.workspace = true +memcomparable = "0.2" +paste.workspace = true +serde.workspace = true +snafu.workspace = true +store-api.workspace = true + +[dev-dependencies] +datafusion-common.workspace = true +datafusion-expr.workspace = true diff --git a/src/mito-codec/src/error.rs b/src/mito-codec/src/error.rs new file mode 100644 index 0000000000..1be0074b1e --- /dev/null +++ b/src/mito-codec/src/error.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use datatypes::prelude::ConcreteDataType; +use snafu::{Location, Snafu}; + +/// Error definitions for mito encoding. +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Row value mismatches field data type"))] + FieldTypeMismatch { + // Box the source to reduce the size of the error. + #[snafu(source(from(datatypes::error::Error, Box::new)))] + source: Box, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serialize field"))] + SerializeField { + #[snafu(source)] + error: memcomparable::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Data type: {} does not support serialization/deserialization", + data_type, + ))] + NotSupportedField { + data_type: ConcreteDataType, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to deserialize field"))] + DeserializeField { + #[snafu(source)] + error: memcomparable::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Operation not supported: {}", err_msg))] + UnsupportedOperation { + err_msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Encode null value"))] + IndexEncodeNull { + #[snafu(implicit)] + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + FieldTypeMismatch { source, .. } => source.status_code(), + SerializeField { .. } | DeserializeField { .. } | IndexEncodeNull { .. } => { + StatusCode::InvalidArguments + } + NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito-codec/src/index.rs similarity index 98% rename from src/mito2/src/sst/index/codec.rs rename to src/mito-codec/src/index.rs index c4ea9941aa..768d5db20a 100644 --- a/src/mito2/src/sst/index/codec.rs +++ b/src/mito-codec/src/index.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Index codec utilities. 
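+//!
+//! Provides [`IndexValueCodec`] and [`IndexValuesCodec`] for encoding index
+//! values into memcomparable bytes when building and applying SST indexes.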
+ use std::collections::HashMap; use std::sync::Arc; @@ -47,7 +49,7 @@ impl IndexValueCodec { ) -> Result<()> { ensure!(!value.is_null(), IndexEncodeNullSnafu); - if matches!(field.data_type, ConcreteDataType::String(_)) { + if matches!(field.data_type(), ConcreteDataType::String(_)) { let value = value .as_string() .context(FieldTypeMismatchSnafu)? diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito-codec/src/key_values.rs similarity index 99% rename from src/mito2/src/memtable/key_values.rs rename to src/mito-codec/src/key_values.rs index a6af39a306..896a57810a 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito-codec/src/key_values.rs @@ -31,7 +31,7 @@ pub struct KeyValues { /// /// This mutation must be a valid mutation and rows in the mutation /// must not be `None`. - pub(crate) mutation: Mutation, + pub mutation: Mutation, /// Key value read helper. helper: SparseReadRowHelper, /// Primary key encoding hint. @@ -333,8 +333,7 @@ mod tests { use api::v1::{self, ColumnDataType, SemanticType}; use super::*; - use crate::test_util::i64_value; - use crate::test_util::meta_util::TestRegionMetadataBuilder; + use crate::test_util::{i64_value, TestRegionMetadataBuilder}; const TS_NAME: &str = "ts"; const START_SEQ: SequenceNumber = 100; diff --git a/src/mito-codec/src/lib.rs b/src/mito-codec/src/lib.rs new file mode 100644 index 0000000000..d1f4d16827 --- /dev/null +++ b/src/mito-codec/src/lib.rs @@ -0,0 +1,24 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Codec utilities for the Mito protocol. + +pub mod error; +pub mod index; +pub mod key_values; +pub mod primary_key_filter; +pub mod row_converter; + +#[cfg(any(test, feature = "testing"))] +pub mod test_util; diff --git a/src/mito2/src/memtable/partition_tree/primary_key_filter.rs b/src/mito-codec/src/primary_key_filter.rs similarity index 96% rename from src/mito2/src/memtable/partition_tree/primary_key_filter.rs rename to src/mito-codec/src/primary_key_filter.rs index ecca5631b6..571d5a0df4 100644 --- a/src/mito2/src/memtable/partition_tree/primary_key_filter.rs +++ b/src/mito-codec/src/primary_key_filter.rs @@ -19,12 +19,17 @@ use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; use datatypes::value::Value; use store_api::metadata::RegionMetadataRef; +use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::ColumnId; use crate::error::Result; -use crate::memtable::partition_tree::partition::Partition; use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec}; +/// Returns true if this is a partition column for metrics in the memtable. 
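+///
+/// The metric engine splits partitions by the table id column, so filters on
+/// this column are already handled by partition pruning and are skipped by the
+/// primary key filters.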
+pub fn is_partition_column(name: &str) -> bool { + name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME +} + #[derive(Clone)] struct PrimaryKeyFilterInner { metadata: RegionMetadataRef, @@ -42,7 +47,7 @@ impl PrimaryKeyFilterInner { let mut result = true; for filter in self.filters.iter() { - if Partition::is_partition_column(filter.column_name()) { + if is_partition_column(filter.column_name()) { continue; } let Some(column) = self.metadata.column_by_name(filter.column_name()) else { @@ -149,9 +154,8 @@ mod tests { use std::sync::Arc; use api::v1::SemanticType; - use datafusion::logical_expr::BinaryExpr; use datafusion_common::{Column, ScalarValue}; - use datafusion_expr::{Expr, Operator}; + use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::ValueRef; diff --git a/src/mito-codec/src/row_converter.rs b/src/mito-codec/src/row_converter.rs new file mode 100644 index 0000000000..ab14d7a549 --- /dev/null +++ b/src/mito-codec/src/row_converter.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod dense; +pub mod sparse; + +use std::fmt::Debug; +use std::sync::Arc; + +use common_recordbatch::filter::SimpleFilterEvaluator; +use datatypes::value::{Value, ValueRef}; +pub use dense::{DensePrimaryKeyCodec, SortField}; +pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE}; +use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; +use store_api::storage::ColumnId; + +use crate::error::Result; +use crate::key_values::KeyValue; + +/// Row value encoder/decoder. +pub trait PrimaryKeyCodecExt { + /// Encodes rows to bytes. + /// # Note + /// Ensure the length of row iterator matches the length of fields. + fn encode<'a, I>(&self, row: I) -> Result> + where + I: Iterator>, + { + let mut buffer = Vec::new(); + self.encode_to_vec(row, &mut buffer)?; + Ok(buffer) + } + + /// Encodes rows to specific vec. + /// # Note + /// Ensure the length of row iterator matches the length of fields. + fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> Result<()> + where + I: Iterator>; +} + +pub trait PrimaryKeyFilter: Send + Sync { + /// Returns true if the primary key matches the filter. + fn matches(&mut self, pk: &[u8]) -> bool; +} + +/// Composite values decoded from primary key bytes. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CompositeValues { + Dense(Vec<(ColumnId, Value)>), + Sparse(SparseValues), +} + +impl CompositeValues { + /// Extends the composite values with the given values. 
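+    ///
+    /// Dense values are appended in the given order, while sparse values are
+    /// inserted by column id.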
+ pub fn extend(&mut self, values: &[(ColumnId, Value)]) { + match self { + CompositeValues::Dense(dense_values) => { + for (column_id, value) in values { + dense_values.push((*column_id, value.clone())); + } + } + CompositeValues::Sparse(sprase_value) => { + for (column_id, value) in values { + sprase_value.insert(*column_id, value.clone()); + } + } + } + } +} + +#[cfg(any(test, feature = "testing"))] +impl CompositeValues { + pub fn into_sparse(self) -> SparseValues { + match self { + CompositeValues::Sparse(v) => v, + _ => panic!("CompositeValues is not sparse"), + } + } + + pub fn into_dense(self) -> Vec { + match self { + CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(), + _ => panic!("CompositeValues is not dense"), + } + } +} + +pub trait PrimaryKeyCodec: Send + Sync + Debug { + /// Encodes a key value to bytes. + fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec) -> Result<()>; + + /// Encodes values to bytes. + fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec) -> Result<()>; + + /// Encodes values to bytes. + fn encode_value_refs( + &self, + values: &[(ColumnId, ValueRef)], + buffer: &mut Vec, + ) -> Result<()>; + + /// Returns the number of fields in the primary key. + fn num_fields(&self) -> Option; + + /// Returns a primary key filter factory. + fn primary_key_filter( + &self, + metadata: &RegionMetadataRef, + filters: Arc>, + ) -> Box; + + /// Returns the estimated size of the primary key. + fn estimated_size(&self) -> Option { + None + } + + /// Returns the encoding type of the primary key. + fn encoding(&self) -> PrimaryKeyEncoding; + + /// Decodes the primary key from the given bytes. + /// + /// Returns a [`CompositeValues`] that follows the primary key ordering. + fn decode(&self, bytes: &[u8]) -> Result; + + /// Decode the leftmost value from bytes. + fn decode_leftmost(&self, bytes: &[u8]) -> Result>; +} + +/// Builds a primary key codec from region metadata. +pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc { + let fields = region_metadata.primary_key_columns().map(|col| { + ( + col.column_id, + SortField::new(col.column_schema.data_type.clone()), + ) + }); + build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields) +} + +/// Builds a primary key codec from region metadata. +pub fn build_primary_key_codec_with_fields( + encoding: PrimaryKeyEncoding, + fields: impl Iterator, +) -> Arc { + match encoding { + PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())), + PrimaryKeyEncoding::Sparse => { + Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect())) + } + } +} diff --git a/src/mito2/src/row_converter/dense.rs b/src/mito-codec/src/row_converter/dense.rs similarity index 97% rename from src/mito2/src/row_converter/dense.rs rename to src/mito-codec/src/row_converter/dense.rs index d032c782e7..391529a8e4 100644 --- a/src/mito2/src/row_converter/dense.rs +++ b/src/mito-codec/src/row_converter/dense.rs @@ -35,15 +35,16 @@ use store_api::storage::ColumnId; use crate::error::{ self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu, }; -use crate::memtable::key_values::KeyValue; -use crate::memtable::partition_tree::DensePrimaryKeyFilter; +use crate::key_values::KeyValue; +use crate::primary_key_filter::DensePrimaryKeyFilter; use crate::row_converter::{ CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter, }; +/// Field to serialize and deserialize value in memcomparable format. 
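+///
+/// A `SortField` pairs a [`ConcreteDataType`] with the logic to serialize and
+/// deserialize a single value of that type.
+///
+/// A minimal sketch of encoding one value (illustrative only, not part of this
+/// patch):
+///
+/// ```ignore
+/// let field = SortField::new(ConcreteDataType::int64_datatype());
+/// let mut buf = Vec::new();
+/// let mut serializer = Serializer::new(&mut buf);
+/// field.serialize(&mut serializer, &ValueRef::Int64(42))?;
+/// ```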
#[derive(Debug, Clone, PartialEq, Eq)] pub struct SortField { - pub(crate) data_type: ConcreteDataType, + data_type: ConcreteDataType, } impl SortField { @@ -51,6 +52,11 @@ impl SortField { Self { data_type } } + /// Returns the data type of the field. + pub fn data_type(&self) -> &ConcreteDataType { + &self.data_type + } + pub fn estimated_size(&self) -> usize { match &self.data_type { ConcreteDataType::Boolean(_) => 2, @@ -75,10 +81,9 @@ impl SortField { | ConcreteDataType::Dictionary(_) => 0, } } -} -impl SortField { - pub(crate) fn serialize( + /// Serialize a value to the serializer. + pub fn serialize( &self, serializer: &mut Serializer<&mut Vec>, value: &ValueRef, @@ -163,7 +168,8 @@ impl SortField { Ok(()) } - pub(crate) fn deserialize(&self, deserializer: &mut Deserializer) -> Result { + /// Deserialize a value from the deserializer. + pub fn deserialize(&self, deserializer: &mut Deserializer) -> Result { macro_rules! deserialize_and_build_value { ( $self: ident; @@ -525,7 +531,7 @@ mod tests { let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap(); decoded.push(value); } - assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets); + assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}"); assert_eq!(decoded, row); } } diff --git a/src/mito2/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs similarity index 94% rename from src/mito2/src/row_converter/sparse.rs rename to src/mito-codec/src/row_converter/sparse.rs index 38f5268974..f80d6b8987 100644 --- a/src/mito2/src/row_converter/sparse.rs +++ b/src/mito-codec/src/row_converter/sparse.rs @@ -27,8 +27,8 @@ use store_api::storage::consts::ReservedColumnId; use store_api::storage::ColumnId; use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu}; -use crate::memtable::key_values::KeyValue; -use crate::memtable::partition_tree::SparsePrimaryKeyFilter; +use crate::key_values::KeyValue; +use crate::primary_key_filter::SparsePrimaryKeyFilter; use crate::row_converter::dense::SortField; use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter}; @@ -205,7 +205,7 @@ impl SparsePrimaryKeyCodec { } /// Returns the offset of the given column id in the given primary key. - pub(crate) fn has_column( + pub fn has_column( &self, pk: &[u8], offsets_map: &mut HashMap, @@ -233,12 +233,7 @@ impl SparsePrimaryKeyCodec { } /// Decode value at `offset` in `pk`. - pub(crate) fn decode_value_at( - &self, - pk: &[u8], - offset: usize, - column_id: ColumnId, - ) -> Result { + pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result { let mut deserializer = Deserializer::new(pk); deserializer.advance(offset); // Safety: checked by `has_column` @@ -300,6 +295,40 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec { } } +/// Field with column id. +pub struct FieldWithId { + pub field: SortField, + pub column_id: ColumnId, +} + +/// A special encoder for memtable. 
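+///
+/// Unlike [`SparsePrimaryKeyCodec`], it takes a plain row of values, zips them
+/// with the configured fields, and writes `(column id, value)` pairs for the
+/// non-null values only.
+///
+/// A minimal usage sketch (the field and column id below are hypothetical):
+///
+/// ```ignore
+/// let encoder = SparseEncoder::new(vec![FieldWithId {
+///     field: SortField::new(ConcreteDataType::string_datatype()),
+///     column_id: 1,
+/// }]);
+/// let mut buf = Vec::new();
+/// encoder.encode_to_vec([ValueRef::String("host-1")].into_iter(), &mut buf)?;
+/// ```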
+pub struct SparseEncoder { + fields: Vec, +} + +impl SparseEncoder { + pub fn new(fields: Vec) -> Self { + Self { fields } + } + + pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> Result<()> + where + I: Iterator>, + { + let mut serializer = Serializer::new(buffer); + for (value, field) in row.zip(self.fields.iter()) { + if !value.is_null() { + field + .column_id + .serialize(&mut serializer) + .context(SerializeFieldSnafu)?; + field.field.serialize(&mut serializer, &value)?; + } + } + Ok(()) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/mito2/src/test_util/meta_util.rs b/src/mito-codec/src/test_util.rs similarity index 93% rename from src/mito2/src/test_util/meta_util.rs rename to src/mito-codec/src/test_util.rs index 0fde446e1f..2f72cb109a 100644 --- a/src/mito2/src/test_util/meta_util.rs +++ b/src/mito-codec/src/test_util.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Utilities to create a [RegionMetadata](store_api::metadata::RegionMetadata). +//! Test utilities for mito codec. +use api::greptime_proto::v1; +use api::v1::value::ValueData; use api::v1::SemanticType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -105,3 +107,10 @@ impl TestRegionMetadataBuilder { builder.build().unwrap() } } + +/// Creates value for i64. +pub fn i64_value(data: i64) -> v1::Value { + v1::Value { + value_data: Some(ValueData::I64Value(data)), + } +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index d979f6b332..71255687df 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -48,6 +48,7 @@ itertools.workspace = true lazy_static = "1.4" log-store = { workspace = true } memcomparable = "0.2" +mito-codec.workspace = true moka = { workspace = true, features = ["sync", "future"] } object-store.workspace = true parquet = { workspace = true, features = ["async"] } @@ -82,6 +83,7 @@ common-test-util.workspace = true criterion = "0.4" dotenv.workspace = true log-store.workspace = true +mito-codec = { workspace = true, features = ["testing"] } object-store = { workspace = true, features = ["services-memory"] } rskafka.workspace = true rstest.workspace = true diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index ab83975b17..040355116b 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -25,8 +25,8 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable use mito2::memtable::time_series::TimeSeriesMemtable; use mito2::memtable::{KeyValues, Memtable}; use mito2::region::options::MergeMode; -use mito2::row_converter::DensePrimaryKeyCodec; use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema}; +use mito_codec::row_converter::DensePrimaryKeyCodec; use rand::rngs::ThreadRng; use rand::seq::IndexedRandom; use rand::Rng; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 5685a4453a..96bc6538c2 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -42,6 +42,13 @@ use crate::worker::WorkerId; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Unexpected data type"))] + DataTypeMismatch { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("External error, context: {}", context))] External { source: BoxedError, @@ -291,35 +298,6 @@ pub enum Error { #[snafu(display("Failed to write region"))] WriteGroup { source: 
Arc }, - #[snafu(display("Row value mismatches field data type"))] - FieldTypeMismatch { source: datatypes::error::Error }, - - #[snafu(display("Failed to serialize field"))] - SerializeField { - #[snafu(source)] - error: memcomparable::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display( - "Data type: {} does not support serialization/deserialization", - data_type, - ))] - NotSupportedField { - data_type: ConcreteDataType, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to deserialize field"))] - DeserializeField { - #[snafu(source)] - error: memcomparable::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))] InvalidParquet { file: String, @@ -1028,6 +1006,20 @@ pub enum Error { location: Location, source: common_grpc::Error, }, + + #[snafu(display("Failed to encode"))] + Encode { + #[snafu(implicit)] + location: Location, + source: mito_codec::error::Error, + }, + + #[snafu(display("Failed to decode"))] + Decode { + #[snafu(implicit)] + location: Location, + source: mito_codec::error::Error, + }, } pub type Result = std::result::Result; @@ -1052,6 +1044,7 @@ impl ErrorExt for Error { use Error::*; match self { + DataTypeMismatch { source, .. } => source.status_code(), OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable, WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => { source.status_code() @@ -1095,7 +1088,6 @@ impl ErrorExt for Error { | BiErrors { .. } | StopScheduler { .. } | ComputeVector { .. } - | SerializeField { .. } | EncodeMemtable { .. } | CreateDir { .. } | ReadDataPart { .. } @@ -1107,9 +1099,7 @@ impl ErrorExt for Error { WriteParquet { .. } => StatusCode::StorageUnavailable, WriteGroup { source, .. } => source.status_code(), - FieldTypeMismatch { source, .. } => source.status_code(), - NotSupportedField { .. } => StatusCode::Unsupported, - DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected, + EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected, InvalidBatch { .. } => StatusCode::InvalidArguments, InvalidRecordBatch { .. } => StatusCode::InvalidArguments, ConvertVector { source, .. } => source.status_code(), @@ -1181,7 +1171,9 @@ impl ErrorExt for Error { ScanSeries { source, .. } => source.status_code(), ScanMultiTimes { .. } => StatusCode::InvalidArguments, - Error::ConvertBulkWalEntry { source, .. } => source.status_code(), + ConvertBulkWalEntry { source, .. } => source.status_code(), + + Encode { source, .. } | Decode { source, .. 
} => source.status_code(), } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 66ffdf9384..503b60d5c5 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -41,7 +41,6 @@ pub mod read; pub mod region; mod region_write_ctx; pub mod request; -pub mod row_converter; pub mod schedule; pub mod sst; mod time_provider; diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 16b845ebda..b810beaaf8 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -21,6 +21,8 @@ use std::sync::Arc; pub use bulk::part::EncodedBulkPart; use common_time::Timestamp; +use mito_codec::key_values::KeyValue; +pub use mito_codec::key_values::KeyValues; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; @@ -29,8 +31,6 @@ use table::predicate::Predicate; use crate::config::MitoConfig; use crate::error::Result; use crate::flush::WriteBufferManagerRef; -use crate::memtable::key_values::KeyValue; -pub use crate::memtable::key_values::KeyValues; use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::metrics::WRITE_BUFFER_BYTES; @@ -42,7 +42,6 @@ use crate::sst::file::FileTimeRange; mod builder; pub mod bulk; -pub mod key_values; pub mod partition_tree; mod simple_bulk_memtable; mod stats; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 940aee29a5..3b99eb93c5 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -16,13 +16,13 @@ use std::sync::{Arc, RwLock}; +use mito_codec::key_values::KeyValue; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart}; -use crate::memtable::key_values::KeyValue; use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup, diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 0380afd7e0..5c574b6258 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -17,12 +17,12 @@ use std::collections::VecDeque; use std::sync::Arc; +use mito_codec::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec}; use parquet::file::metadata::ParquetMetaData; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec}; use crate::sst::parquet::file_range::RangeBase; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::SimpleFilterContext; diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 22a0dfcc0f..0756bc9231 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -38,6 +38,8 @@ use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; use datatypes::value::Value; use datatypes::vectors::Helper; +use mito_codec::key_values::{KeyValue, KeyValuesRef}; +use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt}; use parquet::arrow::ArrowWriter; use parquet::data_type::AsBytes; use parquet::file::metadata::ParquetMetaData; @@ -47,13 +49,12 @@ use 
store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; use table::predicate::Predicate; -use crate::error; -use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result}; +use crate::error::{ + self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result, +}; use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::BulkPartIter; -use crate::memtable::key_values::{KeyValue, KeyValuesRef}; use crate::memtable::BoxedBatchIterator; -use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::to_sst_arrow_schema; @@ -354,7 +355,9 @@ fn mutations_to_record_batch( for row in key_values.iter() { pk_buffer.clear(); - pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?; + pk_encoder + .encode_to_vec(row.primary_keys(), &mut pk_buffer) + .context(EncodeSnafu)?; pk_builder.append_value(pk_buffer.as_bytes()); ts_vector.push_value_ref(row.timestamp()); sequence_builder.append_value(row.sequence()); diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 68960c1afe..83c1efe301 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -19,7 +19,6 @@ mod dedup; mod dict; mod merger; mod partition; -mod primary_key_filter; mod shard; mod shard_builder; mod tree; @@ -29,7 +28,8 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use common_base::readable_size::ReadableSize; -pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter}; +use mito_codec::key_values::KeyValue; +use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec}; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; @@ -38,7 +38,6 @@ use table::predicate::Predicate; use crate::error::{Result, UnsupportedOperationSnafu}; use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::part::BulkPart; -use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::tree::PartitionTree; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ @@ -47,7 +46,6 @@ use crate::memtable::{ PredicateGroup, }; use crate::region::options::MergeMode; -use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec}; /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size. 
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8; @@ -368,11 +366,11 @@ mod tests { use datatypes::schema::ColumnSchema; use datatypes::value::Value; use datatypes::vectors::Int64Vector; + use mito_codec::row_converter::DensePrimaryKeyCodec; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use super::*; - use crate::row_converter::DensePrimaryKeyCodec; use crate::test_util::memtable_util::{ self, collect_iter_timestamps, region_metadata_to_row_schema, }; diff --git a/src/mito2/src/memtable/partition_tree/data.rs b/src/mito2/src/memtable/partition_tree/data.rs index 8651eceb4b..016583277b 100644 --- a/src/mito2/src/memtable/partition_tree/data.rs +++ b/src/mito2/src/memtable/partition_tree/data.rs @@ -33,6 +33,7 @@ use datatypes::vectors::{ TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, }; +use mito_codec::key_values::KeyValue; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; use parquet::arrow::ArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; @@ -44,7 +45,6 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; use crate::error; use crate::error::Result; -use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger}; use crate::memtable::partition_tree::PkIndex; use crate::metrics::{ diff --git a/src/mito2/src/memtable/partition_tree/partition.rs b/src/mito2/src/memtable/partition_tree/partition.rs index 781a287eae..804396771e 100644 --- a/src/mito2/src/memtable/partition_tree/partition.rs +++ b/src/mito2/src/memtable/partition_tree/partition.rs @@ -22,13 +22,16 @@ use std::time::{Duration, Instant}; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; +use mito_codec::key_values::KeyValue; +use mito_codec::primary_key_filter::is_partition_column; +use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter}; +use snafu::ResultExt; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::ColumnId; -use crate::error::Result; -use crate::memtable::key_values::KeyValue; +use crate::error::{EncodeSnafu, Result}; use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP}; use crate::memtable::partition_tree::dedup::DedupReader; use crate::memtable::partition_tree::shard::{ @@ -39,7 +42,6 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PkId}; use crate::memtable::stats::WriteMetrics; use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED; use crate::read::{Batch, BatchBuilder}; -use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter}; /// Key of a partition. pub type PartitionKey = u32; @@ -91,7 +93,9 @@ impl Partition { // `primary_key` is sparse, re-encode the full primary key. let sparse_key = primary_key.clone(); primary_key.clear(); - row_codec.encode_key_value(&key_value, primary_key)?; + row_codec + .encode_key_value(&key_value, primary_key) + .context(EncodeSnafu)?; let pk_id = inner.shard_builder.write_with_key( primary_key, Some(&sparse_key), @@ -304,11 +308,6 @@ impl Partition { .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME) .unwrap_or(false) } - - /// Returns true if this is a partition column. 
- pub(crate) fn is_partition_column(name: &str) -> bool { - name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME - } } pub(crate) struct PartitionStats { @@ -446,7 +445,7 @@ impl ReadPartitionContext { fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool { for filter in filters { // We already pruned partitions before so we skip the partition column. - if Partition::is_partition_column(filter.column_name()) { + if is_partition_column(filter.column_name()) { continue; } let Some(column) = metadata.column_by_name(filter.column_name()) else { diff --git a/src/mito2/src/memtable/partition_tree/shard.rs b/src/mito2/src/memtable/partition_tree/shard.rs index d7350d7d12..0ea9c1ab87 100644 --- a/src/mito2/src/memtable/partition_tree/shard.rs +++ b/src/mito2/src/memtable/partition_tree/shard.rs @@ -17,10 +17,11 @@ use std::cmp::Ordering; use std::time::{Duration, Instant}; +use mito_codec::key_values::KeyValue; +use mito_codec::row_converter::PrimaryKeyFilter; use store_api::metadata::RegionMetadataRef; use crate::error::Result; -use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{ DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP, }; @@ -29,7 +30,6 @@ use crate::memtable::partition_tree::merger::{Merger, Node}; use crate::memtable::partition_tree::shard_builder::ShardBuilderReader; use crate::memtable::partition_tree::{PkId, PkIndex, ShardId}; use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED; -use crate::row_converter::PrimaryKeyFilter; /// Shard stores data related to the same key dictionary. pub struct Shard { diff --git a/src/mito2/src/memtable/partition_tree/shard_builder.rs b/src/mito2/src/memtable/partition_tree/shard_builder.rs index e1720f849f..600097516d 100644 --- a/src/mito2/src/memtable/partition_tree/shard_builder.rs +++ b/src/mito2/src/memtable/partition_tree/shard_builder.rs @@ -18,10 +18,11 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; +use mito_codec::key_values::KeyValue; +use mito_codec::row_converter::PrimaryKeyFilter; use store_api::metadata::RegionMetadataRef; use crate::error::Result; -use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{ DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP, }; @@ -30,7 +31,6 @@ use crate::memtable::partition_tree::shard::Shard; use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId}; use crate::memtable::stats::WriteMetrics; use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED; -use crate::row_converter::PrimaryKeyFilter; /// Builder to write keys and data to a shard that the key dictionary /// is still active. 
diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index df50c8934d..58224566fc 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -23,8 +23,10 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datafusion_common::ScalarValue; use datatypes::prelude::ValueRef; -use memcomparable::Serializer; -use serde::Serialize; +use mito_codec::key_values::KeyValue; +use mito_codec::primary_key_filter::is_partition_column; +use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder}; +use mito_codec::row_converter::{PrimaryKeyCodec, SortField}; use snafu::{ensure, ResultExt}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; @@ -32,10 +34,9 @@ use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::{ - EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu, + EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, }; use crate::flush::WriteBufferManagerRef; -use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::partition::{ Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext, }; @@ -46,7 +47,6 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST use crate::read::dedup::LastNonNullIter; use crate::read::Batch; use crate::region::options::MergeMode; -use crate::row_converter::{PrimaryKeyCodec, SortField}; /// The partition tree. pub struct PartitionTree { @@ -73,15 +73,15 @@ impl PartitionTree { config: &PartitionTreeConfig, write_buffer_manager: Option, ) -> Self { - let sparse_encoder = SparseEncoder { - fields: metadata + let sparse_encoder = SparseEncoder::new( + metadata .primary_key_columns() .map(|c| FieldWithId { field: SortField::new(c.column_schema.data_type.clone()), column_id: c.column_id, }) .collect(), - }; + ); let is_partitioned = Partition::has_multi_partitions(&metadata); let mut config = config.clone(); if config.merge_mode == MergeMode::LastNonNull { @@ -129,7 +129,8 @@ impl PartitionTree { } else { // For compatibility, use the sparse encoder for dense primary key. 
self.sparse_encoder - .encode_to_vec(kv.primary_keys(), buffer)?; + .encode_to_vec(kv.primary_keys(), buffer) + .context(EncodeSnafu)?; } Ok(()) } @@ -166,7 +167,9 @@ impl PartitionTree { if self.is_partitioned { self.encode_sparse_primary_key(&kv, pk_buffer)?; } else { - self.row_codec.encode_key_value(&kv, pk_buffer)?; + self.row_codec + .encode_key_value(&kv, pk_buffer) + .context(EncodeSnafu)?; } // Write rows with @@ -208,7 +211,9 @@ impl PartitionTree { if self.is_partitioned { self.encode_sparse_primary_key(&kv, pk_buffer)?; } else { - self.row_codec.encode_key_value(&kv, pk_buffer)?; + self.row_codec + .encode_key_value(&kv, pk_buffer) + .context(EncodeSnafu)?; } // Write rows with @@ -415,7 +420,7 @@ impl PartitionTree { for (key, partition) in partitions.iter() { let mut is_needed = true; for filter in filters { - if !Partition::is_partition_column(filter.column_name()) { + if !is_partition_column(filter.column_name()) { continue; } @@ -436,34 +441,6 @@ impl PartitionTree { } } -struct FieldWithId { - field: SortField, - column_id: ColumnId, -} - -struct SparseEncoder { - fields: Vec, -} - -impl SparseEncoder { - fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> Result<()> - where - I: Iterator>, - { - let mut serializer = Serializer::new(buffer); - for (value, field) in row.zip(self.fields.iter()) { - if !value.is_null() { - field - .column_id - .serialize(&mut serializer) - .context(SerializeFieldSnafu)?; - field.field.serialize(&mut serializer, &value)?; - } - } - Ok(()) - } -} - #[derive(Default)] struct TreeIterMetrics { iter_elapsed: Duration, diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 7ea60df05f..ad97bd7077 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, RwLock}; use api::v1::OpType; use datatypes::vectors::Helper; +use mito_codec::key_values::KeyValue; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; @@ -26,7 +27,6 @@ use table::predicate::Predicate; use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::part::BulkPart; -use crate::memtable::key_values::KeyValue; use crate::memtable::stats::WriteMetrics; use crate::memtable::time_series::{Series, Values}; use crate::memtable::{ diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 55dbe1aae1..d422daa0b2 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -29,6 +29,7 @@ use datatypes::arrow::array::{ }; use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer}; use datatypes::arrow::datatypes::{DataType, Int64Type}; +use mito_codec::key_values::KeyValue; use smallvec::{smallvec, SmallVec}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -36,7 +37,6 @@ use store_api::metadata::RegionMetadataRef; use crate::error; use crate::error::{InvalidRequestSnafu, Result}; use crate::memtable::bulk::part::BulkPart; -use crate::memtable::key_values::KeyValue; use crate::memtable::version::SmallMemtableVec; use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef}; diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index b6e6db2c64..4a8f5953e9 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -35,17 +35,19 @@ use 
datatypes::vectors::{ Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt64Vector, UInt8Vector, }; +use mito_codec::key_values::KeyValue; +use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; -use crate::error; -use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; +use crate::error::{ + self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result, +}; use crate::flush::WriteBufferManagerRef; use crate::memtable::builder::{FieldBuilder, StringBuilder}; use crate::memtable::bulk::part::BulkPart; -use crate::memtable::key_values::KeyValue; use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ @@ -57,7 +59,6 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::dedup::LastNonNullIter; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::region::options::MergeMode; -use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; /// Initial vector builder capacity. const INITIAL_BUILDER_CAPACITY: usize = 4; @@ -176,7 +177,10 @@ impl TimeSeriesMemtable { } ); - let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; + let primary_key_encoded = self + .row_codec + .encode(kv.primary_keys()) + .context(EncodeSnafu)?; let (key_allocated, value_allocated) = self.series_set.push_to_series(primary_key_encoded, &kv); @@ -1107,11 +1111,11 @@ mod tests { use datatypes::schema::ColumnSchema; use datatypes::value::{OrderedFloat, Value}; use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector}; + use mito_codec::row_converter::SortField; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use super::*; - use crate::row_converter::SortField; use crate::test_util::column_metadata_to_column_schema; fn schema_for_test() -> RegionMetadataRef { diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index af5ee20a30..5b62fcfc40 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -50,16 +50,17 @@ use datatypes::vectors::{ }; use futures::stream::BoxStream; use futures::TryStreamExt; +use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ - ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, + ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, + Result, }; use crate::memtable::BoxedBatchIterator; use crate::read::prune::PruneReader; -use crate::row_converter::{CompositeValues, PrimaryKeyCodec}; /// Storage internal representation of a batch of rows for a primary key (time series). 
/// @@ -612,7 +613,7 @@ impl Batch { column_id: ColumnId, ) -> Result> { if self.pk_values.is_none() { - self.pk_values = Some(codec.decode(&self.primary_key)?); + self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?); } let pk_values = self.pk_values.as_ref().unwrap(); @@ -1026,12 +1027,12 @@ pub(crate) struct ScannerMetrics { #[cfg(test)] mod tests { + use mito_codec::row_converter::{self, build_primary_key_codec_with_fields}; use store_api::codec::PrimaryKeyEncoding; use store_api::storage::consts::ReservedColumnId; use super::*; use crate::error::Error; - use crate::row_converter::{self, build_primary_key_codec_with_fields}; use crate::test_util::new_batch_builder; fn new_batch( diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index c103bbaa9c..4bd23987b7 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -20,17 +20,17 @@ use std::sync::Arc; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use datatypes::vectors::VectorRef; +use mito_codec::row_converter::{ + build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, + SortField, +}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; -use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result}; +use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result}; use crate::read::projection::ProjectionMapper; use crate::read::{Batch, BatchColumn, BatchReader}; -use crate::row_converter::{ - build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, - SortField, -}; /// Reader to adapt schema of underlying reader to expected schema. pub struct CompatReader { @@ -155,7 +155,9 @@ impl CompatPrimaryKey { batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(), ); buffer.extend_from_slice(batch.primary_key()); - self.converter.encode_values(&self.values, &mut buffer)?; + self.converter + .encode_values(&self.values, &mut buffer) + .context(EncodeSnafu)?; batch.set_primary_key(buffer); @@ -405,7 +407,10 @@ impl RewritePrimaryKey { let values = if let Some(pk_values) = batch.pk_values() { pk_values } else { - let new_pk_values = self.original.decode(batch.primary_key())?; + let new_pk_values = self + .original + .decode(batch.primary_key()) + .context(DecodeSnafu)?; batch.set_pk_values(new_pk_values); // Safety: We ensure pk_values is not None. 
batch.pk_values().as_ref().unwrap() @@ -416,7 +421,9 @@ impl RewritePrimaryKey { ); match values { CompositeValues::Dense(values) => { - self.new.encode_values(values.as_slice(), &mut buffer)?; + self.new + .encode_values(values.as_slice(), &mut buffer) + .context(EncodeSnafu)?; } CompositeValues::Sparse(values) => { let values = self @@ -427,7 +434,9 @@ impl RewritePrimaryKey { (*id, value.as_value_ref()) }) .collect::>(); - self.new.encode_value_refs(&values, &mut buffer)?; + self.new + .encode_value_refs(&values, &mut buffer) + .context(EncodeSnafu)?; } } batch.set_primary_key(buffer); @@ -445,12 +454,14 @@ mod tests { use datatypes::schema::ColumnSchema; use datatypes::value::ValueRef; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector}; + use mito_codec::row_converter::{ + DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, + }; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use super::*; - use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec}; use crate::test_util::{check_reader_result, VecBatchReader}; /// Creates a new [RegionMetadata]. diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 6d6de78a74..1837b8e18e 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -26,6 +26,7 @@ use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::Value; use datatypes::vectors::VectorRef; +use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -33,7 +34,6 @@ use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, Result}; use crate::read::Batch; -use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec}; /// Only cache vector when its length `<=` this value. 
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; @@ -320,12 +320,12 @@ mod tests { use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::arrow::util::pretty; use datatypes::value::ValueRef; + use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; + use mito_codec::test_util::TestRegionMetadataBuilder; use super::*; use crate::cache::CacheManager; use crate::read::BatchBuilder; - use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; - use crate::test_util::meta_util::TestRegionMetadataBuilder; fn new_batch( ts_start: i64, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 5910e78485..de86d32a37 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -922,11 +922,12 @@ mod tests { use api::v1::{Row, SemanticType}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnDefaultConstraint; + use mito_codec::test_util::i64_value; use store_api::metadata::RegionMetadataBuilder; use super::*; use crate::error::Error; - use crate::test_util::{i64_value, ts_ms_value}; + use crate::test_util::ts_ms_value; fn new_column_schema( name: &str, diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 1cafc3e7ae..2bafc49ca3 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -20,13 +20,13 @@ use std::sync::Arc; use common_recordbatch::filter::SimpleFilterEvaluator; use datatypes::value::{Value, ValueRef}; pub use dense::{DensePrimaryKeyCodec, SortField}; +use mito_codec::key_values::KeyValue; pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use crate::error::Result; -use crate::memtable::key_values::KeyValue; /// Row value encoder/decoder. pub trait PrimaryKeyCodecExt { diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 3abeee26ff..65bc02c7c1 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -13,7 +13,6 @@ // limitations under the License. 
pub(crate) mod bloom_filter; -mod codec; pub(crate) mod fulltext_index; mod indexer; pub mod intermediate; diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs index fcf9e0f768..9ae25c4544 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -22,6 +22,8 @@ use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use index::bloom_filter::applier::InListPredicate; use index::Bytes; +use mito_codec::index::IndexValueCodec; +use mito_codec::row_converter::SortField; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use snafu::{OptionExt, ResultExt}; @@ -30,10 +32,8 @@ use store_api::storage::ColumnId; use crate::cache::file_cache::FileCacheRef; use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef; -use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result}; -use crate::row_converter::SortField; +use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result}; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier; -use crate::sst::index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; pub struct BloomFilterIndexApplierBuilder<'a> { @@ -322,7 +322,8 @@ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result { let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; let mut bytes = vec![]; let field = SortField::new(data_type); - IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?; + IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes) + .context(EncodeSnafu)?; Ok(bytes) } diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 53821c7cf2..4c1dbd13e3 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -19,6 +19,8 @@ use std::sync::Arc; use common_telemetry::{debug, warn}; use datatypes::schema::SkippingIndexType; use index::bloom_filter::creator::BloomFilterCreator; +use mito_codec::index::{IndexValueCodec, IndexValuesCodec}; +use mito_codec::row_converter::SortField; use puffin::puffin_manager::{PuffinWriter, PutOptions}; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -26,14 +28,12 @@ use store_api::storage::ColumnId; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use crate::error::{ - BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, - PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result, + BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu, + OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result, }; use crate::read::Batch; -use crate::row_converter::SortField; use crate::sst::file::FileId; use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; -use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; use crate::sst::index::intermediate::{ IntermediateLocation, IntermediateManager, TempFileProvider, }; @@ -210,7 +210,8 @@ impl BloomFilterIndexer { v.as_value_ref(), field, &mut buf, - )?; + ) + .context(EncodeSnafu)?; Ok(buf) }) .transpose()?; @@ -234,11 +235,8 @@ impl BloomFilterIndexer { let elems = (!value.is_null()) .then(|| { let mut buf = vec![]; - IndexValueCodec::encode_nonnull_value( - value, - &sort_field, - &mut buf, - )?; + 
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf) + .context(EncodeSnafu)?; Ok(buf) }) .transpose()?; @@ -353,6 +351,7 @@ pub(crate) mod tests { use datatypes::value::ValueRef; use datatypes::vectors::{UInt64Vector, UInt8Vector}; use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; + use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use object_store::services::Memory; use object_store::ObjectStore; use puffin::puffin_manager::{PuffinManager, PuffinReader}; @@ -362,7 +361,6 @@ pub(crate) mod tests { use super::*; use crate::access_layer::FilePathProvider; use crate::read::BatchColumn; - use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::index::puffin_manager::PuffinManagerFactory; pub fn mock_object_store() -> ObjectStore { diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index dfc4127435..07da9f3b47 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; use crate::error::{ - CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu, + CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu, FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result, }; use crate::read::Batch; @@ -259,7 +259,7 @@ impl SingleCreator { let data = data.get_ref(i); let text = data .as_string() - .context(FieldTypeMismatchSnafu)? + .context(DataTypeMismatchSnafu)? .unwrap_or_default(); self.inner.push_text(text).await?; } diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index eeda08e47a..bc7acd12ac 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -27,6 +27,8 @@ use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use index::inverted_index::search::index_apply::PredicatesIndexApplier; use index::inverted_index::search::predicate::Predicate; +use mito_codec::index::IndexValueCodec; +use mito_codec::row_converter::SortField; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use snafu::{OptionExt, ResultExt}; @@ -35,9 +37,9 @@ use store_api::storage::ColumnId; use crate::cache::file_cache::FileCacheRef; use crate::cache::index::inverted_index::InvertedIndexCacheRef; -use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; -use crate::row_converter::SortField; -use crate::sst::index::codec::IndexValueCodec; +use crate::error::{ + BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result, +}; use crate::sst::index::inverted_index::applier::InvertedIndexApplier; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -230,7 +232,8 @@ impl<'a> InvertedIndexApplierBuilder<'a> { let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; let mut bytes = vec![]; let field = SortField::new(data_type); - IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?; + IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes) + .context(EncodeSnafu)?; Ok(bytes) } } diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs 
b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs index 51f7f001e2..5b2c3a1459 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs @@ -194,7 +194,7 @@ mod tests { }; let res = builder.collect_between(&between); - assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(matches!(res, Err(Error::Encode { .. }))); assert!(builder.output.is_empty()); } diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index b09ac93f71..1b1a3e2854 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -264,7 +264,7 @@ mod tests { ); let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); - assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(matches!(res, Err(Error::Encode { .. }))); assert!(builder.output.is_empty()); } diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index 9b08118eea..a4deb9b3a5 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -226,7 +226,7 @@ mod tests { ); let res = builder.collect_eq(&tag_column(), &int64_lit(1)); - assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(matches!(res, Err(Error::Encode { .. }))); assert!(builder.output.is_empty()); } diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index cb9993b6fb..68199a4b1f 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -167,7 +167,7 @@ mod tests { }; let res = builder.collect_inlist(&in_list); - assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(matches!(res, Err(Error::Encode { .. 
}))); assert!(builder.output.is_empty()); } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 6f44979c78..7eb60fec9f 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -22,6 +22,8 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter; use index::inverted_index::create::sort_create::SortIndexCreator; use index::inverted_index::create::InvertedIndexCreator; use index::inverted_index::format::writer::InvertedIndexBlobWriter; +use mito_codec::index::{IndexValueCodec, IndexValuesCodec}; +use mito_codec::row_converter::SortField; use puffin::puffin_manager::{PuffinWriter, PutOptions}; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -30,13 +32,11 @@ use tokio::io::duplex; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use crate::error::{ - BiErrorsSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, + BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushIndexValueSnafu, Result, }; use crate::read::Batch; -use crate::row_converter::SortField; use crate::sst::file::FileId; -use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; use crate::sst::index::intermediate::{ IntermediateLocation, IntermediateManager, TempFileProvider, }; @@ -205,7 +205,8 @@ impl InvertedIndexer { v.as_value_ref(), field, &mut self.value_buf, - )?; + ) + .context(EncodeSnafu)?; Ok(self.value_buf.as_slice()) }) .transpose()?; @@ -238,7 +239,8 @@ impl InvertedIndexer { value, &sort_field, &mut self.value_buf, - )?; + ) + .context(EncodeSnafu)?; self.index_creator .push_with_name(col_id_str, Some(&self.value_buf)) .await @@ -334,6 +336,7 @@ mod tests { use datatypes::value::ValueRef; use datatypes::vectors::{UInt64Vector, UInt8Vector}; use futures::future::BoxFuture; + use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use object_store::services::Memory; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCache; @@ -346,7 +349,6 @@ mod tests { use crate::cache::index::inverted_index::InvertedIndexCache; use crate::metrics::CACHE_BYTES; use crate::read::BatchColumn; - use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::puffin_manager::PuffinManagerFactory; diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 76f93e6ee0..3ef0c809a8 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -22,18 +22,19 @@ use api::v1::{OpType, SemanticType}; use common_telemetry::error; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; +use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use parquet::arrow::arrow_reader::RowSelection; use snafu::{OptionExt, ResultExt}; use store_api::storage::TimeSeriesRowSelector; use crate::error::{ - DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu, + DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result, + StatsNotPresentSnafu, }; use crate::read::compat::CompatBatch; use crate::read::last_row::RowGroupLastRowCachedReader; use crate::read::prune::PruneReader; use crate::read::Batch; -use crate::row_converter::{CompositeValues, 
PrimaryKeyCodec}; use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ @@ -270,7 +271,11 @@ impl RangeBase { let pk_values = if let Some(pk_values) = input.pk_values() { pk_values } else { - input.set_pk_values(self.codec.decode(input.primary_key())?); + input.set_pk_values( + self.codec + .decode(input.primary_key()) + .context(DecodeSnafu)?, + ); input.pk_values().unwrap() }; let pk_value = match pk_values { @@ -284,12 +289,12 @@ impl RangeBase { v[pk_index] .1 .try_to_scalar_value(filter_ctx.data_type()) - .context(FieldTypeMismatchSnafu)? + .context(DataTypeMismatchSnafu)? } CompositeValues::Sparse(v) => { let v = v.get_or_null(filter_ctx.column_id()); v.try_to_scalar_value(filter_ctx.data_type()) - .context(FieldTypeMismatchSnafu)? + .context(DataTypeMismatchSnafu)? } }; if filter diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 8ed6c241bf..d7611ce67c 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -38,6 +38,7 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; use datatypes::vectors::{Helper, Vector}; +use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; use snafu::{ensure, OptionExt, ResultExt}; @@ -48,7 +49,6 @@ use crate::error::{ ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, }; use crate::read::{Batch, BatchBuilder, BatchColumn}; -use crate::row_converter::{build_primary_key_codec_with_fields, SortField}; use crate::sst::file::{FileMeta, FileTimeRange}; use crate::sst::to_sst_arrow_schema; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 6f3feeec07..2878a87f3e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -26,6 +26,7 @@ use datafusion_expr::Expr; use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; +use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; @@ -48,7 +49,6 @@ use crate::metrics::{ }; use crate::read::prune::{PruneReader, Source}; use crate::read::{Batch, BatchReader}; -use crate::row_converter::build_primary_key_codec; use crate::sst::file::{FileHandle, FileId}; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 8c51cdaf25..6833e807ed 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -16,7 +16,6 @@ pub mod batch_util; pub mod memtable_util; -pub mod meta_util; pub mod scheduler_util; pub mod sst_util; pub mod version_util; @@ -841,13 +840,6 @@ impl CreateRequestBuilder { } } -/// Creates value for i64. -pub(crate) fn i64_value(data: i64) -> v1::Value { - v1::Value { - value_data: Some(ValueData::I64Value(data)), - } -} - /// Creates value for timestamp millis. 
pub(crate) fn ts_ms_value(data: i64) -> v1::Value { v1::Value { diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 2d118b17da..61b534b489 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -25,20 +25,20 @@ use datatypes::data_type::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::schema::ColumnSchema; use datatypes::vectors::TimestampMillisecondVector; +use mito_codec::key_values::KeyValue; +use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; use crate::memtable::bulk::part::BulkPart; -use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer}; use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, MemtableRef, MemtableStats, }; use crate::read::scan_region::PredicateGroup; -use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; /// Empty memtable for test. #[derive(Debug, Default)] diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 15c5cc7c92..e8fc2e7803 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -22,6 +22,7 @@ use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Arra use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::ValueRef; +use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; use parquet::file::metadata::ParquetMetaData; use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, @@ -29,7 +30,6 @@ use store_api::metadata::{ use store_api::storage::RegionId; use crate::read::{Batch, BatchBuilder, Source}; -use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
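
A minimal sketch of the calling pattern this patch standardizes: downstream code builds a SortField, encodes through mito-codec's IndexValueCodec, and wraps the codec's error into the caller's own snafu error type via .context(EncodeSnafu). The Error enum and encode_sort_key function below are illustrative stand-ins for this sketch, not items added by the patch.

use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    /// Illustrative variant; real callers (mito2, metric-engine) define their own error enums.
    #[snafu(display("Failed to encode value"))]
    Encode { source: mito_codec::error::Error },
}

fn encode_sort_key(value: &Value, data_type: ConcreteDataType) -> Result<Vec<u8>, Error> {
    // Describe the value's type for the memcomparable encoding.
    let field = SortField::new(data_type);
    let mut bytes = Vec::new();
    // The codec now returns mito_codec::error::Error; the caller converts it
    // into its own error type with snafu's context(), as the call sites above do.
    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
        .context(EncodeSnafu)?;
    Ok(bytes)
}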