From 55b5df9c51cdf2f6026015158670cb375a93d9e4 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 14 Aug 2023 15:13:39 +0800 Subject: [PATCH] feat: row wise converter (#2162) * feat: impl mem-comparable encoding for timestamp * fix: test cases * impl time series encode/decoder * fix: merge unsupported match arms * fix: clippy * chore: big number delimiter * feat: encode timestamps as i64 * fix: remove useless error variant --- Cargo.lock | 14 ++ src/datatypes/src/value.rs | 89 +++++++- src/mito2/Cargo.toml | 3 + src/mito2/src/error.rs | 47 ++++ src/mito2/src/lib.rs | 2 + src/mito2/src/row_converter.rs | 378 +++++++++++++++++++++++++++++++++ 6 files changed, 527 insertions(+), 6 deletions(-) create mode 100644 src/mito2/src/row_converter.rs diff --git a/Cargo.lock b/Cargo.lock index 4cc0cbc49e..66eb7257f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5220,6 +5220,17 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memcomparable" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677" +dependencies = [ + "bytes", + "serde", + "thiserror", +] + [[package]] name = "memmap2" version = "0.5.10" @@ -5521,6 +5532,7 @@ dependencies = [ "async-compat", "async-stream", "async-trait", + "bytes", "chrono", "common-base", "common-catalog", @@ -5542,9 +5554,11 @@ dependencies = [ "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032)", "lazy_static", "log-store", + "memcomparable", "metrics", "object-store", "parquet", + "paste", "prost", "regex", "serde", diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 7a9196fbb4..9f154d65c3 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -474,15 +474,11 @@ impl_value_from!(Float64, f64); impl_value_from!(String, StringBytes); impl_value_from!(Binary, Bytes); impl_value_from!(Date, Date); +impl_value_from!(Time, Time); impl_value_from!(DateTime, DateTime); impl_value_from!(Timestamp, Timestamp); impl_value_from!(Interval, Interval); - -impl From for Value { - fn from(string: String) -> Value { - Value::String(string.into()) - } -} +impl_value_from!(String, String); impl From<&str> for Value { fn from(string: &str) -> Value { @@ -703,6 +699,33 @@ impl TryFrom for Value { } } +impl From> for Value { + fn from(value: ValueRef<'_>) -> Self { + match value { + ValueRef::Null => Value::Null, + ValueRef::Boolean(v) => Value::Boolean(v), + ValueRef::UInt8(v) => Value::UInt8(v), + ValueRef::UInt16(v) => Value::UInt16(v), + ValueRef::UInt32(v) => Value::UInt32(v), + ValueRef::UInt64(v) => Value::UInt64(v), + ValueRef::Int8(v) => Value::Int8(v), + ValueRef::Int16(v) => Value::Int16(v), + ValueRef::Int32(v) => Value::Int32(v), + ValueRef::Int64(v) => Value::Int64(v), + ValueRef::Float32(v) => Value::Float32(v), + ValueRef::Float64(v) => Value::Float64(v), + ValueRef::String(v) => Value::String(v.into()), + ValueRef::Binary(v) => Value::Binary(v.into()), + ValueRef::Date(v) => Value::Date(v), + ValueRef::DateTime(v) => Value::DateTime(v), + ValueRef::Timestamp(v) => Value::Timestamp(v), + ValueRef::Time(v) => Value::Time(v), + ValueRef::Interval(v) => Value::Interval(v), + ValueRef::List(v) => v.to_value(), + } + } +} + /// Reference to [Value]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ValueRef<'a> { @@ -774,6 +797,60 @@ impl<'a> ValueRef<'a> { impl_as_for_value_ref!(self, Boolean) } + pub fn as_i8(&self) -> Result> { + impl_as_for_value_ref!(self, Int8) + } + + pub fn as_u8(&self) -> Result> { + impl_as_for_value_ref!(self, UInt8) + } + + pub fn as_i16(&self) -> Result> { + impl_as_for_value_ref!(self, Int16) + } + + pub fn as_u16(&self) -> Result> { + impl_as_for_value_ref!(self, UInt16) + } + + pub fn as_i32(&self) -> Result> { + impl_as_for_value_ref!(self, Int32) + } + + pub fn as_u32(&self) -> Result> { + impl_as_for_value_ref!(self, UInt32) + } + + pub fn as_i64(&self) -> Result> { + impl_as_for_value_ref!(self, Int64) + } + + pub fn as_u64(&self) -> Result> { + impl_as_for_value_ref!(self, UInt64) + } + + pub fn as_f32(&self) -> Result> { + match self { + ValueRef::Null => Ok(None), + ValueRef::Float32(f) => Ok(Some(f.0)), + other => error::CastTypeSnafu { + msg: format!("Failed to cast value ref {:?} to ValueRef::Float32", other,), + } + .fail(), + } + } + + pub fn as_f64(&self) -> Result> { + match self { + ValueRef::Null => Ok(None), + ValueRef::Float64(f) => Ok(Some(f.0)), + other => error::CastTypeSnafu { + msg: format!("Failed to cast value ref {:?} to ValueRef::Float64", other,), + } + .fail(), + } + } + /// Cast itself to [Date]. pub fn as_date(&self) -> Result> { impl_as_for_value_ref!(self, Date) diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 2ee9dfb931..dd338f1d68 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -15,6 +15,7 @@ arc-swap = "1.0" async-compat = "0.2" async-stream.workspace = true async-trait = "0.1" +bytes = "1.4" chrono.workspace = true common-base = { workspace = true } common-catalog = { workspace = true } @@ -36,9 +37,11 @@ futures.workspace = true greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" } lazy_static = "1.4" log-store = { workspace = true } +memcomparable = "0.2" metrics.workspace = true object-store = { workspace = true } parquet = { workspace = true, features = ["async"] } +paste.workspace = true prost.workspace = true regex = "1.5" serde = { version = "1.0", features = ["derive"] } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index df8aa441a1..bd6df63d01 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -19,6 +19,7 @@ use common_datasource::compression::CompressionType; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use datatypes::arrow::error::ArrowError; +use datatypes::prelude::ConcreteDataType; use prost::{DecodeError, EncodeError}; use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; @@ -260,6 +261,47 @@ pub enum Error { // Shared error for each writer in the write group. #[snafu(display("Failed to write region, source: {}", source))] WriteGroup { source: Arc }, + + #[snafu(display( + "Row length mismatch, expect: {}, actual: {}, location: {}", + expect, + actual, + location + ))] + RowLengthMismatch { + expect: usize, + actual: usize, + location: Location, + }, + + #[snafu(display("Row value mismatches field data type"))] + FieldTypeMismatch { source: datatypes::error::Error }, + + #[snafu(display("Failed to serialize field, location: {}", location))] + SerializeField { + source: memcomparable::Error, + location: Location, + }, + + #[snafu(display( + "Data type: {} does not support serialization/deserialization, location: {}", + data_type, + location + ))] + NotSupportedField { + data_type: ConcreteDataType, + location: Location, + }, + + #[snafu(display( + "Failed to deserialize field, source: {} location: {}", + source, + location + ))] + DeserializeField { + source: memcomparable::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -304,6 +346,11 @@ impl ErrorExt for Error { | DecodeWal { .. } => StatusCode::Internal, WriteBuffer { source, .. } => source.status_code(), WriteGroup { source, .. } => source.status_code(), + RowLengthMismatch { .. } => StatusCode::InvalidArguments, + FieldTypeMismatch { source, .. } => source.status_code(), + SerializeField { .. } => StatusCode::Internal, + NotSupportedField { .. } => StatusCode::Unsupported, + DeserializeField { .. } => StatusCode::Unexpected, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 015c96c5c1..4cdaf26a81 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -38,6 +38,8 @@ mod region; #[allow(dead_code)] pub mod request; #[allow(dead_code)] +mod row_converter; +#[allow(dead_code)] pub mod sst; pub mod wal; #[allow(dead_code)] diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs new file mode 100644 index 0000000000..f755aec003 --- /dev/null +++ b/src/mito2/src/row_converter.rs @@ -0,0 +1,378 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Buf; +use common_base::bytes::Bytes; +use common_time::time::Time; +use common_time::{Date, Interval}; +use datatypes::data_type::ConcreteDataType; +use datatypes::prelude::Value; +use datatypes::value::ValueRef; +use memcomparable::{Deserializer, Serializer}; +use paste::paste; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; + +use crate::error; +use crate::error::{ + FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, RowLengthMismatchSnafu, + SerializeFieldSnafu, +}; + +/// Row value encoder/decoder. +pub trait RowCodec { + /// Encodes rows to bytes. + fn encode<'a, I>(&self, rows: I) -> Result> + where + I: Iterator]>; + + /// Decode row values from bytes. + fn decode(&self, bytes: &[u8]) -> Result>>; +} + +pub struct SortField { + data_type: ConcreteDataType, +} + +impl SortField { + pub fn new(data_type: ConcreteDataType) -> Self { + Self { data_type } + } + + pub fn estimated_size(&self) -> usize { + match &self.data_type { + ConcreteDataType::Boolean(_) => 2, + ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2, + ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3, + ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5, + ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, + ConcreteDataType::Float32(_) => 5, + ConcreteDataType::Float64(_) => 9, + ConcreteDataType::Binary(_) => 11, + ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes. + ConcreteDataType::Date(_) => 5, + ConcreteDataType::DateTime(_) => 9, + ConcreteDataType::Timestamp(_) => 10, + ConcreteDataType::Time(_) => 10, + ConcreteDataType::Interval(_) => 18, + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => 0, + } + } +} + +impl SortField { + fn serialize(&self, serializer: &mut Serializer<&mut Vec>, value: &ValueRef) -> Result<()> { + macro_rules! cast_value_and_serialize { + ( + $self: ident; + $serializer: ident; + $( + $ty: ident, $f: ident + ),* + ) => { + match &$self.data_type { + $( + ConcreteDataType::$ty(_) => { + paste!{ + value + .[]() + .context(FieldTypeMismatchSnafu)? + .serialize($serializer) + .context(SerializeFieldSnafu)?; + } + } + )* + ConcreteDataType::Timestamp(_) => { + let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?; + timestamp + .map(|t|t.value()) + .serialize($serializer) + .context(SerializeFieldSnafu)?; + } + ConcreteDataType::List(_) | + ConcreteDataType::Dictionary(_) | + ConcreteDataType::Null(_) => { + return error::NotSupportedFieldSnafu { + data_type: $self.data_type.clone() + }.fail() + } + } + }; + } + cast_value_and_serialize!(self; serializer; + Boolean, boolean, + Binary, binary, + Int8, i8, + UInt8, u8, + Int16, i16, + UInt16, u16, + Int32, i32, + UInt32, u32, + Int64, i64, + UInt64, u64, + Float32, f32, + Float64, f64, + String, string, + Date, date, + DateTime, datetime, + Time, time, + Interval, interval + ); + + Ok(()) + } + + fn deserialize(&self, deserializer: &mut Deserializer) -> Result { + use common_time::DateTime; + macro_rules! deserialize_and_build_value { + ( + $self: ident; + $serializer: ident; + $( + $ty: ident, $f: ident + ),* + ) => { + + match &$self.data_type { + $( + ConcreteDataType::$ty(_) => { + Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?)) + } + )* + ConcreteDataType::Binary(_) => Ok(Value::from( + Option::>::deserialize(deserializer) + .context(error::DeserializeFieldSnafu)? + .map(Bytes::from), + )), + ConcreteDataType::Timestamp(ty) => { + let timestamp = Option::::deserialize(deserializer) + .context(error::DeserializeFieldSnafu)? + .map(|t|ty.create_timestamp(t)); + Ok(Value::from(timestamp)) + } + ConcreteDataType::List(l) => NotSupportedFieldSnafu { + data_type: ConcreteDataType::List(l.clone()), + } + .fail(), + ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu { + data_type: ConcreteDataType::Dictionary(d.clone()), + } + .fail(), + ConcreteDataType::Null(n) => NotSupportedFieldSnafu { + data_type: ConcreteDataType::Null(n.clone()), + } + .fail(), + } + }; + } + deserialize_and_build_value!(self; deserializer; + Boolean, bool, + Int8, i8, + Int16, i16, + Int32, i32, + Int64, i64, + UInt8, u8, + UInt16, u16, + UInt32, u32, + UInt64, u64, + Float32, f32, + Float64, f64, + String, String, + Date, Date, + Time, Time, + DateTime, DateTime, + Interval, Interval + ) + } +} + +/// A memory-comparable row [Value] encoder/decoder. +pub struct McmpRowCodec { + fields: Vec, +} + +impl McmpRowCodec { + pub fn new(fields: Vec) -> Self { + Self { fields } + } + + /// Estimated length for encoded bytes. + fn estimated_length(&self) -> usize { + self.fields.iter().map(|f| f.estimated_size()).sum() + } +} + +impl RowCodec for McmpRowCodec { + fn encode<'a, I>(&self, rows: I) -> Result> + where + I: Iterator]>, + { + let mut bytes = Vec::with_capacity(self.estimated_length()); + let mut serializer = memcomparable::Serializer::new(&mut bytes); + + for row in rows { + ensure!( + row.len() == self.fields.len(), + RowLengthMismatchSnafu { + expect: self.fields.len(), + actual: row.len(), + } + ); + + for (value, field) in row.iter().zip(self.fields.iter()) { + field.serialize(&mut serializer, value)?; + } + } + Ok(bytes) + } + + fn decode(&self, bytes: &[u8]) -> Result>> { + let mut deserializer = memcomparable::Deserializer::new(bytes); + let mut res = vec![]; + while deserializer.has_remaining() { + let mut values = Vec::with_capacity(self.fields.len()); + for f in &self.fields { + let value = f.deserialize(&mut deserializer)?; + values.push(value); + } + res.push(values); + } + Ok(res) + } +} + +#[cfg(test)] +mod tests { + use common_base::bytes::StringBytes; + use common_time::Timestamp; + use datatypes::value::Value; + + use super::*; + + fn check_encode_and_decode(data_types: &[ConcreteDataType], rows: &[Vec]) { + let encoder = McmpRowCodec::new( + data_types + .iter() + .map(|t| SortField::new(t.clone())) + .collect::>(), + ); + + let value_ref = rows + .iter() + .map(|row| row.iter().map(|v| v.as_value_ref()).collect::>()) + .collect::>(); + let result = encoder + .encode(value_ref.iter().map(|r| r.as_slice())) + .unwrap(); + let decoded = encoder.decode(&result).unwrap(); + assert_eq!(value_ref.len(), decoded.len()); + + for (i, row) in rows.iter().enumerate() { + assert_eq!(row, decoded.get(i).unwrap() as &[Value]); + } + } + + #[test] + fn test_memcmp() { + let encoder = McmpRowCodec::new(vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::int64_datatype()), + ]); + let values = [Value::String("abcdefgh".into()), Value::Int64(128)]; + let value_ref = values.iter().map(|v| v.as_value_ref()).collect::>(); + let result = encoder.encode(std::iter::once(&value_ref as _)).unwrap(); + + let decoded = encoder.decode(&result).unwrap(); + assert_eq!(1, decoded.len()); + assert_eq!(&values, decoded.get(0).unwrap() as &[Value]); + } + + #[test] + fn test_memcmp_timestamp() { + check_encode_and_decode( + &[ + ConcreteDataType::timestamp_millisecond_datatype(), + ConcreteDataType::int64_datatype(), + ], + &[vec![ + Value::Timestamp(Timestamp::new_millisecond(42)), + Value::Int64(43), + ]], + ); + } + + #[test] + fn test_memcmp_binary() { + check_encode_and_decode( + &[ + ConcreteDataType::binary_datatype(), + ConcreteDataType::int64_datatype(), + ], + &[vec![ + Value::Binary(Bytes::from("hello".as_bytes())), + Value::Int64(43), + ]], + ); + } + + #[test] + fn test_memcmp_string() { + check_encode_and_decode( + &[ConcreteDataType::string_datatype()], + &[ + vec![Value::String(StringBytes::from("hello"))], + vec![Value::Null], + vec![Value::String("".into())], + vec![Value::String("world".into())], + ], + ); + } + + #[test] + fn test_encode_null() { + check_encode_and_decode( + &[ + ConcreteDataType::string_datatype(), + ConcreteDataType::int32_datatype(), + ], + &[vec![Value::String(StringBytes::from("abcd")), Value::Null]], + ) + } + + #[test] + fn test_encode_multiple_rows() { + check_encode_and_decode( + &[ + ConcreteDataType::string_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::boolean_datatype(), + ], + &[ + vec![ + Value::String("hello".into()), + Value::Int64(42), + Value::Boolean(false), + ], + vec![ + Value::String("world".into()), + Value::Int64(43), + Value::Boolean(true), + ], + vec![Value::Null, Value::Int64(43), Value::Boolean(true)], + ], + ); + } +}