mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 20:32:56 +00:00
refactor: KeyValues return ValueRef (#2170)
* refactor: KeyValues return ValueRef * 1. Change KeyValues returned value from pb value to ValueRef 2. Replace OpType/SemanticType with pb's OpType and SemanticType to avoid duplicated conversions. * feat: define min value of OpType as a const * fix: toml format
This commit is contained in:
24
Cargo.lock
generated
24
Cargo.lock
generated
@@ -211,7 +211,7 @@ dependencies = [
|
||||
"common-error",
|
||||
"common-time",
|
||||
"datatypes",
|
||||
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098)",
|
||||
"greptime-proto",
|
||||
"prost",
|
||||
"snafu",
|
||||
"tonic 0.9.2",
|
||||
@@ -4134,19 +4134,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032#10c349c033dded29097d0dc933fbc2f89f658032"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tonic 0.9.2",
|
||||
"tonic-build",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098#940694cfd05c1e93c1dd7aab486184c9e2853098"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b68af55c050a010f202fcccb22d58f080f0a868#9b68af55c050a010f202fcccb22d58f080f0a868"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"serde",
|
||||
@@ -5527,6 +5515,7 @@ name = "mito2"
|
||||
version = "0.3.2"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"api",
|
||||
"aquamarine",
|
||||
"arc-swap",
|
||||
"async-compat",
|
||||
@@ -5551,7 +5540,6 @@ dependencies = [
|
||||
"datafusion-common",
|
||||
"datatypes",
|
||||
"futures",
|
||||
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032)",
|
||||
"lazy_static",
|
||||
"log-store",
|
||||
"memcomparable",
|
||||
@@ -7040,7 +7028,7 @@ dependencies = [
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"futures",
|
||||
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098)",
|
||||
"greptime-proto",
|
||||
"promql-parser",
|
||||
"prost",
|
||||
"query",
|
||||
@@ -7322,7 +7310,7 @@ dependencies = [
|
||||
"format_num",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098)",
|
||||
"greptime-proto",
|
||||
"humantime",
|
||||
"metrics",
|
||||
"num",
|
||||
@@ -9474,6 +9462,7 @@ dependencies = [
|
||||
name = "storage"
|
||||
version = "0.3.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -9525,6 +9514,7 @@ dependencies = [
|
||||
name = "store-api"
|
||||
version = "0.3.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
|
||||
@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
|
||||
derive_builder = "0.12"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "940694cfd05c1e93c1dd7aab486184c9e2853098" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b68af55c050a010f202fcccb22d58f080f0a868" }
|
||||
itertools = "0.10"
|
||||
lazy_static = "1.4"
|
||||
once_cell = "1.18"
|
||||
|
||||
@@ -14,16 +14,19 @@
|
||||
|
||||
use common_base::BitVec;
|
||||
use common_time::interval::IntervalUnit;
|
||||
use common_time::time::Time;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Interval;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use common_time::{Date, DateTime, Interval, Timestamp};
|
||||
use datatypes::prelude::{ConcreteDataType, ValueRef};
|
||||
use datatypes::types::{IntervalType, TimeType, TimestampType};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::value::{OrderedF32, OrderedF64, Value};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use greptime_proto::v1;
|
||||
use greptime_proto::v1::ddl_request::Expr;
|
||||
use greptime_proto::v1::greptime_request::Request;
|
||||
use greptime_proto::v1::query_request::Query;
|
||||
use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest};
|
||||
use greptime_proto::v1::value::ValueData;
|
||||
use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest, SemanticType};
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -298,6 +301,8 @@ pub fn request_type(request: &Request) -> &'static str {
|
||||
Request::Query(query_req) => query_request_type(query_req),
|
||||
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
|
||||
Request::Delete(_) => "delete",
|
||||
Request::RowInserts(_) => "row_inserts",
|
||||
Request::RowDelete(_) => "row_delete",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -336,6 +341,244 @@ pub fn convert_i128_to_interval(v: i128) -> IntervalMonthDayNano {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pb_value_to_value_ref(value: &greptime_proto::v1::Value) -> ValueRef {
|
||||
let Some(value) = &value.value_data else {
|
||||
return ValueRef::Null;
|
||||
};
|
||||
|
||||
match value {
|
||||
greptime_proto::v1::value::ValueData::I8Value(v) => ValueRef::Int8(*v as i8),
|
||||
greptime_proto::v1::value::ValueData::I16Value(v) => ValueRef::Int16(*v as i16),
|
||||
greptime_proto::v1::value::ValueData::I32Value(v) => ValueRef::Int32(*v),
|
||||
greptime_proto::v1::value::ValueData::I64Value(v) => ValueRef::Int64(*v),
|
||||
greptime_proto::v1::value::ValueData::U8Value(v) => ValueRef::UInt8(*v as u8),
|
||||
greptime_proto::v1::value::ValueData::U16Value(v) => ValueRef::UInt16(*v as u16),
|
||||
greptime_proto::v1::value::ValueData::U32Value(v) => ValueRef::UInt32(*v),
|
||||
greptime_proto::v1::value::ValueData::U64Value(v) => ValueRef::UInt64(*v),
|
||||
greptime_proto::v1::value::ValueData::F32Value(f) => {
|
||||
ValueRef::Float32(OrderedF32::from(*f))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::F64Value(f) => {
|
||||
ValueRef::Float64(OrderedF64::from(*f))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::BoolValue(b) => ValueRef::Boolean(*b),
|
||||
greptime_proto::v1::value::ValueData::BinaryValue(bytes) => {
|
||||
ValueRef::Binary(bytes.as_slice())
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::StringValue(string) => {
|
||||
ValueRef::String(string.as_str())
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)),
|
||||
greptime_proto::v1::value::ValueData::DatetimeValue(d) => {
|
||||
ValueRef::DateTime(DateTime::new(*d))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TsSecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_second(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TsMillisecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_millisecond(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TsMicrosecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_microsecond(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TsNanosecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_nanosecond(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TimeSecondValue(t) => {
|
||||
ValueRef::Time(Time::new_second(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TimeMillisecondValue(t) => {
|
||||
ValueRef::Time(Time::new_millisecond(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TimeMicrosecondValue(t) => {
|
||||
ValueRef::Time(Time::new_microsecond(*t))
|
||||
}
|
||||
greptime_proto::v1::value::ValueData::TimeNanosecondValue(t) => {
|
||||
ValueRef::Time(Time::new_nanosecond(*t))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the pb semantic type is valid.
|
||||
pub fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool {
|
||||
type_value == semantic_type as i32
|
||||
}
|
||||
|
||||
/// Returns true if the pb type value is valid.
|
||||
pub fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool {
|
||||
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
is_column_type_eq(column_type, expect_type)
|
||||
}
|
||||
|
||||
/// Convert value into proto's value.
|
||||
pub fn to_proto_value(value: Value) -> Option<v1::Value> {
|
||||
let proto_value = match value {
|
||||
Value::Null => v1::Value { value_data: None },
|
||||
Value::Boolean(v) => v1::Value {
|
||||
value_data: Some(ValueData::BoolValue(v)),
|
||||
},
|
||||
Value::UInt8(v) => v1::Value {
|
||||
value_data: Some(ValueData::U8Value(v.into())),
|
||||
},
|
||||
Value::UInt16(v) => v1::Value {
|
||||
value_data: Some(ValueData::U16Value(v.into())),
|
||||
},
|
||||
Value::UInt32(v) => v1::Value {
|
||||
value_data: Some(ValueData::U32Value(v)),
|
||||
},
|
||||
Value::UInt64(v) => v1::Value {
|
||||
value_data: Some(ValueData::U64Value(v)),
|
||||
},
|
||||
Value::Int8(v) => v1::Value {
|
||||
value_data: Some(ValueData::I8Value(v.into())),
|
||||
},
|
||||
Value::Int16(v) => v1::Value {
|
||||
value_data: Some(ValueData::I16Value(v.into())),
|
||||
},
|
||||
Value::Int32(v) => v1::Value {
|
||||
value_data: Some(ValueData::I32Value(v)),
|
||||
},
|
||||
Value::Int64(v) => v1::Value {
|
||||
value_data: Some(ValueData::I64Value(v)),
|
||||
},
|
||||
Value::Float32(v) => v1::Value {
|
||||
value_data: Some(ValueData::F32Value(*v)),
|
||||
},
|
||||
Value::Float64(v) => v1::Value {
|
||||
value_data: Some(ValueData::F64Value(*v)),
|
||||
},
|
||||
Value::String(v) => v1::Value {
|
||||
value_data: Some(ValueData::StringValue(v.as_utf8().to_string())),
|
||||
},
|
||||
Value::Binary(v) => v1::Value {
|
||||
value_data: Some(ValueData::BinaryValue(v.to_vec())),
|
||||
},
|
||||
Value::Date(v) => v1::Value {
|
||||
value_data: Some(ValueData::DateValue(v.val())),
|
||||
},
|
||||
Value::DateTime(v) => v1::Value {
|
||||
value_data: Some(ValueData::DatetimeValue(v.val())),
|
||||
},
|
||||
Value::Timestamp(v) => match v.unit() {
|
||||
TimeUnit::Second => v1::Value {
|
||||
value_data: Some(ValueData::TsSecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Millisecond => v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Microsecond => v1::Value {
|
||||
value_data: Some(ValueData::TsMicrosecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Nanosecond => v1::Value {
|
||||
value_data: Some(ValueData::TsNanosecondValue(v.value())),
|
||||
},
|
||||
},
|
||||
Value::Time(v) => match v.unit() {
|
||||
TimeUnit::Second => v1::Value {
|
||||
value_data: Some(v1::value::ValueData::TimeSecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Millisecond => v1::Value {
|
||||
value_data: Some(v1::value::ValueData::TimeMillisecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Microsecond => v1::Value {
|
||||
value_data: Some(v1::value::ValueData::TimeMicrosecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Nanosecond => v1::Value {
|
||||
value_data: Some(v1::value::ValueData::TimeNanosecondValue(v.value())),
|
||||
},
|
||||
},
|
||||
Value::Interval(_) | Value::List(_) => return None,
|
||||
};
|
||||
|
||||
Some(proto_value)
|
||||
}
|
||||
|
||||
/// Returns the [ColumnDataType] of the value.
|
||||
///
|
||||
/// If value is null, returns `None`.
|
||||
pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
|
||||
let value_data = value.value_data.as_ref()?;
|
||||
let value_type = match value_data {
|
||||
ValueData::I8Value(_) => ColumnDataType::Int8,
|
||||
ValueData::I16Value(_) => ColumnDataType::Int16,
|
||||
ValueData::I32Value(_) => ColumnDataType::Int32,
|
||||
ValueData::I64Value(_) => ColumnDataType::Int64,
|
||||
ValueData::U8Value(_) => ColumnDataType::Uint8,
|
||||
ValueData::U16Value(_) => ColumnDataType::Uint16,
|
||||
ValueData::U32Value(_) => ColumnDataType::Uint32,
|
||||
ValueData::U64Value(_) => ColumnDataType::Uint64,
|
||||
ValueData::F32Value(_) => ColumnDataType::Float32,
|
||||
ValueData::F64Value(_) => ColumnDataType::Float64,
|
||||
ValueData::BoolValue(_) => ColumnDataType::Boolean,
|
||||
ValueData::BinaryValue(_) => ColumnDataType::Binary,
|
||||
ValueData::StringValue(_) => ColumnDataType::String,
|
||||
ValueData::DateValue(_) => ColumnDataType::Date,
|
||||
ValueData::DatetimeValue(_) => ColumnDataType::Datetime,
|
||||
ValueData::TsSecondValue(_) => ColumnDataType::TimestampSecond,
|
||||
ValueData::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
|
||||
ValueData::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
|
||||
ValueData::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
|
||||
ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond,
|
||||
ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
|
||||
ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
|
||||
ValueData::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
|
||||
};
|
||||
Some(value_type)
|
||||
}
|
||||
|
||||
/// Convert [ConcreteDataType] to [ColumnDataType].
|
||||
pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
|
||||
let column_data_type = match data_type {
|
||||
ConcreteDataType::Boolean(_) => ColumnDataType::Boolean,
|
||||
ConcreteDataType::Int8(_) => ColumnDataType::Int8,
|
||||
ConcreteDataType::Int16(_) => ColumnDataType::Int16,
|
||||
ConcreteDataType::Int32(_) => ColumnDataType::Int32,
|
||||
ConcreteDataType::Int64(_) => ColumnDataType::Int64,
|
||||
ConcreteDataType::UInt8(_) => ColumnDataType::Uint8,
|
||||
ConcreteDataType::UInt16(_) => ColumnDataType::Uint16,
|
||||
ConcreteDataType::UInt32(_) => ColumnDataType::Uint32,
|
||||
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
|
||||
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
|
||||
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
|
||||
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
|
||||
ConcreteDataType::String(_) => ColumnDataType::String,
|
||||
ConcreteDataType::Date(_) => ColumnDataType::Date,
|
||||
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
|
||||
ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond,
|
||||
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
|
||||
ColumnDataType::TimestampMillisecond
|
||||
}
|
||||
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
|
||||
ColumnDataType::TimestampMicrosecond
|
||||
}
|
||||
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
|
||||
ColumnDataType::TimestampNanosecond
|
||||
}
|
||||
ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond,
|
||||
ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond,
|
||||
ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond,
|
||||
ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond,
|
||||
ConcreteDataType::Null(_)
|
||||
| ConcreteDataType::Interval(_)
|
||||
| ConcreteDataType::List(_)
|
||||
| ConcreteDataType::Dictionary(_) => return None,
|
||||
};
|
||||
|
||||
Some(column_data_type)
|
||||
}
|
||||
|
||||
/// Returns true if the column type is equal to expected type.
|
||||
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
|
||||
if let Some(expect) = to_column_data_type(expect_type) {
|
||||
column_type == expect
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -221,6 +221,8 @@ impl GrpcQueryHandler for Instance {
|
||||
self.handle_query(query, ctx).await
|
||||
}
|
||||
Request::Ddl(request) => self.handle_ddl(request, ctx).await,
|
||||
Request::RowInserts(_) => unreachable!(),
|
||||
Request::RowDelete(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -713,6 +713,8 @@ impl GrpcQueryHandler for DistInstance {
|
||||
}
|
||||
}
|
||||
}
|
||||
Request::RowInserts(_) => unreachable!(),
|
||||
Request::RowDelete(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,6 +88,8 @@ impl GrpcQueryHandler for Instance {
|
||||
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone())
|
||||
.await?
|
||||
}
|
||||
Request::RowInserts(_) => unreachable!(),
|
||||
Request::RowDelete(_) => unreachable!(),
|
||||
};
|
||||
|
||||
let output = interceptor.post_execute(output, ctx)?;
|
||||
|
||||
@@ -10,6 +10,7 @@ test = ["common-test-util"]
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
api.workspace = true
|
||||
aquamarine = "0.3"
|
||||
arc-swap = "1.0"
|
||||
async-compat = "0.2"
|
||||
@@ -33,8 +34,6 @@ datafusion-common.workspace = true
|
||||
datafusion.workspace = true
|
||||
datatypes = { workspace = true }
|
||||
futures.workspace = true
|
||||
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged.
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" }
|
||||
lazy_static = "1.4"
|
||||
log-store = { workspace = true }
|
||||
memcomparable = "0.2"
|
||||
|
||||
@@ -31,7 +31,6 @@ pub mod manifest;
|
||||
pub mod memtable;
|
||||
#[allow(dead_code)]
|
||||
pub mod metadata;
|
||||
pub(crate) mod proto_util;
|
||||
pub mod read;
|
||||
#[allow(dead_code)]
|
||||
mod region;
|
||||
|
||||
@@ -447,6 +447,7 @@ impl RegionManifestManagerInner {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use api::v1::SemanticType;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
@@ -454,7 +455,7 @@ mod test {
|
||||
use super::*;
|
||||
use crate::manifest::action::RegionChange;
|
||||
use crate::manifest::tests::utils::basic_region_metadata;
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder, SemanticType};
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use crate::test_util::TestEnv;
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -12,11 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType};
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
|
||||
/// Build a basic region metadata for testing.
|
||||
/// It contains three columns:
|
||||
|
||||
@@ -14,11 +14,11 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use greptime_proto::v1::mito::{Mutation, OpType};
|
||||
use greptime_proto::v1::{Row, Rows, Value};
|
||||
use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
|
||||
use datatypes::value::ValueRef;
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::metadata::{RegionMetadata, SemanticType};
|
||||
use crate::metadata::RegionMetadata;
|
||||
|
||||
/// Key value view of a mutation.
|
||||
#[derive(Debug)]
|
||||
@@ -80,24 +80,24 @@ pub struct KeyValue<'a> {
|
||||
|
||||
impl<'a> KeyValue<'a> {
|
||||
/// Get primary key columns.
|
||||
pub fn primary_keys(&self) -> impl Iterator<Item = &Value> {
|
||||
pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
|
||||
self.helper.indices[..self.helper.num_primary_key_column]
|
||||
.iter()
|
||||
.map(|idx| &self.row.values[*idx])
|
||||
.map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx]))
|
||||
}
|
||||
|
||||
/// Get field columns.
|
||||
pub fn fields(&self) -> impl Iterator<Item = &Value> {
|
||||
pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
|
||||
self.helper.indices[self.helper.num_primary_key_column + 1..]
|
||||
.iter()
|
||||
.map(|idx| &self.row.values[*idx])
|
||||
.map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx]))
|
||||
}
|
||||
|
||||
/// Get timestamp.
|
||||
pub fn timestamp(&self) -> Value {
|
||||
pub fn timestamp(&self) -> ValueRef {
|
||||
// Timestamp is primitive, we clone it.
|
||||
let index = self.helper.indices[self.helper.num_primary_key_column];
|
||||
self.row.values[index].clone()
|
||||
api::helper::pb_value_to_value_ref(&self.row.values[index])
|
||||
}
|
||||
|
||||
/// Get number of primary key columns.
|
||||
@@ -187,15 +187,15 @@ impl ReadRowHelper {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1;
|
||||
use api::v1::ColumnDataType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use greptime_proto::v1;
|
||||
use greptime_proto::v1::ColumnDataType;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use crate::proto_util::i64_value;
|
||||
use crate::test_util::i64_value;
|
||||
|
||||
const TS_NAME: &str = "ts";
|
||||
const START_SEQ: SequenceNumber = 100;
|
||||
@@ -294,7 +294,7 @@ mod tests {
|
||||
fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
|
||||
assert_eq!(num_rows, kvs.num_rows());
|
||||
let mut expect_seq = START_SEQ;
|
||||
let expect_ts = i64_value(ts);
|
||||
let expect_ts = ValueRef::Int64(ts);
|
||||
for kv in kvs.iter() {
|
||||
assert_eq!(expect_seq, kv.sequence());
|
||||
expect_seq += 1;
|
||||
@@ -303,11 +303,11 @@ mod tests {
|
||||
assert_eq!(values.len(), kv.num_fields());
|
||||
|
||||
assert_eq!(expect_ts, kv.timestamp());
|
||||
let expect_keys: Vec<_> = keys.iter().map(|k| i64_value(*k)).collect();
|
||||
let actual_keys: Vec<_> = kv.primary_keys().cloned().collect();
|
||||
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::Int64(*k)).collect();
|
||||
let actual_keys: Vec<_> = kv.primary_keys().collect();
|
||||
assert_eq!(expect_keys, actual_keys);
|
||||
let expect_values: Vec<_> = values.iter().map(|v| i64_value(*v)).collect();
|
||||
let actual_values: Vec<_> = kv.fields().cloned().collect();
|
||||
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::Int64(*v)).collect();
|
||||
let actual_values: Vec<_> = kv.fields().collect();
|
||||
assert_eq!(expect_values, actual_values);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use serde::de::Error;
|
||||
@@ -342,17 +343,6 @@ impl ColumnMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
/// The semantic type of one column
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum SemanticType {
|
||||
/// Tag column, also is a part of primary key.
|
||||
Tag = 0,
|
||||
/// A column that isn't a time index or part of primary key.
|
||||
Field = 1,
|
||||
/// Time index column.
|
||||
Timestamp = 2,
|
||||
}
|
||||
|
||||
/// Fields skipped in serialization.
|
||||
struct SkippedFields {
|
||||
/// Last schema.
|
||||
|
||||
@@ -1,240 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utilities to process protobuf messages.
|
||||
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::{TimeType, TimestampType};
|
||||
use datatypes::value::Value;
|
||||
use greptime_proto::v1::{self, ColumnDataType};
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use crate::metadata::SemanticType;
|
||||
|
||||
/// Returns true if the pb semantic type is valid.
|
||||
pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool {
|
||||
type_value == semantic_type as i32
|
||||
}
|
||||
|
||||
/// Returns true if the pb type value is valid.
|
||||
pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool {
|
||||
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
is_column_type_eq(column_type, expect_type)
|
||||
}
|
||||
|
||||
/// Convert value into proto's value.
|
||||
pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
|
||||
let proto_value = match value {
|
||||
Value::Null => v1::Value { value: None },
|
||||
Value::Boolean(v) => v1::Value {
|
||||
value: Some(v1::value::Value::BoolValue(v)),
|
||||
},
|
||||
Value::UInt8(v) => v1::Value {
|
||||
value: Some(v1::value::Value::U8Value(v.into())),
|
||||
},
|
||||
Value::UInt16(v) => v1::Value {
|
||||
value: Some(v1::value::Value::U16Value(v.into())),
|
||||
},
|
||||
Value::UInt32(v) => v1::Value {
|
||||
value: Some(v1::value::Value::U32Value(v)),
|
||||
},
|
||||
Value::UInt64(v) => v1::Value {
|
||||
value: Some(v1::value::Value::U64Value(v)),
|
||||
},
|
||||
Value::Int8(v) => v1::Value {
|
||||
value: Some(v1::value::Value::I8Value(v.into())),
|
||||
},
|
||||
Value::Int16(v) => v1::Value {
|
||||
value: Some(v1::value::Value::I16Value(v.into())),
|
||||
},
|
||||
Value::Int32(v) => v1::Value {
|
||||
value: Some(v1::value::Value::I32Value(v)),
|
||||
},
|
||||
Value::Int64(v) => v1::Value {
|
||||
value: Some(v1::value::Value::I64Value(v)),
|
||||
},
|
||||
Value::Float32(v) => v1::Value {
|
||||
value: Some(v1::value::Value::F32Value(*v)),
|
||||
},
|
||||
Value::Float64(v) => v1::Value {
|
||||
value: Some(v1::value::Value::F64Value(*v)),
|
||||
},
|
||||
Value::String(v) => v1::Value {
|
||||
value: Some(v1::value::Value::StringValue(v.as_utf8().to_string())),
|
||||
},
|
||||
Value::Binary(v) => v1::Value {
|
||||
value: Some(v1::value::Value::BinaryValue(v.to_vec())),
|
||||
},
|
||||
Value::Date(v) => v1::Value {
|
||||
value: Some(v1::value::Value::DateValue(v.val())),
|
||||
},
|
||||
Value::DateTime(v) => v1::Value {
|
||||
value: Some(v1::value::Value::DatetimeValue(v.val())),
|
||||
},
|
||||
Value::Timestamp(v) => match v.unit() {
|
||||
TimeUnit::Second => v1::Value {
|
||||
value: Some(v1::value::Value::TsSecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Millisecond => v1::Value {
|
||||
value: Some(v1::value::Value::TsMillisecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Microsecond => v1::Value {
|
||||
value: Some(v1::value::Value::TsMicrosecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Nanosecond => v1::Value {
|
||||
value: Some(v1::value::Value::TsNanosecondValue(v.value())),
|
||||
},
|
||||
},
|
||||
Value::Time(v) => match v.unit() {
|
||||
TimeUnit::Second => v1::Value {
|
||||
value: Some(v1::value::Value::TimeSecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Millisecond => v1::Value {
|
||||
value: Some(v1::value::Value::TimeMillisecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Microsecond => v1::Value {
|
||||
value: Some(v1::value::Value::TimeMicrosecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Nanosecond => v1::Value {
|
||||
value: Some(v1::value::Value::TimeNanosecondValue(v.value())),
|
||||
},
|
||||
},
|
||||
Value::Interval(_) | Value::List(_) => return None,
|
||||
};
|
||||
|
||||
Some(proto_value)
|
||||
}
|
||||
|
||||
/// Returns the [ColumnDataType] of the value.
|
||||
///
|
||||
/// If value is null, returns `None`.
|
||||
pub(crate) fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
|
||||
let value_data = value.value.as_ref()?;
|
||||
let value_type = match value_data {
|
||||
v1::value::Value::I8Value(_) => ColumnDataType::Int8,
|
||||
v1::value::Value::I16Value(_) => ColumnDataType::Int16,
|
||||
v1::value::Value::I32Value(_) => ColumnDataType::Int32,
|
||||
v1::value::Value::I64Value(_) => ColumnDataType::Int64,
|
||||
v1::value::Value::U8Value(_) => ColumnDataType::Uint8,
|
||||
v1::value::Value::U16Value(_) => ColumnDataType::Uint16,
|
||||
v1::value::Value::U32Value(_) => ColumnDataType::Uint32,
|
||||
v1::value::Value::U64Value(_) => ColumnDataType::Uint64,
|
||||
v1::value::Value::F32Value(_) => ColumnDataType::Float32,
|
||||
v1::value::Value::F64Value(_) => ColumnDataType::Float64,
|
||||
v1::value::Value::BoolValue(_) => ColumnDataType::Boolean,
|
||||
v1::value::Value::BinaryValue(_) => ColumnDataType::Binary,
|
||||
v1::value::Value::StringValue(_) => ColumnDataType::String,
|
||||
v1::value::Value::DateValue(_) => ColumnDataType::Date,
|
||||
v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime,
|
||||
v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond,
|
||||
v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
|
||||
v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
|
||||
v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
|
||||
v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond,
|
||||
v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
|
||||
v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
|
||||
v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
|
||||
};
|
||||
Some(value_type)
|
||||
}
|
||||
|
||||
// TODO(yingwen): Support conversion in greptime-proto.
|
||||
/// Creates value for i64.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn i64_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value: Some(v1::value::Value::I64Value(data)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates value for timestamp millis.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value: Some(v1::value::Value::TsMillisecondValue(data)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert [ConcreteDataType] to [ColumnDataType].
|
||||
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
|
||||
let column_data_type = match data_type {
|
||||
ConcreteDataType::Boolean(_) => ColumnDataType::Boolean,
|
||||
ConcreteDataType::Int8(_) => ColumnDataType::Int8,
|
||||
ConcreteDataType::Int16(_) => ColumnDataType::Int16,
|
||||
ConcreteDataType::Int32(_) => ColumnDataType::Int32,
|
||||
ConcreteDataType::Int64(_) => ColumnDataType::Int64,
|
||||
ConcreteDataType::UInt8(_) => ColumnDataType::Uint8,
|
||||
ConcreteDataType::UInt16(_) => ColumnDataType::Uint16,
|
||||
ConcreteDataType::UInt32(_) => ColumnDataType::Uint32,
|
||||
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
|
||||
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
|
||||
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
|
||||
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
|
||||
ConcreteDataType::String(_) => ColumnDataType::String,
|
||||
ConcreteDataType::Date(_) => ColumnDataType::Date,
|
||||
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
|
||||
ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond,
|
||||
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
|
||||
ColumnDataType::TimestampMillisecond
|
||||
}
|
||||
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
|
||||
ColumnDataType::TimestampMicrosecond
|
||||
}
|
||||
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
|
||||
ColumnDataType::TimestampNanosecond
|
||||
}
|
||||
ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond,
|
||||
ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond,
|
||||
ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond,
|
||||
ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond,
|
||||
ConcreteDataType::Null(_)
|
||||
| ConcreteDataType::Interval(_)
|
||||
| ConcreteDataType::List(_)
|
||||
| ConcreteDataType::Dictionary(_) => return None,
|
||||
};
|
||||
|
||||
Some(column_data_type)
|
||||
}
|
||||
|
||||
/// Convert semantic type to proto's semantic type
|
||||
pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType {
|
||||
match semantic_type {
|
||||
SemanticType::Tag => v1::SemanticType::Tag,
|
||||
SemanticType::Field => v1::SemanticType::Field,
|
||||
SemanticType::Timestamp => v1::SemanticType::Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert op type to proto's op type.
|
||||
pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType {
|
||||
match op_type {
|
||||
OpType::Delete => v1::mito::OpType::Delete,
|
||||
OpType::Put => v1::mito::OpType::Put,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the column type is equal to expected type.
|
||||
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
|
||||
if let Some(expect) = to_column_data_type(expect_type) {
|
||||
column_type == expect
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): Tests.
|
||||
@@ -17,19 +17,19 @@
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::helper::{
|
||||
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
|
||||
to_proto_value,
|
||||
};
|
||||
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, Value};
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId};
|
||||
use store_api::storage::{ColumnId, CompactionStrategy, RegionId};
|
||||
use tokio::sync::oneshot::{self, Receiver, Sender};
|
||||
|
||||
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
|
||||
use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result};
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadata};
|
||||
use crate::proto_util::{
|
||||
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
|
||||
to_proto_semantic_type, to_proto_value,
|
||||
};
|
||||
|
||||
/// Options that affect the entire region.
|
||||
///
|
||||
@@ -139,7 +139,7 @@ impl WriteRequest {
|
||||
for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
|
||||
validate_proto_value(region_id, value, column_schema)?;
|
||||
|
||||
if value.value.is_none() {
|
||||
if value.value_data.is_none() {
|
||||
has_null[i] = true;
|
||||
}
|
||||
}
|
||||
@@ -202,7 +202,7 @@ impl WriteRequest {
|
||||
"column {} has semantic type {:?}, given: {}({})",
|
||||
column.column_schema.name,
|
||||
column.semantic_type,
|
||||
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type)
|
||||
api::v1::SemanticType::from_i32(input_col.semantic_type)
|
||||
.map(|v| v.as_str_name())
|
||||
.unwrap_or("Unknown"),
|
||||
input_col.semantic_type
|
||||
@@ -282,6 +282,7 @@ impl WriteRequest {
|
||||
})?;
|
||||
|
||||
// Convert default value into proto's value.
|
||||
|
||||
let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
|
||||
region_id,
|
||||
reason: format!(
|
||||
@@ -308,7 +309,7 @@ impl WriteRequest {
|
||||
self.rows.schema.push(ColumnSchema {
|
||||
column_name: column.column_schema.name.clone(),
|
||||
datatype: datatype as i32,
|
||||
semantic_type: to_proto_semantic_type(column.semantic_type) as i32,
|
||||
semantic_type: column.semantic_type as i32,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
@@ -426,13 +427,13 @@ impl RequestBody {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::{Row, SemanticType};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use greptime_proto::v1::{Row, SemanticType};
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::metadata::RegionMetadataBuilder;
|
||||
use crate::proto_util::{i64_value, ts_ms_value};
|
||||
use crate::test_util::{i64_value, ts_ms_value};
|
||||
|
||||
fn new_column_schema(
|
||||
name: &str,
|
||||
@@ -516,7 +517,7 @@ mod tests {
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: crate::metadata::SemanticType::Timestamp,
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
@@ -525,7 +526,7 @@ mod tests {
|
||||
ConcreteDataType::int64_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: crate::metadata::SemanticType::Tag,
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 2,
|
||||
})
|
||||
.primary_key(vec![2]);
|
||||
@@ -605,7 +606,7 @@ mod tests {
|
||||
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
|
||||
],
|
||||
rows: vec![Row {
|
||||
values: vec![Value { value: None }, i64_value(2)],
|
||||
values: vec![Value { value_data: None }, i64_value(2)],
|
||||
}],
|
||||
};
|
||||
let metadata = new_region_metadata();
|
||||
@@ -686,7 +687,7 @@ mod tests {
|
||||
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
|
||||
],
|
||||
rows: vec![Row {
|
||||
values: vec![ts_ms_value(1), Value { value: None }],
|
||||
values: vec![ts_ms_value(1), Value { value_data: None }],
|
||||
}],
|
||||
};
|
||||
assert_eq!(expect_rows, request.rows);
|
||||
|
||||
@@ -16,6 +16,9 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::greptime_proto::v1;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::SemanticType;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
@@ -30,7 +33,7 @@ use crate::config::MitoConfig;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::error::Result;
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType};
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadataRef};
|
||||
use crate::request::{CreateRequest, RegionOptions};
|
||||
use crate::worker::WorkerGroup;
|
||||
|
||||
@@ -226,3 +229,20 @@ impl CreateRequestBuilder {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): Support conversion in greptime-proto.
|
||||
/// Creates value for i64.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn i64_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value_data: Some(ValueData::I64Value(data)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates value for timestamp millis.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(data)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,11 +17,11 @@
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::WalEntry;
|
||||
use async_stream::try_stream;
|
||||
use common_error::ext::BoxedError;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::StreamExt;
|
||||
use greptime_proto::v1::mito::WalEntry;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::entry::Entry;
|
||||
@@ -157,10 +157,11 @@ impl<S: LogStore> WalWriter<S> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::{
|
||||
value, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType, Value,
|
||||
};
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use futures::TryStreamExt;
|
||||
use greptime_proto::v1::mito::{Mutation, OpType};
|
||||
use greptime_proto::v1::{value, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use log_store::test_util::log_store_util;
|
||||
use store_api::storage::SequenceNumber;
|
||||
@@ -199,10 +200,10 @@ mod tests {
|
||||
.map(|(str_col, int_col)| {
|
||||
let values = vec![
|
||||
Value {
|
||||
value: Some(value::Value::StringValue(str_col.to_string())),
|
||||
value_data: Some(value::ValueData::StringValue(str_col.to_string())),
|
||||
},
|
||||
Value {
|
||||
value: Some(value::Value::TsMillisecondValue(*int_col)),
|
||||
value_data: Some(value::ValueData::TsMillisecondValue(*int_col)),
|
||||
},
|
||||
];
|
||||
Row { values }
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::collections::{hash_map, HashMap};
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use greptime_proto::v1::mito::{Mutation, WalEntry};
|
||||
use api::v1::{Mutation, WalEntry};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::storage::{RegionId, SequenceNumber};
|
||||
@@ -27,7 +27,6 @@ use tokio::sync::oneshot::Sender;
|
||||
use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu};
|
||||
use crate::memtable::KeyValues;
|
||||
use crate::metadata::RegionMetadata;
|
||||
use crate::proto_util::to_proto_op_type;
|
||||
use crate::region::version::{VersionControlData, VersionRef};
|
||||
use crate::region::MitoRegionRef;
|
||||
use crate::request::{SenderWriteRequest, WriteRequest};
|
||||
@@ -212,7 +211,7 @@ impl RegionWriteCtx {
|
||||
let num_rows = sender_req.request.rows.rows.len() as u64;
|
||||
|
||||
self.wal_entry.mutations.push(Mutation {
|
||||
op_type: to_proto_op_type(sender_req.request.op_type) as i32,
|
||||
op_type: sender_req.request.op_type as i32,
|
||||
sequence: self.next_sequence,
|
||||
rows: Some(sender_req.request.rows),
|
||||
});
|
||||
|
||||
@@ -194,6 +194,8 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
}
|
||||
}
|
||||
Request::Ddl(_) => unimplemented!(),
|
||||
Request::RowInserts(_) => unimplemented!(),
|
||||
Request::RowDelete(_) => unimplemented!(),
|
||||
};
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
arrow-array.workspace = true
|
||||
arrow.workspace = true
|
||||
|
||||
@@ -20,6 +20,7 @@ pub mod util;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use datatypes::prelude::ScalarVectorBuilder;
|
||||
use datatypes::timestamp::TimestampMillisecond;
|
||||
use datatypes::vectors::{
|
||||
@@ -29,7 +30,7 @@ use rand::distributions::Alphanumeric;
|
||||
use rand::prelude::ThreadRng;
|
||||
use rand::Rng;
|
||||
use storage::memtable::KeyValues;
|
||||
use store_api::storage::{OpType, SequenceNumber};
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ mod tests {
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use common_time::Timestamp;
|
||||
@@ -124,7 +125,7 @@ mod tests {
|
||||
};
|
||||
use object_store::services::Fs;
|
||||
use object_store::ObjectStore;
|
||||
use store_api::storage::{ChunkReader, OpType, SequenceNumber};
|
||||
use store_api::storage::{ChunkReader, SequenceNumber};
|
||||
|
||||
use super::*;
|
||||
use crate::file_purger::noop::new_noop_file_purger;
|
||||
|
||||
@@ -109,10 +109,10 @@ pub mod noop {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::OpType;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use object_store::services::Fs;
|
||||
use object_store::ObjectStore;
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use super::*;
|
||||
use crate::file_purger::noop::NoopFilePurgeHandler;
|
||||
|
||||
@@ -22,11 +22,12 @@ use std::fmt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_time::range::TimestampRange;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use metrics::{decrement_gauge, increment_gauge};
|
||||
use store_api::storage::{consts, OpType, SequenceNumber};
|
||||
use store_api::storage::{consts, SequenceNumber};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::flush::FlushStrategyRef;
|
||||
|
||||
@@ -19,12 +19,13 @@ use std::ops::Bound;
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_time::range::TimestampRange;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder};
|
||||
use store_api::storage::{OpType, SequenceNumber};
|
||||
use store_api::storage::{SequenceNumber, MIN_OP_TYPE};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::flush::FlushStrategyRef;
|
||||
@@ -300,7 +301,7 @@ fn collect_iter<'a, I: Iterator<Item = (&'a InnerKey, &'a RowValue)>>(
|
||||
for (inner_key, row_value) in iter.take(batch_size) {
|
||||
keys.push(inner_key);
|
||||
sequences.push(Some(inner_key.sequence));
|
||||
op_types.push(Some(inner_key.op_type.as_u8()));
|
||||
op_types.push(Some(inner_key.op_type as u8));
|
||||
values.push(row_value);
|
||||
}
|
||||
|
||||
@@ -494,7 +495,7 @@ impl InnerKey {
|
||||
// to zero (Minimum value).
|
||||
self.sequence = 0;
|
||||
self.index_in_batch = 0;
|
||||
self.op_type = OpType::min_type();
|
||||
self.op_type = MIN_OP_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use store_api::storage::{OpType, SequenceNumber};
|
||||
use api::v1::OpType;
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use super::MemtableRef;
|
||||
use crate::error::Result;
|
||||
|
||||
@@ -139,7 +139,7 @@ fn check_iter_content(
|
||||
assert_eq!(Value::from(values[index].0), v0);
|
||||
assert_eq!(Value::from(values[index].1), v1);
|
||||
assert_eq!(Value::from(sequences[index]), sequence);
|
||||
assert_eq!(Value::from(op_types[index].as_u8()), op_type);
|
||||
assert_eq!(Value::from(op_types[index] as u8), op_type);
|
||||
|
||||
index += 1;
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
#![allow(clippy::all)]
|
||||
tonic::include_proto!("greptime.storage.wal.v1");
|
||||
|
||||
use store_api::storage::OpType;
|
||||
use api::v1::OpType;
|
||||
|
||||
use crate::write_batch::Payload;
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ impl<R: BatchReader> BatchReader for DedupReader<R> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use store_api::storage::OpType;
|
||||
use api::v1::OpType;
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::read_util;
|
||||
|
||||
@@ -16,12 +16,13 @@ use std::cmp::Ordering;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_base::BitVec;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::schema::{SchemaBuilder, SchemaRef};
|
||||
use datatypes::vectors::{BooleanVector, UInt8Vector};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::storage::{Chunk, ColumnId, OpType};
|
||||
use store_api::storage::{Chunk, ColumnId};
|
||||
|
||||
use crate::error;
|
||||
use crate::metadata::{self, Result};
|
||||
@@ -351,7 +352,7 @@ impl BatchOp for ProjectedSchema {
|
||||
});
|
||||
|
||||
for (i, op_type) in op_types.iter_data().enumerate() {
|
||||
if op_type == Some(OpType::Delete.as_u8()) {
|
||||
if op_type == Some(OpType::Delete as u8) {
|
||||
selected.set(i, false);
|
||||
}
|
||||
}
|
||||
@@ -360,10 +361,10 @@ impl BatchOp for ProjectedSchema {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::OpType;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use super::*;
|
||||
use crate::metadata::Error;
|
||||
|
||||
@@ -355,6 +355,7 @@ mod tests {
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use datatypes::arrow::array::{Array, UInt64Array, UInt8Array};
|
||||
@@ -362,7 +363,6 @@ mod tests {
|
||||
use datatypes::types::{TimestampMillisecondType, TimestampType};
|
||||
use datatypes::vectors::TimestampMillisecondVector;
|
||||
use object_store::services::Fs;
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use super::*;
|
||||
use crate::file_purger::noop::new_noop_file_purger;
|
||||
|
||||
@@ -14,11 +14,11 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use async_trait::async_trait;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::{BatchIterator, BoxedBatchIterator, RowOrdering};
|
||||
@@ -64,7 +64,7 @@ pub fn new_full_kv_batch(all_values: &[(i64, i64, u64, OpType)]) -> Batch {
|
||||
let value = Arc::new(Int64Vector::from_values(all_values.iter().map(|v| v.1)));
|
||||
let sequences = Arc::new(UInt64Vector::from_values(all_values.iter().map(|v| v.2)));
|
||||
let op_types = Arc::new(UInt8Vector::from_values(
|
||||
all_values.iter().map(|v| v.3.as_u8()),
|
||||
all_values.iter().map(|v| v.3 as u8),
|
||||
));
|
||||
|
||||
Batch::new(vec![key, value, sequences, op_types])
|
||||
|
||||
@@ -17,11 +17,12 @@ mod compat;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use datatypes::schema::{ColumnSchema, SchemaRef};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{OpType, WriteRequest};
|
||||
use store_api::storage::WriteRequest;
|
||||
|
||||
use crate::error::{
|
||||
BatchMissingColumnSnafu, CreateDefaultSnafu, CreateRecordBatchSnafu, Error, HasNullSnafu,
|
||||
|
||||
@@ -15,12 +15,12 @@
|
||||
use std::io::Cursor;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use datatypes::arrow::ipc::reader::StreamReader;
|
||||
use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
|
||||
use datatypes::schema::Schema;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use crate::codec::{Decoder, Encoder};
|
||||
use crate::error::{
|
||||
|
||||
@@ -5,6 +5,7 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-trait.workspace = true
|
||||
bytes = "1.1"
|
||||
common-base = { workspace = true }
|
||||
|
||||
@@ -45,4 +45,4 @@ pub use self::requests::{
|
||||
};
|
||||
pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
|
||||
pub use self::snapshot::{ReadContext, Snapshot};
|
||||
pub use self::types::{OpType, SequenceNumber};
|
||||
pub use self::types::{SequenceNumber, MIN_OP_TYPE};
|
||||
|
||||
@@ -14,43 +14,11 @@
|
||||
|
||||
//! Common types.
|
||||
|
||||
use api::v1::OpType;
|
||||
|
||||
/// Represents a sequence number of data in storage. The offset of logstore can be used
|
||||
/// as a sequence number.
|
||||
pub type SequenceNumber = u64;
|
||||
|
||||
/// Operation type of the value to write to storage.
|
||||
///
|
||||
/// The enum values are stored in the SST files so don't change
|
||||
/// them if possible.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum OpType {
|
||||
/// Delete operation.
|
||||
Delete = 0,
|
||||
/// Put operation.
|
||||
Put = 1,
|
||||
}
|
||||
|
||||
impl OpType {
|
||||
/// Cast the [OpType] to u8.
|
||||
#[inline]
|
||||
pub fn as_u8(&self) -> u8 {
|
||||
*self as u8
|
||||
}
|
||||
|
||||
/// Minimal op type after casting to u8.
|
||||
pub const fn min_type() -> OpType {
|
||||
OpType::Delete
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_op_type() {
|
||||
assert_eq!(0, OpType::Delete.as_u8());
|
||||
assert_eq!(1, OpType::Put.as_u8());
|
||||
assert_eq!(0, OpType::min_type().as_u8());
|
||||
}
|
||||
}
|
||||
// TODO(hl): We should implement a `min` method for OpType in greptime-proto crate.
|
||||
pub const MIN_OP_TYPE: OpType = OpType::Delete;
|
||||
|
||||
Reference in New Issue
Block a user