diff --git a/Cargo.lock b/Cargo.lock index 66eb7257f1..0a38354dfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,7 +211,7 @@ dependencies = [ "common-error", "common-time", "datatypes", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098)", + "greptime-proto", "prost", "snafu", "tonic 0.9.2", @@ -4134,19 +4134,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032#10c349c033dded29097d0dc933fbc2f89f658032" -dependencies = [ - "prost", - "serde", - "serde_json", - "tonic 0.9.2", - "tonic-build", -] - -[[package]] -name = "greptime-proto" -version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098#940694cfd05c1e93c1dd7aab486184c9e2853098" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b68af55c050a010f202fcccb22d58f080f0a868#9b68af55c050a010f202fcccb22d58f080f0a868" dependencies = [ "prost", "serde", @@ -5527,6 +5515,7 @@ name = "mito2" version = "0.3.2" dependencies = [ "anymap", + "api", "aquamarine", "arc-swap", "async-compat", @@ -5551,7 +5540,6 @@ dependencies = [ "datafusion-common", "datatypes", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032)", "lazy_static", "log-store", "memcomparable", @@ -7040,7 +7028,7 @@ dependencies = [ "datafusion", "datatypes", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098)", + "greptime-proto", "promql-parser", "prost", "query", @@ -7322,7 +7310,7 @@ dependencies = [ "format_num", "futures", "futures-util", - "greptime-proto 0.1.0 
(git+https://github.com/GreptimeTeam/greptime-proto.git?rev=940694cfd05c1e93c1dd7aab486184c9e2853098)", + "greptime-proto", "humantime", "metrics", "num", @@ -9474,6 +9462,7 @@ dependencies = [ name = "storage" version = "0.3.2" dependencies = [ + "api", "arc-swap", "arrow", "arrow-array", @@ -9525,6 +9514,7 @@ dependencies = [ name = "store-api" version = "0.3.2" dependencies = [ + "api", "async-stream", "async-trait", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 916ed8fd80..6cc91e9c2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "940694cfd05c1e93c1dd7aab486184c9e2853098" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b68af55c050a010f202fcccb22d58f080f0a868" } itertools = "0.10" lazy_static = "1.4" once_cell = "1.18" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 673cfa93ff..98107438a6 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -14,16 +14,19 @@ use common_base::BitVec; use common_time::interval::IntervalUnit; +use common_time::time::Time; use common_time::timestamp::TimeUnit; -use common_time::Interval; -use datatypes::prelude::ConcreteDataType; +use common_time::{Date, DateTime, Interval, Timestamp}; +use datatypes::prelude::{ConcreteDataType, ValueRef}; use datatypes::types::{IntervalType, TimeType, TimestampType}; -use datatypes::value::Value; +use datatypes::value::{OrderedF32, OrderedF64, Value}; use datatypes::vectors::VectorRef; +use greptime_proto::v1; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; -use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest}; +use greptime_proto::v1::value::ValueData; +use 
greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest, SemanticType}; use snafu::prelude::*; use crate::error::{self, Result}; @@ -298,6 +301,8 @@ pub fn request_type(request: &Request) -> &'static str { Request::Query(query_req) => query_request_type(query_req), Request::Ddl(ddl_req) => ddl_request_type(ddl_req), Request::Delete(_) => "delete", + Request::RowInserts(_) => "row_inserts", + Request::RowDelete(_) => "row_delete", } } @@ -336,6 +341,244 @@ pub fn convert_i128_to_interval(v: i128) -> IntervalMonthDayNano { } } +pub fn pb_value_to_value_ref(value: &greptime_proto::v1::Value) -> ValueRef { + let Some(value) = &value.value_data else { + return ValueRef::Null; + }; + + match value { + greptime_proto::v1::value::ValueData::I8Value(v) => ValueRef::Int8(*v as i8), + greptime_proto::v1::value::ValueData::I16Value(v) => ValueRef::Int16(*v as i16), + greptime_proto::v1::value::ValueData::I32Value(v) => ValueRef::Int32(*v), + greptime_proto::v1::value::ValueData::I64Value(v) => ValueRef::Int64(*v), + greptime_proto::v1::value::ValueData::U8Value(v) => ValueRef::UInt8(*v as u8), + greptime_proto::v1::value::ValueData::U16Value(v) => ValueRef::UInt16(*v as u16), + greptime_proto::v1::value::ValueData::U32Value(v) => ValueRef::UInt32(*v), + greptime_proto::v1::value::ValueData::U64Value(v) => ValueRef::UInt64(*v), + greptime_proto::v1::value::ValueData::F32Value(f) => { + ValueRef::Float32(OrderedF32::from(*f)) + } + greptime_proto::v1::value::ValueData::F64Value(f) => { + ValueRef::Float64(OrderedF64::from(*f)) + } + greptime_proto::v1::value::ValueData::BoolValue(b) => ValueRef::Boolean(*b), + greptime_proto::v1::value::ValueData::BinaryValue(bytes) => { + ValueRef::Binary(bytes.as_slice()) + } + greptime_proto::v1::value::ValueData::StringValue(string) => { + ValueRef::String(string.as_str()) + } + greptime_proto::v1::value::ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)), + greptime_proto::v1::value::ValueData::DatetimeValue(d) => { + 
ValueRef::DateTime(DateTime::new(*d)) + } + greptime_proto::v1::value::ValueData::TsSecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_second(*t)) + } + greptime_proto::v1::value::ValueData::TsMillisecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_millisecond(*t)) + } + greptime_proto::v1::value::ValueData::TsMicrosecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_microsecond(*t)) + } + greptime_proto::v1::value::ValueData::TsNanosecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_nanosecond(*t)) + } + greptime_proto::v1::value::ValueData::TimeSecondValue(t) => { + ValueRef::Time(Time::new_second(*t)) + } + greptime_proto::v1::value::ValueData::TimeMillisecondValue(t) => { + ValueRef::Time(Time::new_millisecond(*t)) + } + greptime_proto::v1::value::ValueData::TimeMicrosecondValue(t) => { + ValueRef::Time(Time::new_microsecond(*t)) + } + greptime_proto::v1::value::ValueData::TimeNanosecondValue(t) => { + ValueRef::Time(Time::new_nanosecond(*t)) + } + } +} + +/// Returns true if the pb semantic type is valid. +pub fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool { + type_value == semantic_type as i32 +} + +/// Returns true if the pb type value is valid. +pub fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { + let Some(column_type) = ColumnDataType::from_i32(type_value) else { + return false; + }; + + is_column_type_eq(column_type, expect_type) +} + +/// Convert value into proto's value. 
+pub fn to_proto_value(value: Value) -> Option<v1::Value> { + let proto_value = match value { + Value::Null => v1::Value { value_data: None }, + Value::Boolean(v) => v1::Value { + value_data: Some(ValueData::BoolValue(v)), + }, + Value::UInt8(v) => v1::Value { + value_data: Some(ValueData::U8Value(v.into())), + }, + Value::UInt16(v) => v1::Value { + value_data: Some(ValueData::U16Value(v.into())), + }, + Value::UInt32(v) => v1::Value { + value_data: Some(ValueData::U32Value(v)), + }, + Value::UInt64(v) => v1::Value { + value_data: Some(ValueData::U64Value(v)), + }, + Value::Int8(v) => v1::Value { + value_data: Some(ValueData::I8Value(v.into())), + }, + Value::Int16(v) => v1::Value { + value_data: Some(ValueData::I16Value(v.into())), + }, + Value::Int32(v) => v1::Value { + value_data: Some(ValueData::I32Value(v)), + }, + Value::Int64(v) => v1::Value { + value_data: Some(ValueData::I64Value(v)), + }, + Value::Float32(v) => v1::Value { + value_data: Some(ValueData::F32Value(*v)), + }, + Value::Float64(v) => v1::Value { + value_data: Some(ValueData::F64Value(*v)), + }, + Value::String(v) => v1::Value { + value_data: Some(ValueData::StringValue(v.as_utf8().to_string())), + }, + Value::Binary(v) => v1::Value { + value_data: Some(ValueData::BinaryValue(v.to_vec())), + }, + Value::Date(v) => v1::Value { + value_data: Some(ValueData::DateValue(v.val())), + }, + Value::DateTime(v) => v1::Value { + value_data: Some(ValueData::DatetimeValue(v.val())), + }, + Value::Timestamp(v) => match v.unit() { + TimeUnit::Second => v1::Value { + value_data: Some(ValueData::TsSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value_data: Some(ValueData::TsMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value_data: Some(ValueData::TsMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value_data: Some(ValueData::TsNanosecondValue(v.value())), + }, + }, + Value::Time(v) => match v.unit() { + TimeUnit::Second => v1::Value { +
value_data: Some(v1::value::ValueData::TimeSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value_data: Some(v1::value::ValueData::TimeMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value_data: Some(v1::value::ValueData::TimeMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value_data: Some(v1::value::ValueData::TimeNanosecondValue(v.value())), + }, + }, + Value::Interval(_) | Value::List(_) => return None, + }; + + Some(proto_value) +} + +/// Returns the [ColumnDataType] of the value. +/// +/// If value is null, returns `None`. +pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> { + let value_data = value.value_data.as_ref()?; + let value_type = match value_data { + ValueData::I8Value(_) => ColumnDataType::Int8, + ValueData::I16Value(_) => ColumnDataType::Int16, + ValueData::I32Value(_) => ColumnDataType::Int32, + ValueData::I64Value(_) => ColumnDataType::Int64, + ValueData::U8Value(_) => ColumnDataType::Uint8, + ValueData::U16Value(_) => ColumnDataType::Uint16, + ValueData::U32Value(_) => ColumnDataType::Uint32, + ValueData::U64Value(_) => ColumnDataType::Uint64, + ValueData::F32Value(_) => ColumnDataType::Float32, + ValueData::F64Value(_) => ColumnDataType::Float64, + ValueData::BoolValue(_) => ColumnDataType::Boolean, + ValueData::BinaryValue(_) => ColumnDataType::Binary, + ValueData::StringValue(_) => ColumnDataType::String, + ValueData::DateValue(_) => ColumnDataType::Date, + ValueData::DatetimeValue(_) => ColumnDataType::Datetime, + ValueData::TsSecondValue(_) => ColumnDataType::TimestampSecond, + ValueData::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + ValueData::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + ValueData::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond, + ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, +
ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, + ValueData::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + }; + Some(value_type) +} + +/// Convert [ConcreteDataType] to [ColumnDataType]. +pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> { + let column_data_type = match data_type { + ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, + ConcreteDataType::Int8(_) => ColumnDataType::Int8, + ConcreteDataType::Int16(_) => ColumnDataType::Int16, + ConcreteDataType::Int32(_) => ColumnDataType::Int32, + ConcreteDataType::Int64(_) => ColumnDataType::Int64, + ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, + ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, + ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, + ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, + ConcreteDataType::Float32(_) => ColumnDataType::Float32, + ConcreteDataType::Float64(_) => ColumnDataType::Float64, + ConcreteDataType::Binary(_) => ColumnDataType::Binary, + ConcreteDataType::String(_) => ColumnDataType::String, + ConcreteDataType::Date(_) => ColumnDataType::Date, + ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, + ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { + ColumnDataType::TimestampMillisecond + } + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { + ColumnDataType::TimestampMicrosecond + } + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { + ColumnDataType::TimestampNanosecond + } + ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, + ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, + ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, + ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, + ConcreteDataType::Null(_) + |
ConcreteDataType::Interval(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => return None, + }; + + Some(column_data_type) +} + +/// Returns true if the column type is equal to expected type. +fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { + if let Some(expect) = to_column_data_type(expect_type) { + column_type == expect + } else { + false + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index b3cb0aee76..bb5b75cd95 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -221,6 +221,8 @@ impl GrpcQueryHandler for Instance { self.handle_query(query, ctx).await } Request::Ddl(request) => self.handle_ddl(request, ctx).await, + Request::RowInserts(_) => unreachable!(), + Request::RowDelete(_) => unreachable!(), } } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 004d923415..430c9a936b 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -713,6 +713,8 @@ impl GrpcQueryHandler for DistInstance { } } } + Request::RowInserts(_) => unreachable!(), + Request::RowDelete(_) => unreachable!(), } } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 91dead2260..2f7dc71245 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -88,6 +88,8 @@ impl GrpcQueryHandler for Instance { GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone()) .await? 
} + Request::RowInserts(_) => unreachable!(), + Request::RowDelete(_) => unreachable!(), }; let output = interceptor.post_execute(output, ctx)?; diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index dd338f1d68..76233e2057 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -10,6 +10,7 @@ test = ["common-test-util"] [dependencies] anymap = "1.0.0-beta.2" +api.workspace = true aquamarine = "0.3" arc-swap = "1.0" async-compat = "0.2" @@ -33,8 +34,6 @@ datafusion-common.workspace = true datafusion.workspace = true datatypes = { workspace = true } futures.workspace = true -# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged. -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" } lazy_static = "1.4" log-store = { workspace = true } memcomparable = "0.2" diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 4cdaf26a81..10ce713b8a 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -31,7 +31,6 @@ pub mod manifest; pub mod memtable; #[allow(dead_code)] pub mod metadata; -pub(crate) mod proto_util; pub mod read; #[allow(dead_code)] mod region; diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index a9a675252b..ebe57a8e6b 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -447,6 +447,7 @@ impl RegionManifestManagerInner { #[cfg(test)] mod test { + use api::v1::SemanticType; use common_datasource::compression::CompressionType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -454,7 +455,7 @@ mod test { use super::*; use crate::manifest::action::RegionChange; use crate::manifest::tests::utils::basic_region_metadata; - use crate::metadata::{ColumnMetadata, RegionMetadataBuilder, SemanticType}; + use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; use crate::test_util::TestEnv; #[tokio::test] 
diff --git a/src/mito2/src/manifest/tests/utils.rs b/src/mito2/src/manifest/tests/utils.rs index ae6d7df78e..7444f05119 100644 --- a/src/mito2/src/manifest/tests/utils.rs +++ b/src/mito2/src/manifest/tests/utils.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::SemanticType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::storage::RegionId; -use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType}; +use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; /// Build a basic region metadata for testing. /// It contains three columns: diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index dbd8e12b91..a91fbc4e7f 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -14,11 +14,11 @@ use std::collections::HashMap; -use greptime_proto::v1::mito::{Mutation, OpType}; -use greptime_proto::v1::{Row, Rows, Value}; +use api::v1::{Mutation, OpType, Row, Rows, SemanticType}; +use datatypes::value::ValueRef; use store_api::storage::SequenceNumber; -use crate::metadata::{RegionMetadata, SemanticType}; +use crate::metadata::RegionMetadata; /// Key value view of a mutation. #[derive(Debug)] @@ -80,24 +80,24 @@ pub struct KeyValue<'a> { impl<'a> KeyValue<'a> { /// Get primary key columns. - pub fn primary_keys(&self) -> impl Iterator<Item = &Value> { + pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> { self.helper.indices[..self.helper.num_primary_key_column] .iter() - .map(|idx| &self.row.values[*idx]) + .map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx])) } /// Get field columns. - pub fn fields(&self) -> impl Iterator<Item = &Value> { + pub fn fields(&self) -> impl Iterator<Item = ValueRef> { self.helper.indices[self.helper.num_primary_key_column + 1..]
.iter() - .map(|idx| &self.row.values[*idx]) + .map(|idx| api::helper::pb_value_to_value_ref(&self.row.values[*idx])) } /// Get timestamp. - pub fn timestamp(&self) -> Value { + pub fn timestamp(&self) -> ValueRef { // Timestamp is primitive, we clone it. let index = self.helper.indices[self.helper.num_primary_key_column]; - self.row.values[index].clone() + api::helper::pb_value_to_value_ref(&self.row.values[index]) } /// Get number of primary key columns. @@ -187,15 +187,15 @@ impl ReadRowHelper { #[cfg(test)] mod tests { + use api::v1; + use api::v1::ColumnDataType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; - use greptime_proto::v1; - use greptime_proto::v1::ColumnDataType; use store_api::storage::RegionId; use super::*; use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; - use crate::proto_util::i64_value; + use crate::test_util::i64_value; const TS_NAME: &str = "ts"; const START_SEQ: SequenceNumber = 100; @@ -294,7 +294,7 @@ mod tests { fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) { assert_eq!(num_rows, kvs.num_rows()); let mut expect_seq = START_SEQ; - let expect_ts = i64_value(ts); + let expect_ts = ValueRef::Int64(ts); for kv in kvs.iter() { assert_eq!(expect_seq, kv.sequence()); expect_seq += 1; @@ -303,11 +303,11 @@ mod tests { assert_eq!(values.len(), kv.num_fields()); assert_eq!(expect_ts, kv.timestamp()); - let expect_keys: Vec<_> = keys.iter().map(|k| i64_value(*k)).collect(); - let actual_keys: Vec<_> = kv.primary_keys().cloned().collect(); + let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::Int64(*k)).collect(); + let actual_keys: Vec<_> = kv.primary_keys().collect(); assert_eq!(expect_keys, actual_keys); - let expect_values: Vec<_> = values.iter().map(|v| i64_value(*v)).collect(); - let actual_values: Vec<_> = kv.fields().cloned().collect(); + let expect_values: Vec<_> = values.iter().map(|v| ValueRef::Int64(*v)).collect(); + let actual_values: 
Vec<_> = kv.fields().collect(); assert_eq!(expect_values, actual_values); } } diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index cfd29431b3..06d5df3c89 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use api::v1::SemanticType; use datatypes::prelude::DataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use serde::de::Error; @@ -342,17 +343,6 @@ impl ColumnMetadata { } } -/// The semantic type of one column -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] -pub enum SemanticType { - /// Tag column, also is a part of primary key. - Tag = 0, - /// A column that isn't a time index or part of primary key. - Field = 1, - /// Time index column. - Timestamp = 2, -} - /// Fields skipped in serialization. struct SkippedFields { /// Last schema. diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs deleted file mode 100644 index d2de210c83..0000000000 --- a/src/mito2/src/proto_util.rs +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Utilities to process protobuf messages. 
- -use common_time::timestamp::TimeUnit; -use datatypes::prelude::ConcreteDataType; -use datatypes::types::{TimeType, TimestampType}; -use datatypes::value::Value; -use greptime_proto::v1::{self, ColumnDataType}; -use store_api::storage::OpType; - -use crate::metadata::SemanticType; - -/// Returns true if the pb semantic type is valid. -pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool { - type_value == semantic_type as i32 -} - -/// Returns true if the pb type value is valid. -pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { - let Some(column_type) = ColumnDataType::from_i32(type_value) else { - return false; - }; - - is_column_type_eq(column_type, expect_type) -} - -/// Convert value into proto's value. -pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> { - let proto_value = match value { - Value::Null => v1::Value { value: None }, - Value::Boolean(v) => v1::Value { - value: Some(v1::value::Value::BoolValue(v)), - }, - Value::UInt8(v) => v1::Value { - value: Some(v1::value::Value::U8Value(v.into())), - }, - Value::UInt16(v) => v1::Value { - value: Some(v1::value::Value::U16Value(v.into())), - }, - Value::UInt32(v) => v1::Value { - value: Some(v1::value::Value::U32Value(v)), - }, - Value::UInt64(v) => v1::Value { - value: Some(v1::value::Value::U64Value(v)), - }, - Value::Int8(v) => v1::Value { - value: Some(v1::value::Value::I8Value(v.into())), - }, - Value::Int16(v) => v1::Value { - value: Some(v1::value::Value::I16Value(v.into())), - }, - Value::Int32(v) => v1::Value { - value: Some(v1::value::Value::I32Value(v)), - }, - Value::Int64(v) => v1::Value { - value: Some(v1::value::Value::I64Value(v)), - }, - Value::Float32(v) => v1::Value { - value: Some(v1::value::Value::F32Value(*v)), - }, - Value::Float64(v) => v1::Value { - value: Some(v1::value::Value::F64Value(*v)), - }, - Value::String(v) => v1::Value { - value:
Some(v1::value::Value::StringValue(v.as_utf8().to_string())), - }, - Value::Binary(v) => v1::Value { - value: Some(v1::value::Value::BinaryValue(v.to_vec())), - }, - Value::Date(v) => v1::Value { - value: Some(v1::value::Value::DateValue(v.val())), - }, - Value::DateTime(v) => v1::Value { - value: Some(v1::value::Value::DatetimeValue(v.val())), - }, - Value::Timestamp(v) => match v.unit() { - TimeUnit::Second => v1::Value { - value: Some(v1::value::Value::TsSecondValue(v.value())), - }, - TimeUnit::Millisecond => v1::Value { - value: Some(v1::value::Value::TsMillisecondValue(v.value())), - }, - TimeUnit::Microsecond => v1::Value { - value: Some(v1::value::Value::TsMicrosecondValue(v.value())), - }, - TimeUnit::Nanosecond => v1::Value { - value: Some(v1::value::Value::TsNanosecondValue(v.value())), - }, - }, - Value::Time(v) => match v.unit() { - TimeUnit::Second => v1::Value { - value: Some(v1::value::Value::TimeSecondValue(v.value())), - }, - TimeUnit::Millisecond => v1::Value { - value: Some(v1::value::Value::TimeMillisecondValue(v.value())), - }, - TimeUnit::Microsecond => v1::Value { - value: Some(v1::value::Value::TimeMicrosecondValue(v.value())), - }, - TimeUnit::Nanosecond => v1::Value { - value: Some(v1::value::Value::TimeNanosecondValue(v.value())), - }, - }, - Value::Interval(_) | Value::List(_) => return None, - }; - - Some(proto_value) -} - -/// Returns the [ColumnDataType] of the value. -/// -/// If value is null, returns `None`. 
-pub(crate) fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> { - let value_data = value.value.as_ref()?; - let value_type = match value_data { - v1::value::Value::I8Value(_) => ColumnDataType::Int8, - v1::value::Value::I16Value(_) => ColumnDataType::Int16, - v1::value::Value::I32Value(_) => ColumnDataType::Int32, - v1::value::Value::I64Value(_) => ColumnDataType::Int64, - v1::value::Value::U8Value(_) => ColumnDataType::Uint8, - v1::value::Value::U16Value(_) => ColumnDataType::Uint16, - v1::value::Value::U32Value(_) => ColumnDataType::Uint32, - v1::value::Value::U64Value(_) => ColumnDataType::Uint64, - v1::value::Value::F32Value(_) => ColumnDataType::Float32, - v1::value::Value::F64Value(_) => ColumnDataType::Float64, - v1::value::Value::BoolValue(_) => ColumnDataType::Boolean, - v1::value::Value::BinaryValue(_) => ColumnDataType::Binary, - v1::value::Value::StringValue(_) => ColumnDataType::String, - v1::value::Value::DateValue(_) => ColumnDataType::Date, - v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime, - v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond, - v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, - v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, - v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, - v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond, - v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, - v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, - v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, - }; - Some(value_type) -} - -// TODO(yingwen): Support conversion in greptime-proto. -/// Creates value for i64. -#[cfg(test)] -pub(crate) fn i64_value(data: i64) -> v1::Value { - v1::Value { - value: Some(v1::value::Value::I64Value(data)), - } -} - -/// Creates value for timestamp millis.
-#[cfg(test)] -pub(crate) fn ts_ms_value(data: i64) -> v1::Value { - v1::Value { - value: Some(v1::value::Value::TsMillisecondValue(data)), - } -} - -/// Convert [ConcreteDataType] to [ColumnDataType]. -pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> { - let column_data_type = match data_type { - ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, - ConcreteDataType::Int8(_) => ColumnDataType::Int8, - ConcreteDataType::Int16(_) => ColumnDataType::Int16, - ConcreteDataType::Int32(_) => ColumnDataType::Int32, - ConcreteDataType::Int64(_) => ColumnDataType::Int64, - ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, - ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, - ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, - ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, - ConcreteDataType::Float32(_) => ColumnDataType::Float32, - ConcreteDataType::Float64(_) => ColumnDataType::Float64, - ConcreteDataType::Binary(_) => ColumnDataType::Binary, - ConcreteDataType::String(_) => ColumnDataType::String, - ConcreteDataType::Date(_) => ColumnDataType::Date, - ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, - ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { - ColumnDataType::TimestampMillisecond - } - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { - ColumnDataType::TimestampMicrosecond - } - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { - ColumnDataType::TimestampNanosecond - } - ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, - ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, - ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, - ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, - ConcreteDataType::Null(_) - | ConcreteDataType::Interval(_) - |
ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => return None, - }; - - Some(column_data_type) -} - -/// Convert semantic type to proto's semantic type -pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType { - match semantic_type { - SemanticType::Tag => v1::SemanticType::Tag, - SemanticType::Field => v1::SemanticType::Field, - SemanticType::Timestamp => v1::SemanticType::Timestamp, - } -} - -/// Convert op type to proto's op type. -pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType { - match op_type { - OpType::Delete => v1::mito::OpType::Delete, - OpType::Put => v1::mito::OpType::Put, - } -} - -/// Returns true if the column type is equal to expected type. -fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { - if let Some(expect) = to_column_data_type(expect_type) { - column_type == expect - } else { - false - } -} - -// TODO(yingwen): Tests. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4e5b3fa7fb..ec1ce0ac89 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -17,19 +17,19 @@ use std::collections::HashMap; use std::time::Duration; +use api::helper::{ + is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, + to_proto_value, +}; +use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, Value}; use common_base::readable_size::ReadableSize; -use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value}; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; +use store_api::storage::{ColumnId, CompactionStrategy, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; use crate::metadata::{ColumnMetadata, RegionMetadata}; -use crate::proto_util::{ - is_column_type_value_eq, 
is_semantic_type_eq, proto_value_type, to_column_data_type, - to_proto_semantic_type, to_proto_value, -}; /// Options that affect the entire region. /// @@ -139,7 +139,7 @@ impl WriteRequest { for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() { validate_proto_value(region_id, value, column_schema)?; - if value.value.is_none() { + if value.value_data.is_none() { has_null[i] = true; } } @@ -202,7 +202,7 @@ impl WriteRequest { "column {} has semantic type {:?}, given: {}({})", column.column_schema.name, column.semantic_type, - greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type) + api::v1::SemanticType::from_i32(input_col.semantic_type) .map(|v| v.as_str_name()) .unwrap_or("Unknown"), input_col.semantic_type @@ -282,6 +282,7 @@ impl WriteRequest { })?; // Convert default value into proto's value. + let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu { region_id, reason: format!( @@ -308,7 +309,7 @@ impl WriteRequest { self.rows.schema.push(ColumnSchema { column_name: column.column_schema.name.clone(), datatype: datatype as i32, - semantic_type: to_proto_semantic_type(column.semantic_type) as i32, + semantic_type: column.semantic_type as i32, }); Ok(()) @@ -426,13 +427,13 @@ impl RequestBody { #[cfg(test)] mod tests { + use api::v1::{Row, SemanticType}; use datatypes::prelude::ConcreteDataType; - use greptime_proto::v1::{Row, SemanticType}; use super::*; use crate::error::Error; use crate::metadata::RegionMetadataBuilder; - use crate::proto_util::{i64_value, ts_ms_value}; + use crate::test_util::{i64_value, ts_ms_value}; fn new_column_schema( name: &str, @@ -516,7 +517,7 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), false, ), - semantic_type: crate::metadata::SemanticType::Timestamp, + semantic_type: SemanticType::Timestamp, column_id: 1, }) .push_column_metadata(ColumnMetadata { @@ -525,7 +526,7 @@ mod tests { ConcreteDataType::int64_datatype(), true, ), - 
semantic_type: crate::metadata::SemanticType::Tag, + semantic_type: SemanticType::Tag, column_id: 2, }) .primary_key(vec![2]); @@ -605,7 +606,7 @@ mod tests { new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), ], rows: vec![Row { - values: vec![Value { value: None }, i64_value(2)], + values: vec![Value { value_data: None }, i64_value(2)], }], }; let metadata = new_region_metadata(); @@ -686,7 +687,7 @@ mod tests { new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), ], rows: vec![Row { - values: vec![ts_ms_value(1), Value { value: None }], + values: vec![ts_ms_value(1), Value { value_data: None }], }], }; assert_eq!(expect_rows, request.rows); diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 9dc1d5b6fe..68265b28a0 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -16,6 +16,9 @@ use std::sync::Arc; +use api::greptime_proto::v1; +use api::v1::value::ValueData; +use api::v1::SemanticType; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::prelude::ConcreteDataType; @@ -30,7 +33,7 @@ use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::error::Result; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType}; +use crate::metadata::{ColumnMetadata, RegionMetadataRef}; use crate::request::{CreateRequest, RegionOptions}; use crate::worker::WorkerGroup; @@ -226,3 +229,20 @@ impl CreateRequestBuilder { } } } + +// TODO(yingwen): Support conversion in greptime-proto. +/// Creates value for i64. +#[cfg(test)] +pub(crate) fn i64_value(data: i64) -> v1::Value { + v1::Value { + value_data: Some(ValueData::I64Value(data)), + } +} + +/// Creates value for timestamp millis. 
+#[cfg(test)] +pub(crate) fn ts_ms_value(data: i64) -> v1::Value { + v1::Value { + value_data: Some(ValueData::TsMillisecondValue(data)), + } +} diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index ff0ece386f..abad8055dd 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -17,11 +17,11 @@ use std::mem; use std::sync::Arc; +use api::v1::WalEntry; use async_stream::try_stream; use common_error::ext::BoxedError; use futures::stream::BoxStream; use futures::StreamExt; -use greptime_proto::v1::mito::WalEntry; use prost::Message; use snafu::ResultExt; use store_api::logstore::entry::Entry; @@ -157,10 +157,11 @@ impl WalWriter { #[cfg(test)] mod tests { + use api::v1::{ + value, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType, Value, + }; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use futures::TryStreamExt; - use greptime_proto::v1::mito::{Mutation, OpType}; - use greptime_proto::v1::{value, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use store_api::storage::SequenceNumber; @@ -199,10 +200,10 @@ mod tests { .map(|(str_col, int_col)| { let values = vec![ Value { - value: Some(value::Value::StringValue(str_col.to_string())), + value_data: Some(value::ValueData::StringValue(str_col.to_string())), }, Value { - value: Some(value::Value::TsMillisecondValue(*int_col)), + value_data: Some(value::ValueData::TsMillisecondValue(*int_col)), }, ]; Row { values } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 8ddad90c11..f31bc6f3d2 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -18,7 +18,7 @@ use std::collections::{hash_map, HashMap}; use std::mem; use std::sync::Arc; -use greptime_proto::v1::mito::{Mutation, WalEntry}; +use api::v1::{Mutation, WalEntry}; use snafu::ResultExt; use store_api::logstore::LogStore; 
use store_api::storage::{RegionId, SequenceNumber}; @@ -27,7 +27,6 @@ use tokio::sync::oneshot::Sender; use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu}; use crate::memtable::KeyValues; use crate::metadata::RegionMetadata; -use crate::proto_util::to_proto_op_type; use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; use crate::request::{SenderWriteRequest, WriteRequest}; @@ -212,7 +211,7 @@ impl RegionWriteCtx { let num_rows = sender_req.request.rows.rows.len() as u64; self.wal_entry.mutations.push(Mutation { - op_type: to_proto_op_type(sender_req.request.op_type) as i32, + op_type: sender_req.request.op_type as i32, sequence: self.next_sequence, rows: Some(sender_req.request.rows), }); diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index c5ddc0ec76..0a8cb4101a 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -194,6 +194,8 @@ impl GrpcQueryHandler for DummyInstance { } } Request::Ddl(_) => unimplemented!(), + Request::RowInserts(_) => unimplemented!(), + Request::RowDelete(_) => unimplemented!(), }; Ok(output) } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 8075629aa7..ce6eb012b4 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +api.workspace = true arc-swap = "1.0" arrow-array.workspace = true arrow.workspace = true diff --git a/src/storage/benches/memtable/mod.rs b/src/storage/benches/memtable/mod.rs index a9a956649d..c0b6cd0ce2 100644 --- a/src/storage/benches/memtable/mod.rs +++ b/src/storage/benches/memtable/mod.rs @@ -20,6 +20,7 @@ pub mod util; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use api::v1::OpType; use datatypes::prelude::ScalarVectorBuilder; use datatypes::timestamp::TimestampMillisecond; use datatypes::vectors::{ @@ -29,7 +30,7 @@ use rand::distributions::Alphanumeric; use rand::prelude::ThreadRng; use 
rand::Rng; use storage::memtable::KeyValues; -use store_api::storage::{OpType, SequenceNumber}; +use store_api::storage::SequenceNumber; static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0); diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 4855d15c20..734d229b4a 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -114,6 +114,7 @@ mod tests { use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; + use api::v1::OpType; use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_temp_dir; use common_time::Timestamp; @@ -124,7 +125,7 @@ mod tests { }; use object_store::services::Fs; use object_store::ObjectStore; - use store_api::storage::{ChunkReader, OpType, SequenceNumber}; + use store_api::storage::{ChunkReader, SequenceNumber}; use super::*; use crate::file_purger::noop::new_noop_file_purger; diff --git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs index 9309dd0da1..da7899e600 100644 --- a/src/storage/src/file_purger.rs +++ b/src/storage/src/file_purger.rs @@ -109,10 +109,10 @@ pub mod noop { #[cfg(test)] mod tests { + use api::v1::OpType; use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; use object_store::ObjectStore; - use store_api::storage::OpType; use super::*; use crate::file_purger::noop::NoopFilePurgeHandler; diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index b76af906ad..620cbc7af3 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -22,11 +22,12 @@ use std::fmt; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; +use api::v1::OpType; use common_time::range::TimestampRange; use common_time::Timestamp; use datatypes::vectors::VectorRef; use metrics::{decrement_gauge, increment_gauge}; -use store_api::storage::{consts, OpType, SequenceNumber}; +use store_api::storage::{consts, SequenceNumber}; 
use crate::error::Result; use crate::flush::FlushStrategyRef; diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 9d1aaa1a8a..0874e96e70 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -19,12 +19,13 @@ use std::ops::Bound; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, RwLock}; +use api::v1::OpType; use common_time::range::TimestampRange; use datatypes::data_type::DataType; use datatypes::prelude::*; use datatypes::value::Value; use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder}; -use store_api::storage::{OpType, SequenceNumber}; +use store_api::storage::{SequenceNumber, MIN_OP_TYPE}; use crate::error::Result; use crate::flush::FlushStrategyRef; @@ -300,7 +301,7 @@ fn collect_iter<'a, I: Iterator>( for (inner_key, row_value) in iter.take(batch_size) { keys.push(inner_key); sequences.push(Some(inner_key.sequence)); - op_types.push(Some(inner_key.op_type.as_u8())); + op_types.push(Some(inner_key.op_type as u8)); values.push(row_value); } @@ -494,7 +495,7 @@ impl InnerKey { // to zero (Minimum value). self.sequence = 0; self.index_in_batch = 0; - self.op_type = OpType::min_type(); + self.op_type = MIN_OP_TYPE; } } diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 8dfdd0bad2..3e18a4063a 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use store_api::storage::{OpType, SequenceNumber}; +use api::v1::OpType; +use store_api::storage::SequenceNumber; use super::MemtableRef; use crate::error::Result; diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index df7a639aac..36bd466134 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -139,7 +139,7 @@ fn check_iter_content( assert_eq!(Value::from(values[index].0), v0); assert_eq!(Value::from(values[index].1), v1); assert_eq!(Value::from(sequences[index]), sequence); - assert_eq!(Value::from(op_types[index].as_u8()), op_type); + assert_eq!(Value::from(op_types[index] as u8), op_type); index += 1; } diff --git a/src/storage/src/proto/wal.rs b/src/storage/src/proto/wal.rs index 56a0cd4f7e..c07fd5e9f0 100644 --- a/src/storage/src/proto/wal.rs +++ b/src/storage/src/proto/wal.rs @@ -15,7 +15,7 @@ #![allow(clippy::all)] tonic::include_proto!("greptime.storage.wal.v1"); -use store_api::storage::OpType; +use api::v1::OpType; use crate::write_batch::Payload; diff --git a/src/storage/src/read/dedup.rs b/src/storage/src/read/dedup.rs index e26da65609..d08415aac2 100644 --- a/src/storage/src/read/dedup.rs +++ b/src/storage/src/read/dedup.rs @@ -92,7 +92,7 @@ impl BatchReader for DedupReader { #[cfg(test)] mod tests { - use store_api::storage::OpType; + use api::v1::OpType; use super::*; use crate::test_util::read_util; diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index 6a20bd2fbc..756c9a8777 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -16,12 +16,13 @@ use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; +use api::v1::OpType; use common_base::BitVec; use datatypes::prelude::ScalarVector; use datatypes::schema::{SchemaBuilder, SchemaRef}; use datatypes::vectors::{BooleanVector, UInt8Vector}; use snafu::{ensure, ResultExt}; -use store_api::storage::{Chunk, ColumnId, OpType}; 
+use store_api::storage::{Chunk, ColumnId}; use crate::error; use crate::metadata::{self, Result}; @@ -351,7 +352,7 @@ impl BatchOp for ProjectedSchema { }); for (i, op_type) in op_types.iter_data().enumerate() { - if op_type == Some(OpType::Delete.as_u8()) { + if op_type == Some(OpType::Delete as u8) { selected.set(i, false); } } @@ -360,10 +361,10 @@ impl BatchOp for ProjectedSchema { #[cfg(test)] mod tests { + use api::v1::OpType; use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::{TimestampMillisecondVector, VectorRef}; - use store_api::storage::OpType; use super::*; use crate::metadata::Error; diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index a6c76b2ab6..fefdaaca9b 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -355,6 +355,7 @@ mod tests { use std::ops::Range; use std::sync::Arc; + use api::v1::OpType; use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_temp_dir; use datatypes::arrow::array::{Array, UInt64Array, UInt8Array}; @@ -362,7 +363,6 @@ mod tests { use datatypes::types::{TimestampMillisecondType, TimestampType}; use datatypes::vectors::TimestampMillisecondVector; use object_store::services::Fs; - use store_api::storage::OpType; use super::*; use crate::file_purger::noop::new_noop_file_purger; diff --git a/src/storage/src/test_util/read_util.rs b/src/storage/src/test_util/read_util.rs index 2d5476243c..23cfc6c302 100644 --- a/src/storage/src/test_util/read_util.rs +++ b/src/storage/src/test_util/read_util.rs @@ -14,11 +14,11 @@ use std::sync::Arc; +use api::v1::OpType; use async_trait::async_trait; use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector}; -use store_api::storage::OpType; use crate::error::Result; use crate::memtable::{BatchIterator, BoxedBatchIterator, RowOrdering}; @@ -64,7 
+64,7 @@ pub fn new_full_kv_batch(all_values: &[(i64, i64, u64, OpType)]) -> Batch { let value = Arc::new(Int64Vector::from_values(all_values.iter().map(|v| v.1))); let sequences = Arc::new(UInt64Vector::from_values(all_values.iter().map(|v| v.2))); let op_types = Arc::new(UInt8Vector::from_values( - all_values.iter().map(|v| v.3.as_u8()), + all_values.iter().map(|v| v.3 as u8), )); Batch::new(vec![key, value, sequences, op_types]) diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 83aaeee3e7..4d58c06956 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -17,11 +17,12 @@ mod compat; use std::collections::HashMap; +use api::v1::OpType; use common_recordbatch::RecordBatch; use datatypes::schema::{ColumnSchema, SchemaRef}; use datatypes::vectors::VectorRef; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::{OpType, WriteRequest}; +use store_api::storage::WriteRequest; use crate::error::{ BatchMissingColumnSnafu, CreateDefaultSnafu, CreateRecordBatchSnafu, Error, HasNullSnafu, diff --git a/src/storage/src/write_batch/codec.rs b/src/storage/src/write_batch/codec.rs index 539167fcd0..b0ce435dd7 100644 --- a/src/storage/src/write_batch/codec.rs +++ b/src/storage/src/write_batch/codec.rs @@ -15,12 +15,12 @@ use std::io::Cursor; use std::sync::Arc; +use api::v1::OpType; use common_recordbatch::RecordBatch; use datatypes::arrow::ipc::reader::StreamReader; use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter}; use datatypes::schema::Schema; use snafu::{ensure, ResultExt}; -use store_api::storage::OpType; use crate::codec::{Decoder, Encoder}; use crate::error::{ diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index e1c20c8cfe..b0615d727e 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +api.workspace = true async-trait.workspace = true bytes = "1.1" common-base = { 
workspace = true } diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index dff799a51f..da59e9aeaa 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -45,4 +45,4 @@ pub use self::requests::{ }; pub use self::responses::{GetResponse, ScanResponse, WriteResponse}; pub use self::snapshot::{ReadContext, Snapshot}; -pub use self::types::{OpType, SequenceNumber}; +pub use self::types::{SequenceNumber, MIN_OP_TYPE}; diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs index f8336edc42..7ce8dcc55b 100644 --- a/src/store-api/src/storage/types.rs +++ b/src/store-api/src/storage/types.rs @@ -14,43 +14,11 @@ //! Common types. +use api::v1::OpType; + /// Represents a sequence number of data in storage. The offset of logstore can be used /// as a sequence number. pub type SequenceNumber = u64; -/// Operation type of the value to write to storage. -/// -/// The enum values are stored in the SST files so don't change -/// them if possible. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub enum OpType { - /// Delete operation. - Delete = 0, - /// Put operation. - Put = 1, -} - -impl OpType { - /// Cast the [OpType] to u8. - #[inline] - pub fn as_u8(&self) -> u8 { - *self as u8 - } - - /// Minimal op type after casting to u8. - pub const fn min_type() -> OpType { - OpType::Delete - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_op_type() { - assert_eq!(0, OpType::Delete.as_u8()); - assert_eq!(1, OpType::Put.as_u8()); - assert_eq!(0, OpType::min_type().as_u8()); - } -} +// TODO(hl): We should implement a `min` method for OpType in greptime-proto crate. +pub const MIN_OP_TYPE: OpType = OpType::Delete;