feat: row-wise converter (#2162)

* feat: impl mem-comparable encoding for timestamp

* fix: test cases

* impl time series encoder/decoder

* fix: merge unsupported match arms

* fix: clippy

* chore: big number delimiter

* feat: encode timestamps as i64

* fix: remove useless error variant
This commit is contained in:
Lei, HUANG
2023-08-14 15:13:39 +08:00
committed by GitHub
parent 393047a541
commit 55b5df9c51
6 changed files with 527 additions and 6 deletions

14
Cargo.lock generated
View File

@@ -5220,6 +5220,17 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memcomparable"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677"
dependencies = [
"bytes",
"serde",
"thiserror",
]
[[package]]
name = "memmap2"
version = "0.5.10"
@@ -5521,6 +5532,7 @@ dependencies = [
"async-compat",
"async-stream",
"async-trait",
"bytes",
"chrono",
"common-base",
"common-catalog",
@@ -5542,9 +5554,11 @@ dependencies = [
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032)",
"lazy_static",
"log-store",
"memcomparable",
"metrics",
"object-store",
"parquet",
"paste",
"prost",
"regex",
"serde",

View File

@@ -474,15 +474,11 @@ impl_value_from!(Float64, f64);
impl_value_from!(String, StringBytes);
impl_value_from!(Binary, Bytes);
impl_value_from!(Date, Date);
impl_value_from!(Time, Time);
impl_value_from!(DateTime, DateTime);
impl_value_from!(Timestamp, Timestamp);
impl_value_from!(Interval, Interval);
/// Converts an owned [String] into a [Value::String], building the inner payload with `Into`.
impl From<String> for Value {
fn from(string: String) -> Value {
Value::String(string.into())
}
}
impl_value_from!(String, String);
impl From<&str> for Value {
fn from(string: &str) -> Value {
@@ -703,6 +699,33 @@ impl TryFrom<ScalarValue> for Value {
}
}
impl From<ValueRef<'_>> for Value {
fn from(value: ValueRef<'_>) -> Self {
match value {
ValueRef::Null => Value::Null,
ValueRef::Boolean(v) => Value::Boolean(v),
ValueRef::UInt8(v) => Value::UInt8(v),
ValueRef::UInt16(v) => Value::UInt16(v),
ValueRef::UInt32(v) => Value::UInt32(v),
ValueRef::UInt64(v) => Value::UInt64(v),
ValueRef::Int8(v) => Value::Int8(v),
ValueRef::Int16(v) => Value::Int16(v),
ValueRef::Int32(v) => Value::Int32(v),
ValueRef::Int64(v) => Value::Int64(v),
ValueRef::Float32(v) => Value::Float32(v),
ValueRef::Float64(v) => Value::Float64(v),
ValueRef::String(v) => Value::String(v.into()),
ValueRef::Binary(v) => Value::Binary(v.into()),
ValueRef::Date(v) => Value::Date(v),
ValueRef::DateTime(v) => Value::DateTime(v),
ValueRef::Timestamp(v) => Value::Timestamp(v),
ValueRef::Time(v) => Value::Time(v),
ValueRef::Interval(v) => Value::Interval(v),
ValueRef::List(v) => v.to_value(),
}
}
}
/// Reference to [Value].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueRef<'a> {
@@ -774,6 +797,60 @@ impl<'a> ValueRef<'a> {
impl_as_for_value_ref!(self, Boolean)
}
/// Cast itself to `i8`.
///
/// Expands `impl_as_for_value_ref!` for the `Int8` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_i8(&self) -> Result<Option<i8>> {
impl_as_for_value_ref!(self, Int8)
}
/// Cast itself to `u8`.
///
/// Expands `impl_as_for_value_ref!` for the `UInt8` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_u8(&self) -> Result<Option<u8>> {
impl_as_for_value_ref!(self, UInt8)
}
/// Cast itself to `i16`.
///
/// Expands `impl_as_for_value_ref!` for the `Int16` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_i16(&self) -> Result<Option<i16>> {
impl_as_for_value_ref!(self, Int16)
}
/// Cast itself to `u16`.
///
/// Expands `impl_as_for_value_ref!` for the `UInt16` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_u16(&self) -> Result<Option<u16>> {
impl_as_for_value_ref!(self, UInt16)
}
/// Cast itself to `i32`.
///
/// Expands `impl_as_for_value_ref!` for the `Int32` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_i32(&self) -> Result<Option<i32>> {
impl_as_for_value_ref!(self, Int32)
}
/// Cast itself to `u32`.
///
/// Expands `impl_as_for_value_ref!` for the `UInt32` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_u32(&self) -> Result<Option<u32>> {
impl_as_for_value_ref!(self, UInt32)
}
/// Cast itself to `i64`.
///
/// Expands `impl_as_for_value_ref!` for the `Int64` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_i64(&self) -> Result<Option<i64>> {
impl_as_for_value_ref!(self, Int64)
}
/// Cast itself to `u64`.
///
/// Expands `impl_as_for_value_ref!` for the `UInt64` variant.
/// NOTE(review): the macro is defined outside this view; null and mismatch handling live there.
pub fn as_u64(&self) -> Result<Option<u64>> {
impl_as_for_value_ref!(self, UInt64)
}
pub fn as_f32(&self) -> Result<Option<f32>> {
match self {
ValueRef::Null => Ok(None),
ValueRef::Float32(f) => Ok(Some(f.0)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value ref {:?} to ValueRef::Float32", other,),
}
.fail(),
}
}
pub fn as_f64(&self) -> Result<Option<f64>> {
match self {
ValueRef::Null => Ok(None),
ValueRef::Float64(f) => Ok(Some(f.0)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value ref {:?} to ValueRef::Float64", other,),
}
.fail(),
}
}
/// Cast itself to [Date].
pub fn as_date(&self) -> Result<Option<Date>> {
impl_as_for_value_ref!(self, Date)

View File

@@ -15,6 +15,7 @@ arc-swap = "1.0"
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"
bytes = "1.4"
chrono.workspace = true
common-base = { workspace = true }
common-catalog = { workspace = true }
@@ -36,9 +37,11 @@ futures.workspace = true
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" }
lazy_static = "1.4"
log-store = { workspace = true }
memcomparable = "0.2"
metrics.workspace = true
object-store = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -19,6 +19,7 @@ use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use prost::{DecodeError, EncodeError};
use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
@@ -260,6 +261,47 @@ pub enum Error {
// Shared error for each writer in the write group.
#[snafu(display("Failed to write region, source: {}", source))]
WriteGroup { source: Arc<Error> },
#[snafu(display(
"Row length mismatch, expect: {}, actual: {}, location: {}",
expect,
actual,
location
))]
RowLengthMismatch {
expect: usize,
actual: usize,
location: Location,
},
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch { source: datatypes::error::Error },
#[snafu(display("Failed to serialize field, location: {}", location))]
SerializeField {
source: memcomparable::Error,
location: Location,
},
#[snafu(display(
"Data type: {} does not support serialization/deserialization, location: {}",
data_type,
location
))]
NotSupportedField {
data_type: ConcreteDataType,
location: Location,
},
#[snafu(display(
"Failed to deserialize field, source: {} location: {}",
source,
location
))]
DeserializeField {
source: memcomparable::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -304,6 +346,11 @@ impl ErrorExt for Error {
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
RowLengthMismatch { .. } => StatusCode::InvalidArguments,
FieldTypeMismatch { source, .. } => source.status_code(),
SerializeField { .. } => StatusCode::Internal,
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } => StatusCode::Unexpected,
}
}

View File

@@ -38,6 +38,8 @@ mod region;
#[allow(dead_code)]
pub mod request;
#[allow(dead_code)]
mod row_converter;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
#[allow(dead_code)]

View File

@@ -0,0 +1,378 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Buf;
use common_base::bytes::Bytes;
use common_time::time::Time;
use common_time::{Date, Interval};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Value;
use datatypes::value::ValueRef;
use memcomparable::{Deserializer, Serializer};
use paste::paste;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use crate::error;
use crate::error::{
FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, RowLengthMismatchSnafu,
SerializeFieldSnafu,
};
/// Row value encoder/decoder.
///
/// `encode` turns a sequence of rows into a single byte buffer and `decode`
/// turns such a buffer back into rows of owned [Value]s.
pub trait RowCodec {
/// Encodes rows to bytes.
///
/// Each item yielded by `rows` is one row, given as a slice of [ValueRef]s.
/// All rows are written into the same returned buffer.
fn encode<'a, I>(&self, rows: I) -> Result<Vec<u8>>
where
I: Iterator<Item = &'a [ValueRef<'a>]>;
/// Decodes row values from bytes, returning one `Vec<Value>` per decoded row.
fn decode(&self, bytes: &[u8]) -> Result<Vec<Vec<Value>>>;
}
/// Describes one value position of a row handled by the row codec.
///
/// It only carries the data type, which selects how a value at that position
/// is serialized and deserialized.
pub struct SortField {
/// Data type of the values stored in this field.
data_type: ConcreteDataType,
}
impl SortField {
    /// Creates a field whose values have the given `data_type`.
    pub fn new(data_type: ConcreteDataType) -> Self {
        Self { data_type }
    }

    /// Returns the estimated number of bytes one encoded value of this field takes.
    ///
    /// The figure is only used as a capacity hint for the output buffer, so it
    /// does not have to be exact. Data types that `serialize` rejects (`Null`,
    /// `List`, `Dictionary`) contribute 0.
    pub fn estimated_size(&self) -> usize {
        match &self.data_type {
            ConcreteDataType::Boolean(_) => 2,
            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
            ConcreteDataType::Float32(_) => 5,
            ConcreteDataType::Float64(_) => 9,
            ConcreteDataType::Binary(_) => 11,
            ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
            ConcreteDataType::Date(_) => 5,
            ConcreteDataType::DateTime(_) => 9,
            // `serialize` writes a timestamp as `Option<i64>` (only its raw value),
            // so it is sized exactly like `Int64`.
            ConcreteDataType::Timestamp(_) => 9,
            ConcreteDataType::Time(_) => 10,
            ConcreteDataType::Interval(_) => 18,
            ConcreteDataType::Null(_)
            | ConcreteDataType::List(_)
            | ConcreteDataType::Dictionary(_) => 0,
        }
    }
}
impl SortField {
    /// Serializes `value` into `serializer` according to this field's data type.
    ///
    /// The value is first cast with the `as_*` accessor matching the data type,
    /// which yields an `Option`, and that `Option` is then written with the
    /// `memcomparable` serializer. Timestamps are written as their raw `i64`
    /// value only; the unit is not part of the encoding.
    ///
    /// # Errors
    ///
    /// - `FieldTypeMismatch` if `value` cannot be cast to the field's data type.
    /// - `SerializeField` if the underlying serializer fails.
    /// - `NotSupportedField` if the data type is `List`, `Dictionary` or `Null`.
    fn serialize(&self, serializer: &mut Serializer<&mut Vec<u8>>, value: &ValueRef) -> Result<()> {
        // Generates one match arm per `(ConcreteDataType variant, as_* suffix)` pair.
        // `value` is picked up from the enclosing function scope.
        macro_rules! cast_value_and_serialize {
            (
                $self: ident;
                $serializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {
                match &$self.data_type {
                    $(
                        ConcreteDataType::$ty(_) => {
                            paste!{
                                value
                                    .[<as_ $f>]()
                                    .context(FieldTypeMismatchSnafu)?
                                    .serialize($serializer)
                                    .context(SerializeFieldSnafu)?;
                            }
                        }
                    )*
                    ConcreteDataType::Timestamp(_) => {
                        let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
                        timestamp
                            .map(|t| t.value())
                            .serialize($serializer)
                            .context(SerializeFieldSnafu)?;
                    }
                    ConcreteDataType::List(_)
                    | ConcreteDataType::Dictionary(_)
                    | ConcreteDataType::Null(_) => {
                        return NotSupportedFieldSnafu {
                            data_type: $self.data_type.clone(),
                        }
                        .fail()
                    }
                }
            };
        }
        cast_value_and_serialize!(self; serializer;
            Boolean, boolean,
            Binary, binary,
            Int8, i8,
            UInt8, u8,
            Int16, i16,
            UInt16, u16,
            Int32, i32,
            UInt32, u32,
            Int64, i64,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, string,
            Date, date,
            DateTime, datetime,
            Time, time,
            Interval, interval
        );
        Ok(())
    }

    /// Reads one value of this field's data type from `deserializer`.
    ///
    /// Every supported type is read as an `Option` of its Rust representation
    /// and converted into a [Value]. Binary values are read as `Vec<u8>` and
    /// wrapped into `Bytes`; timestamps are read as a raw `i64` and rebuilt with
    /// the field's timestamp type through `create_timestamp`.
    ///
    /// # Errors
    ///
    /// - `DeserializeField` if the underlying deserializer fails.
    /// - `NotSupportedField` if the data type is `List`, `Dictionary` or `Null`.
    fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
        use common_time::DateTime;
        // Generates one match arm per `(ConcreteDataType variant, Rust type)` pair.
        macro_rules! deserialize_and_build_value {
            (
                $self: ident;
                $deserializer: ident;
                $(
                    $ty: ident, $f: ident
                ),*
            ) => {
                match &$self.data_type {
                    $(
                        ConcreteDataType::$ty(_) => {
                            Ok(Value::from(
                                Option::<$f>::deserialize($deserializer)
                                    .context(error::DeserializeFieldSnafu)?,
                            ))
                        }
                    )*
                    ConcreteDataType::Binary(_) => Ok(Value::from(
                        Option::<Vec<u8>>::deserialize($deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(Bytes::from),
                    )),
                    ConcreteDataType::Timestamp(ty) => {
                        let timestamp = Option::<i64>::deserialize($deserializer)
                            .context(error::DeserializeFieldSnafu)?
                            .map(|t| ty.create_timestamp(t));
                        Ok(Value::from(timestamp))
                    }
                    // Same set of unsupported types as in `serialize`.
                    ConcreteDataType::List(_)
                    | ConcreteDataType::Dictionary(_)
                    | ConcreteDataType::Null(_) => NotSupportedFieldSnafu {
                        data_type: $self.data_type.clone(),
                    }
                    .fail(),
                }
            };
        }
        deserialize_and_build_value!(self; deserializer;
            Boolean, bool,
            Int8, i8,
            Int16, i16,
            Int32, i32,
            Int64, i64,
            UInt8, u8,
            UInt16, u16,
            UInt32, u32,
            UInt64, u64,
            Float32, f32,
            Float64, f64,
            String, String,
            Date, Date,
            Time, Time,
            DateTime, DateTime,
            Interval, Interval
        )
    }
}
/// A memory-comparable row [Value] encoder/decoder.
///
/// Holds the ordered list of [SortField]s that describe the values of a row.
pub struct McmpRowCodec {
/// Fields of a row, in the order their values are encoded and decoded.
fields: Vec<SortField>,
}
impl McmpRowCodec {
pub fn new(fields: Vec<SortField>) -> Self {
Self { fields }
}
/// Estimated length for encoded bytes.
fn estimated_length(&self) -> usize {
self.fields.iter().map(|f| f.estimated_size()).sum()
}
}
impl RowCodec for McmpRowCodec {
    /// Encodes all `rows` back to back into one byte buffer.
    ///
    /// Every row must hold exactly one value per field of this codec; values are
    /// serialized in field order.
    ///
    /// # Errors
    ///
    /// - `RowLengthMismatch` if a row's length differs from the number of fields.
    /// - Any error returned while serializing a single value.
    fn encode<'a, I>(&self, rows: I) -> Result<Vec<u8>>
    where
        I: Iterator<Item = &'a [ValueRef<'a>]>,
    {
        // The capacity is only a hint sized for a single row.
        let mut bytes = Vec::with_capacity(self.estimated_length());
        let mut serializer = memcomparable::Serializer::new(&mut bytes);
        for row in rows {
            ensure!(
                row.len() == self.fields.len(),
                RowLengthMismatchSnafu {
                    expect: self.fields.len(),
                    actual: row.len(),
                }
            );
            for (value, field) in row.iter().zip(self.fields.iter()) {
                field.serialize(&mut serializer, value)?;
            }
        }
        Ok(bytes)
    }

    /// Decodes rows from `bytes` until the input is exhausted.
    ///
    /// Each decoded row holds one [Value] per field of this codec, in field order.
    ///
    /// # Errors
    ///
    /// - `RowLengthMismatch` (expecting 0 bytes, reporting `bytes.len()`) if this
    ///   codec has no fields but `bytes` is not empty.
    /// - Any error returned while deserializing a single value.
    fn decode(&self, bytes: &[u8]) -> Result<Vec<Vec<Value>>> {
        // With no fields nothing ever consumes input, so the loop below would
        // spin forever on non-empty input while pushing empty rows. A codec
        // without fields can only have produced zero bytes; reject anything else.
        ensure!(
            !self.fields.is_empty() || bytes.is_empty(),
            RowLengthMismatchSnafu {
                expect: 0_usize,
                actual: bytes.len(),
            }
        );
        let mut deserializer = memcomparable::Deserializer::new(bytes);
        let mut res = vec![];
        while deserializer.has_remaining() {
            let mut values = Vec::with_capacity(self.fields.len());
            for f in &self.fields {
                let value = f.deserialize(&mut deserializer)?;
                values.push(value);
            }
            res.push(values);
        }
        Ok(res)
    }
}
#[cfg(test)]
mod tests {
    use common_base::bytes::StringBytes;
    use common_time::Timestamp;
    use datatypes::value::Value;

    use super::*;

    /// Round-trips `rows` through a codec built from `data_types` and asserts
    /// that decoding the encoded bytes gives back the same rows.
    fn check_encode_and_decode(data_types: &[ConcreteDataType], rows: &[Vec<Value>]) {
        let fields = data_types
            .iter()
            .map(|data_type| SortField::new(data_type.clone()))
            .collect::<Vec<_>>();
        let codec = McmpRowCodec::new(fields);

        let row_refs = rows
            .iter()
            .map(|row| row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>())
            .collect::<Vec<_>>();
        let encoded = codec
            .encode(row_refs.iter().map(|r| r.as_slice()))
            .unwrap();
        let decoded = codec.decode(&encoded).unwrap();

        assert_eq!(row_refs.len(), decoded.len());
        // Lengths are equal at this point, so zipping compares every row.
        for (expected, actual) in rows.iter().zip(decoded.iter()) {
            assert_eq!(expected, actual);
        }
    }

    /// A single string + int64 row survives an encode/decode round trip.
    #[test]
    fn test_memcmp() {
        let codec = McmpRowCodec::new(vec![
            SortField::new(ConcreteDataType::string_datatype()),
            SortField::new(ConcreteDataType::int64_datatype()),
        ]);
        let row = [Value::String("abcdefgh".into()), Value::Int64(128)];
        let row_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();

        let encoded = codec.encode(std::iter::once(row_ref.as_slice())).unwrap();
        let decoded = codec.decode(&encoded).unwrap();

        assert_eq!(1, decoded.len());
        assert_eq!(&row[..], &decoded[0][..]);
    }

    /// Millisecond timestamps round-trip together with an int64 column.
    #[test]
    fn test_memcmp_timestamp() {
        let data_types = [
            ConcreteDataType::timestamp_millisecond_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let rows = [vec![
            Value::Timestamp(Timestamp::new_millisecond(42)),
            Value::Int64(43),
        ]];
        check_encode_and_decode(&data_types, &rows);
    }

    /// Binary values round-trip together with an int64 column.
    #[test]
    fn test_memcmp_binary() {
        let data_types = [
            ConcreteDataType::binary_datatype(),
            ConcreteDataType::int64_datatype(),
        ];
        let rows = [vec![
            Value::Binary(Bytes::from("hello".as_bytes())),
            Value::Int64(43),
        ]];
        check_encode_and_decode(&data_types, &rows);
    }

    /// Non-empty, null and empty strings all round-trip.
    #[test]
    fn test_memcmp_string() {
        let data_types = [ConcreteDataType::string_datatype()];
        let rows = [
            vec![Value::String(StringBytes::from("hello"))],
            vec![Value::Null],
            vec![Value::String("".into())],
            vec![Value::String("world".into())],
        ];
        check_encode_and_decode(&data_types, &rows);
    }

    /// A null in a non-string column round-trips.
    #[test]
    fn test_encode_null() {
        let data_types = [
            ConcreteDataType::string_datatype(),
            ConcreteDataType::int32_datatype(),
        ];
        let rows = [vec![Value::String(StringBytes::from("abcd")), Value::Null]];
        check_encode_and_decode(&data_types, &rows)
    }

    /// Several rows encoded into one buffer are decoded back as separate rows.
    #[test]
    fn test_encode_multiple_rows() {
        let data_types = [
            ConcreteDataType::string_datatype(),
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::boolean_datatype(),
        ];
        let rows = [
            vec![
                Value::String("hello".into()),
                Value::Int64(42),
                Value::Boolean(false),
            ],
            vec![
                Value::String("world".into()),
                Value::Int64(43),
                Value::Boolean(true),
            ],
            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
        ];
        check_encode_and_decode(&data_types, &rows);
    }
}